Initial import
[darksolar] / node_modules / mongodb / lib / mongodb / connection / repl_set.js
1 var Connection = require('./connection').Connection,
2   DbCommand = require('../commands/db_command').DbCommand,
3   MongoReply = require('../responses/mongo_reply').MongoReply,
4   debug = require('util').debug,
5   EventEmitter = require('events').EventEmitter,
6   inherits = require('util').inherits,
7   inspect = require('util').inspect,
8   Server = require('./server').Server,  
9   PingStrategy = require('./strategies/ping_strategy').PingStrategy,
10   StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy;
11
// Numeric state codes for replicaset members.
// NOTE(review): the values look like the member `state` codes reported by
// MongoDB's replSetGetStatus command (0 = startup ... 9 = rollback) - confirm
// against the server documentation before relying on that mapping.
const STATE_STARTING_PHASE_1 = 0,
  STATE_PRIMARY = 1,
  STATE_SECONDARY = 2,
  STATE_RECOVERING = 3,
  STATE_FATAL_ERROR = 4,
  STATE_STARTING_PHASE_2 = 5,
  STATE_UNKNOWN = 6,
  STATE_ARBITER = 7,
  STATE_DOWN = 8,
  STATE_ROLLBACK = 9;
22
/**
 * ReplSet constructor provides replicaset functionality
 *
 * Options
 *  - **ha** {Boolean, default:false}, turn on high availability.
 *  - **haInterval** {Number, default:2000}, time between each replicaset status check.
 *  - **reconnectWait** {Number, default:1000}, time to wait in milliseconds before attempting reconnect.
 *  - **retries** {Number, default:30}, number of times to attempt a replicaset reconnect.
 *  - **rs_name** {String}, the name of the replicaset to connect to.
 *  - **readPreference** {String}, the preferred read preference (Server.READ_PRIMARY, Server.READ_SECONDARY, Server.READ_SECONDARY_ONLY).
 *  - **read_secondary** {Boolean, deprecated}, allow reads from secondary.
 *  - **strategy** {String, default:null}, selection strategy for reads, either 'ping' or 'statistical'.
 *  - **poolSize** {Number, default:1}, pool size stored for new server instances.
 *  - **ssl** {Boolean, default:false}, stored as the ssl setting for connections.
 *  - **socketOptions** {Object, default:{}}, default connection pool socket options.
 *  - **logger** {Object}, used only if it implements debug(), error() and log();
 *    otherwise a no-op logger is installed.
 *
 * Can be called with or without `new`.
 *
 * @class Represents a Replicaset Configuration
 * @param {Array} servers list of Server objects participating in the replicaset.
 * @param {Object} [options] additional options for the replicaset.
 * @throws {Error} if readPreference or strategy is not one of the allowed values,
 *   if servers is not a non-empty array, or if any entry is not a Server instance.
 */
var ReplSet = exports.ReplSet = function(servers, options) {
  // Support invocation without `new`
  if(!(this instanceof ReplSet))
    return new ReplSet(servers, options);

  // Set up event emitter
  EventEmitter.call(this);

  this.options = options == null ? {} : options;
  this.reconnectWait = this.options["reconnectWait"] != null ? this.options["reconnectWait"] : 1000;
  this.retries = this.options["retries"] != null ? this.options["retries"] : 30;
  this.replicaSet = this.options["rs_name"];

  // Are we allowing reads from secondaries ?
  this.readSecondary = this.options["read_secondary"];
  this.slaveOk = true;
  this.closedConnectionCount = 0;
  this._used = false;

  // Default poolSize for new server instances
  this.poolSize = this.options.poolSize == null ? 1 : this.options.poolSize;
  this._currentServerChoice = 0;

  // Set up ssl connections
  this.ssl = this.options.ssl == null ? false : this.options.ssl;

  // Just keeps list of events we allow
  this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[], timeout:[]};
  // Internal state of server connection
  this._serverState = 'disconnected';
  // Number of servers still left to initialize
  this._numberOfServersLeftToInitialize = 0;
  // Do we record server stats or not
  this.recordQueryStats = false;

  // Read preference: null unless one of the known modes was supplied
  var readPreference = this.options['readPreference'];
  if(readPreference != null
    && readPreference != Server.READ_PRIMARY
    && readPreference != Server.READ_SECONDARY_ONLY
    && readPreference != Server.READ_SECONDARY) {
    throw new Error("Illegal readPreference mode specified, " + readPreference);
  }
  this._readPreference = readPreference != null ? readPreference : null;

  // Strategy for picking a secondary; no strategy instance is created when unset
  this.strategy = this.options['strategy'];
  // Make sure strategy is one of the two allowed
  if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical')) throw new Error("Only ping or statistical strategies allowed");
  // Let's set up our strategy object for picking secondaries
  if(this.strategy == 'ping') {
    this.strategyInstance = new PingStrategy(this);
  } else if(this.strategy == 'statistical') {
    this.strategyInstance = new StatisticsStrategy(this);
    // The statistical strategy is set up with query stats recording enabled
    this.enableRecordQueryStats(true);
  }

  // Set default connection pool options
  this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {};

  // Set up logger if any set. All three methods of the fallback logger
  // (debug, error, log) must be present on a user supplied logger.
  var suppliedLogger = this.options.logger;
  var loggerIsUsable = suppliedLogger != null
    && (typeof suppliedLogger.debug == 'function')
    && (typeof suppliedLogger.error == 'function')
    && (typeof suppliedLogger.log == 'function');
  this.logger = loggerIsUsable
    ? suppliedLogger
    : {error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}};

  // Ensure we got a non-empty list of servers
  if(!Array.isArray(servers) || servers.length == 0) {
    throw new Error("The parameter must be an array of servers and contain at least one server");
  }

  // Validate every entry before touching any of them, so a bad entry yields
  // the descriptive error below and leaves the other servers unmodified
  for(var i = 0; i < servers.length; i++) {
    if(!(servers[i] instanceof Server)) {
      throw new Error("All server entries must be of type Server");
    }
  }

  // Ensure no server has reconnect on
  for(var j = 0; j < servers.length; j++) {
    servers[j].options.auto_reconnect = false;
  }
  this.servers = servers;

  // Enabled ha
  this.haEnabled = this.options['ha'] == null ? false : this.options['ha'];
  // How often are we checking for new servers in the replicaset
  this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 2000 : this.options['haInterval'];
  this._replicasetTimeoutId = null;
};

/**
 * ReplSet inherits from EventEmitter
 * @ignore
 */
inherits(ReplSet, EventEmitter);
149
/**
 * Sets the read preference at the replicaset level.
 *
 * slaveOk is switched on when the preference is Server.READ_SECONDARY,
 * Server.READ_SECONDARY_ONLY or a non-null object (tags). It is never
 * switched off here.
 *
 * @ignore
 */
ReplSet.prototype.setReadPreference = function(preference) {
  this._readPreference = preference;

  var readsFromSecondary = preference == Server.READ_SECONDARY
    || preference == Server.READ_SECONDARY_ONLY;
  var isTagPreference = preference != null && typeof preference == 'object';

  // Secondary and tag based preferences need slaveOk enabled
  if(readsFromSecondary || isTagPreference) {
    this.slaveOk = true;
  }
};
163
/**
 * Returns the current value of the `_used` flag of this replicaset instance.
 *
 * @ignore
 */
ReplSet.prototype._isUsed = function() {
  var used = this._used;
  return used;
};
171
/**
 * Stores the given value as the `target` property of this replicaset.
 *
 * @ignore
 */
ReplSet.prototype.setTarget = function(target) {
  // Plain assignment, no validation of the value
  this.target = target;
};
178
/**
 * Returns the connection state of the replicaset: false when there is no
 * `primary` or no master entry in the internal state, otherwise whatever the
 * master server's isConnected() reports.
 *
 * @ignore
 */
ReplSet.prototype.isConnected = function() {
  // No primary known, not connected
  if(this.primary == null) return false;

  // No master recorded in the internal state, not connected
  var master = this._state.master;
  if(master == null) return false;

  return master.isConnected();
};
186
/**
 * Always reports false.
 *
 * NOTE(review): this assigns to Server.prototype from within the replicaset
 * module, so it replaces any isSetMember already defined on Server - confirm
 * that Server (and not ReplSet) is the intended target.
 *
 * @ignore
 */
Server.prototype.isSetMember = function() {
  return false;
};
193
/**
 * Returns false when reads from secondaries are allowed (`readSecondary`) and
 * at least one secondary is available, true otherwise. The `config` argument
 * is not used.
 *
 * @ignore
 */
ReplSet.prototype.isPrimary = function(config) {
  var canReadFromSecondary = this.readSecondary && this.secondaries.length > 0;
  return !canReadFromSecondary;
};

/**
 * Alias of isPrimary.
 *
 * @ignore
 */
ReplSet.prototype.isReadPrimary = ReplSet.prototype.isPrimary;
205
/**
 * Removes every server that is no longer connected from `connections` and
 * drops the same key from `addresses`. Servers carrying a tags object are
 * also removed from the `byTags` lookup via cleanupTags.
 *
 * @ignore
 */
var cleanupConnections = ReplSet.cleanupConnections = function(connections, addresses, byTags) {
  Object.keys(connections).forEach(function(name) {
    var candidate = connections[name];
    // Live connections stay where they are
    if(candidate.isConnected()) return;

    // Dead connection, drop it from both lookups
    delete connections[name];
    delete addresses[name];

    // Clean up tags if needed
    if(candidate.tags != null && typeof candidate.tags === 'object') {
      cleanupTags(candidate, byTags);
    }
  });
};
227
/**
 * Removes `server` from the `byTags` lookup. For every tag key/value pair on
 * the server, the matching instance list in `byTags` is replaced by a new
 * list without the entries whose "host:port" equals the server's.
 *
 * @ignore
 */
var cleanupTags = ReplSet._cleanupTags = function(server, byTags) {
  var removedAddress = server.host + ":" + server.port;
  var tagNames = Object.keys(server.tags);

  for(var t = 0; t < tagNames.length; t++) {
    var tagName = tagNames[t];
    var tagValue = server.tags[tagName];

    // Nothing registered for this tag name / value pair
    if(byTags[tagName] == null || !Array.isArray(byTags[tagName][tagValue])) continue;

    var registered = byTags[tagName][tagValue];
    var survivors = [];
    // Keep every instance that is not the server being removed
    for(var s = 0, total = registered.length; s < total; s++) {
      var current = registered[s];
      if((current.host + ":" + current.port) !== removedAddress) {
        survivors.push(current);
      }
    }

    // Update the byTags list
    byTags[tagName][tagValue] = survivors;
  }
};
260
/**
 * Returns a flat list of all server instances in the internal state:
 * the master first (if there is one), then all secondaries, arbiters and
 * passives, in that order.
 *
 * @ignore
 */
ReplSet.prototype.allServerInstances = function() {
  var state = this._state;
  // Start with the master when one is known
  var collected = state.master != null ? [state.master] : [];

  // Append the members of every remaining group in a fixed order
  ['secondaries', 'arbiters', 'passives'].forEach(function(group) {
    var members = state[group];
    Object.keys(members).forEach(function(name) {
      collected.push(members[name]);
    });
  });

  // Return complete list of all servers
  return collected;
};
293
/**
 * Ensures no callback is left hanging when we have an error: for every key
 * currently in `dbInstance._callBackStore._notReplied`, the entry is deleted
 * and the key is emitted on the callback store with `error` as the argument.
 *
 * @ignore
 */
var __executeAllCallbacksWithError = function(dbInstance, error) {
  // Snapshot the pending ids first; entries added while emitting are not visited
  var pendingIds = Object.keys(dbInstance._callBackStore._notReplied);

  pendingIds.forEach(function(id) {
    // Drop the bookkeeping entry before notifying the listener
    delete dbInstance._callBackStore._notReplied[id];
    dbInstance._callBackStore.emit(id, error);
  });
};
308
309 /**
310  * @ignore
311  */
312 ReplSet.prototype.connect = function(parent, options, callback) {
313   var self = this;
314   var dateStamp = new Date().getTime();
315   if('function' === typeof options) callback = options, options = {};  
316   if(options == null) options = {};
317   if(!('function' === typeof callback)) callback = null;
318
319   // Keep reference to parent
320   this.db = parent;
321   // Set server state to connecting
322   this._serverState = 'connecting';
323   // Reference to the instance
324   var replSetSelf = this;
325   var serverConnections = this.servers;
326   // Ensure parent can do a slave query if it's set
327   parent.slaveOk = this.slaveOk ? this.slaveOk : parent.slaveOk;
328   // Number of total servers that need to initialized (known servers)
329   this._numberOfServersLeftToInitialize = serverConnections.length;
330
331   // Clean up state
332   replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]};
333
334   // Create a connection handler
335   // self.connectionHandler = null != self.connectionHandler ? self.connectionHandler : function(instanceServer) {
336   self.connectionHandler = function(instanceServer) {
337     return function(err, result) {
338       // Remove a server from the list of intialized servers we need to perform
339       self._numberOfServersLeftToInitialize = self._numberOfServersLeftToInitialize - 1;
340
341       if(err != null) {
342         self._state.errors[instanceServer.name] = instanceServer;
343       }
344             
345       // Add enable query information
346       instanceServer.enableRecordQueryStats(replSetSelf.recordQueryStats);
347       
348       if(err == null && result.documents[0].hosts != null) {
349         // Fetch the isMaster command result
350         var document = result.documents[0];
351         // Break out the results
352         var setName = document.setName;
353         var isMaster = document.ismaster;
354         var secondary = document.secondary;
355         var passive = document.passive;
356         var arbiterOnly = document.arbiterOnly;
357         var hosts = Array.isArray(document.hosts) ? document.hosts : [];
358         var arbiters = Array.isArray(document.arbiters) ? document.arbiters : [];
359         var passives = Array.isArray(document.passives) ? document.passives : [];
360         var tags = document.tags ? document.tags : {};
361         var primary = document.primary;
362         
363         // Ensure we are keying on the same name for lookups as mongodb might return 
364         // dns name and the driver is using ip's                
365         // Rename the connection so we are keying on the name used by mongod
366         var userProvidedServerString = instanceServer.host + ":" + instanceServer.port;                        
367         var me = document.me || userProvidedServerString;
368         
369         // If we have user provided entries already, switch them to avoid additional
370         // open connections
371         if(replSetSelf._state['addresses'][userProvidedServerString]) {
372           // Fetch server
373           var server = replSetSelf._state['addresses'][userProvidedServerString];
374           // Remove entry
375           delete replSetSelf._state['addresses'][userProvidedServerString];
376           // Remove other entries
377           if(replSetSelf._state['secondaries'][userProvidedServerString]) {
378             delete replSetSelf._state['secondaries'][userProvidedServerString];
379             replSetSelf._state['secondaries'][me] = server;
380           } else if(replSetSelf._state['passives'][userProvidedServerString]) {
381             delete replSetSelf._state['passives'][userProvidedServerString];
382             replSetSelf._state['passives'][me] = server;
383           } else if(replSetSelf._state['arbiters'][userProvidedServerString]) {
384             delete replSetSelf._state['arbiters'][userProvidedServerString];
385             replSetSelf._state['arbiters'][me] = server;
386           }  
387           
388           // Set name of the server
389           server.name = me;
390           // Add the existing one to the replicaset list of addresses
391           replSetSelf._state['addresses'][me] = server;          
392         } else {
393           instanceServer.name = me;
394         }
395         
396         // Only add server to our internal list if it's a master, secondary or arbiter
397         if(isMaster == true || secondary == true || arbiterOnly == true) {
398           // Handle a closed connection
399           replSetSelf.closeHandler = function(err, server) {
400             var closeServers = function() {
401               // Set the state to disconnected
402               parent._state = 'disconnected';
403               // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply
404               if(replSetSelf._serverState == 'connected') {
405                 // Close the replicaset
406                 replSetSelf.close(function() {
407                   __executeAllCallbacksWithError(parent, err);
408                   // Ensure single callback only
409                   if(callback != null) {
410                     // Single callback only
411                     var internalCallback = callback;
412                     callback = null;
413                     // Return the error
414                     internalCallback(err, null);
415                   } else {
416                     // If the parent has listeners trigger an event
417                     if(parent.listeners("close").length > 0) {
418                       parent.emit("close", err);
419                     }
420                   }
421                 });
422               }
423             }            
424             
425             // Check if this is the primary server, then disconnect otherwise keep going
426             if(replSetSelf._state.master != null) {
427               var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
428               // var errorServerAddress = server.host + ":" +  server.port;
429               var errorServerAddress = server.name;
430             
431               // Only shut down the set if we have a primary server error
432               if(primaryAddress == errorServerAddress) {
433                 closeServers();
434               } else {
435                 // Remove from the list of servers
436                 delete replSetSelf._state.addresses[errorServerAddress];
437                 // Locate one of the lists and remove
438                 if(replSetSelf._state.secondaries[errorServerAddress] != null) {
439                   delete replSetSelf._state.secondaries[errorServerAddress];
440                 } else if(replSetSelf._state.arbiters[errorServerAddress] != null) {
441                   delete replSetSelf._state.arbiters[errorServerAddress];
442                 } else if(replSetSelf._state.passives[errorServerAddress] != null) {
443                   delete replSetSelf._state.passives[errorServerAddress];
444                 }  
445                 
446                 // Check if we are reading from Secondary only
447                 if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
448                   closeServers();
449                 }              
450               }
451             } else {
452               closeServers();
453             }
454           }
455
456           // Handle a connection timeout
457           replSetSelf.timeoutHandler = function(err, server) {
458             var closeServers = function() {
459               // Set the state to disconnected
460               parent._state = 'disconnected';
461               // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply
462               if(replSetSelf._serverState == 'connected') {
463                 // Close the replicaset
464                 replSetSelf.close(function() {
465                   __executeAllCallbacksWithError(parent, err);
466                   // Ensure single callback only
467                   if(callback != null) {
468                     // Single callback only
469                     var internalCallback = callback;
470                     callback = null;
471                     // Return the error
472                     internalCallback(new Error("connection timed out"), null);
473                   } else {
474                     // If the parent has listeners trigger an event
475                     if(parent.listeners("error").length > 0) {
476                       parent.emit("timeout", new Error("connection timed out"));
477                     }
478                   }
479                 });
480               }
481             }            
482             
483             // Check if this is the primary server, then disconnect otherwise keep going
484             if(replSetSelf._state.master != null) {
485               var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
486               var errorServerAddress = server.name;
487             
488               // Only shut down the set if we have a primary server error
489               if(primaryAddress == errorServerAddress) {
490                 closeServers();
491               } else {
492                 // Remove from the list of servers
493                 delete replSetSelf._state.addresses[errorServerAddress];
494                 // Locate one of the lists and remove
495                 if(replSetSelf._state.secondaries[errorServerAddress] != null) {
496                   delete replSetSelf._state.secondaries[errorServerAddress];
497                 } else if(replSetSelf._state.arbiters[errorServerAddress] != null) {
498                   delete replSetSelf._state.arbiters[errorServerAddress];
499                 } else if(replSetSelf._state.passives[errorServerAddress] != null) {
500                   delete replSetSelf._state.passives[errorServerAddress];
501                 }  
502                 
503                 // Check if we are reading from Secondary only
504                 if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
505                   closeServers();
506                 }              
507               }
508             } else {
509               closeServers();
510             }
511           }
512
513           // Handle an error
514           replSetSelf.errorHandler = function(err, server) {
515             var closeServers = function() {
516              // Set the state to disconnected
517               parent._state = 'disconnected';
518               // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply
519               if(replSetSelf._serverState == 'connected') {
520                 // Close the replicaset
521                 replSetSelf.close(function() {
522                   __executeAllCallbacksWithError(parent, err);
523                   // Ensure single callback only
524                   if(callback != null) {
525                     // Single callback only
526                     var internalCallback = callback;
527                     callback = null;
528                     // Return the error
529                     internalCallback(err, null);
530                   } else {
531                     // If the parent has listeners trigger an event
532                     if(parent.listeners("error").length > 0) {
533                       parent.emit("error", err);
534                     }
535                   }
536                 });
537               }
538             }            
539             
540             // Check if this is the primary server, then disconnect otherwise keep going
541             if(replSetSelf._state.master != null) {
542               var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
543               var errorServerAddress = server.name;
544               // var errorServerAddress = server.host + ":" + server.port;
545             
546               // Only shut down the set if we have a primary server error
547               if(primaryAddress == errorServerAddress) {
548                 closeServers();
549               } else {
550                 // Remove from the list of servers
551                 delete replSetSelf._state.addresses[errorServerAddress];
552                 // Locate one of the lists and remove
553                 if(replSetSelf._state.secondaries[errorServerAddress] != null) {
554                   delete replSetSelf._state.secondaries[errorServerAddress];
555                 } else if(replSetSelf._state.arbiters[errorServerAddress] != null) {
556                   delete replSetSelf._state.arbiters[errorServerAddress];
557                 } else if(replSetSelf._state.passives[errorServerAddress] != null) {
558                   delete replSetSelf._state.passives[errorServerAddress];
559                 }  
560                 
561                 // Check if we are reading from Secondary only
562                 if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
563                   closeServers();
564                 }              
565               }
566             } else {
567               closeServers();
568             }
569           }
570           
571           // Ensure we don't have duplicate handlers
572           instanceServer.removeAllListeners("close");
573           instanceServer.removeAllListeners("error");
574           instanceServer.removeAllListeners("timeout");
575
576           // Add error handler to the instance of the server
577           instanceServer.on("close", replSetSelf.closeHandler);
578           // Add error handler to the instance of the server
579           instanceServer.on("error", replSetSelf.errorHandler);
580           // instanceServer.on("timeout", errorHandler);
581           instanceServer.on("timeout", replSetSelf.timeoutHandler);
582           // Add tag info
583           instanceServer.tags = tags;
584
585           // For each tag in tags let's add the instance Server to the list for that tag
586           if(tags != null && typeof tags === 'object') {
587             var tagKeys = Object.keys(tags);
588             // For each tag file in the server add it to byTags
589             for(var i = 0; i < tagKeys.length; i++) {
590               var value = tags[tagKeys[i]];
591               // Check if we have a top level tag object
592               if(replSetSelf._state.byTags[tagKeys[i]] == null) replSetSelf._state.byTags[tagKeys[i]] = {};
593               // For the value check if we have an array of server instances
594               if(!Array.isArray(replSetSelf._state.byTags[tagKeys[i]][value])) replSetSelf._state.byTags[tagKeys[i]][value] = [];
595               // Check that the instance is not already registered there
596               var valueArray = replSetSelf._state.byTags[tagKeys[i]][value];            
597               var found = false;
598
599               // Iterate over all values
600               for(var j = 0; j < valueArray.length; j++) {
601                 if(valueArray[j].host == instanceServer.host && valueArray[j].port == instanceServer.port) {
602                   found = true;
603                   break;
604                 }
605               }
606
607               // If it was not found push the instance server to the list
608               if(!found) valueArray.push(instanceServer);
609             }
610           }
611
612           // Remove from error list
613           delete replSetSelf._state.errors[me];
614
615           // Add our server to the list of finished servers
616           replSetSelf._state.addresses[me] = instanceServer;
617
618           // Assign the set name
619           if(replSetSelf.replicaSet == null) {
620             replSetSelf._state.setName = setName;          
621           } else if(replSetSelf.replicaSet != setName && replSetSelf._serverState != 'disconnected') {
622             replSetSelf._state.errorMessages.push(new Error("configured mongodb replicaset does not match provided replicaset [" + setName + "] != [" + replSetSelf.replicaSet + "]"));
623             // Set done
624             replSetSelf._serverState = 'disconnected';
625             // ensure no callbacks get called twice
626             var internalCallback = callback;
627             callback = null;
628             // Return error message ignoring rest of calls
629             return internalCallback(replSetSelf._state.errorMessages[0], parent);
630           }
631           
632           // Let's add the server to our list of server types
633           if(secondary == true && (passive == false || passive == null)) {
634             replSetSelf._state.secondaries[me] = instanceServer;
635           } else if(arbiterOnly == true) {
636             replSetSelf._state.arbiters[me] = instanceServer;
637           } else if(secondary == true && passive == true) {
638             replSetSelf._state.passives[me] = instanceServer;
639           } else if(isMaster == true) {
640             replSetSelf._state.master = instanceServer;
641           } else if(isMaster == false && primary != null && replSetSelf._state.addresses[primary]) {
642             replSetSelf._state.master = replSetSelf._state.addresses[primary];
643           }
644
645           // Let's go throught all the "possible" servers in the replicaset
646           var candidateServers = hosts.concat(arbiters).concat(passives);        
647
648           // If we have new servers let's add them
649           for(var i = 0; i < candidateServers.length; i++) {
650             // Fetch the server string
651             var candidateServerString = candidateServers[i];        
652             // Add the server if it's not defined and not already errored out
653             if(null == replSetSelf._state.addresses[candidateServerString]
654               && null == replSetSelf._state.errors[candidateServerString]) {            
655               // Split the server string
656               var parts = candidateServerString.split(/:/);
657               if(parts.length == 1) {
658                 parts = [parts[0], Connection.DEFAULT_PORT];
659               }
660
661               // Default empty socket options object
662               var socketOptions = {};
663               // If a socket option object exists clone it
664               if(replSetSelf.socketOptions != null) {
665                 var keys = Object.keys(replSetSelf.socketOptions);
666                 for(var i = 0; i < keys.length;i++) socketOptions[keys[i]] = replSetSelf.socketOptions[keys[i]];
667               }
668
669               // Add host information to socket options
670               socketOptions['host'] = parts[0];
671               socketOptions['port'] = parseInt(parts[1]);
672
673               // Create a new server instance
674               var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions
675                               , logger:replSetSelf.logger, ssl:replSetSelf.ssl, poolSize:replSetSelf.poolSize});
676               // Set the replicaset instance
677               newServer.replicasetInstance = replSetSelf;              
678
679               // Add handlers
680               newServer.on("close", replSetSelf.closeHandler);
681               newServer.on("timeout", replSetSelf.timeoutHandler);
682               newServer.on("error", replSetSelf.errorHandler);
683               
684               // Add server to list, ensuring we don't get a cascade of request to the same server
685               replSetSelf._state.addresses[candidateServerString] = newServer;
686
687               // Add a new server to the total number of servers that need to initialized before we are done
688               self._numberOfServersLeftToInitialize = self._numberOfServersLeftToInitialize + 1;
689
690               // Let's set up a new server instance
691               newServer.connect(parent, {returnIsMasterResults: true, eventReceiver:newServer}, self.connectionHandler(newServer));
692             }
693           }          
694         } else {
695           // Remove the instance from out list of servers
696           delete replSetSelf._state.addresses[me];
697         }
698       } else {
699         delete replSetSelf._state.addresses[instanceServer.host + ":" + instanceServer.port];
700       }
701       
702       // If done finish up
703       if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._serverState === 'connecting' && replSetSelf._state.errorMessages.length == 0) {
704         // Set db as connected
705         replSetSelf._serverState = 'connected';
706         // If we don't expect a master let's call back, otherwise we need a master before
707         // the connection is successful
708         if(replSetSelf.masterNotNeeded || replSetSelf._state.master != null) {
709           // If we have a read strategy boot it
710           if(replSetSelf.strategyInstance != null) {
711             // Ensure we have a proper replicaset defined
712             replSetSelf.strategyInstance.replicaset = replSetSelf;
713             // Start strategy
714             replSetSelf.strategyInstance.start(function(err) {
715               // ensure no callbacks get called twice
716               var internalCallback = callback;
717               callback = null;
718               // Start up ha
719               if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
720                 replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
721               }
722               // Perform callback
723               internalCallback(null, parent);
724             })
725           } else {
726             // ensure no callbacks get called twice
727             var internalCallback = callback;
728             callback = null;
729             // Start up ha
730             if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
731               replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
732             }
733             // Perform callback
734             internalCallback(null, parent);
735           }
736         } else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length > 0) {
737           // If we have a read strategy boot it
738           if(replSetSelf.strategyInstance != null) {
739             // Ensure we have a proper replicaset defined
740             replSetSelf.strategyInstance.replicaset = replSetSelf;
741             // Start strategy
742             replSetSelf.strategyInstance.start(function(err) {
743               // ensure no callbacks get called twice
744               var internalCallback = callback;
745               callback = null;
746               // Start up ha
747               if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
748                 replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
749               }
750               // Perform callback
751               internalCallback(null, parent);
752             })
753           } else {
754             // ensure no callbacks get called twice
755             var internalCallback = callback;
756             callback = null;
757             // Start up ha
758             if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
759               replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
760             }
761             // Perform callback
762             internalCallback(null, parent);
763           }
764         } else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) {          
765           replSetSelf._serverState = 'disconnected';
766           // ensure no callbacks get called twice
767           var internalCallback = callback;
768           callback = null;
769           // Force close all server instances
770           replSetSelf.close();
771           // Perform callback
772           internalCallback(new Error("no secondary server found"), null);
773         } else if(typeof callback === 'function') {
774           replSetSelf._serverState = 'disconnected';
775           // ensure no callbacks get called twice
776           var internalCallback = callback;
777           callback = null;
778           // Force close all server instances
779           replSetSelf.close();
780           // Perform callback
781           internalCallback(new Error("no primary server found"), null);
782         }          
783       } else if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._state.errorMessages.length > 0 && replSetSelf._serverState != 'disconnected') {
784         // Set done
785         replSetSelf._serverState = 'disconnected';
786         // ensure no callbacks get called twice
787         var internalCallback = callback;
788         callback = null;
789         // Force close all server instances
790         replSetSelf.close();
791         // Callback to signal we are done
792         internalCallback(replSetSelf._state.errorMessages[0], null);          
793       }
794     }
795   }
796   
797   // Ensure we have all registered servers in our set
798   for(var i = 0; i < serverConnections.length; i++) {
799     replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port] = serverConnections[i];
800   }
801
802   // Initialize all the connections
803   for(var i = 0; i < serverConnections.length; i++) {    
804     // Set up the logger for the server connection
805     serverConnections[i].logger = replSetSelf.logger;
806     // Default empty socket options object
807     var socketOptions = {};
808     // If a socket option object exists clone it
809     if(this.socketOptions != null && typeof this.socketOptions === 'object') {
810       var keys = Object.keys(this.socketOptions);
811       for(var j = 0; j < keys.length;j++) socketOptions[keys[j]] = this.socketOptions[keys[j]];
812     }
813
814     // If ssl is specified
815     if(replSetSelf.ssl) serverConnections[i].ssl = true;
816
817     // Add host information to socket options
818     socketOptions['host'] = serverConnections[i].host;
819     socketOptions['port'] = serverConnections[i].port;
820
821     // Set the socket options
822     serverConnections[i].socketOptions = socketOptions;
823     // Set the replicaset instance
824     serverConnections[i].replicasetInstance = replSetSelf;
825     // Connect to server
826     serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, self.connectionHandler(serverConnections[i]));
827   }      
828
829   // The checking function
830   this.replicasetCheckFunction = function() {
831     try {
832       // Retrieve a reader connection
833       var con = self.checkoutReader();
834       // If we have a connection and we have a db object
835       if(con != null && Array.isArray(self.dbInstances) && self.dbInstances.length > 0) {
836         var dbInstance = self.dbInstances[0];
837         dbInstance.admin().command({replSetGetStatus:1}, {connection:con}, function(err, result) {          
838           // Paranoid android
839           if(null == err && null != result && null != result["documents"] && result["documents"].length > 0) {
840             // For each member we need to check if we have a new connection that needs to be established
841             var members = result['documents'][0]['members'];
842             
843             if(null != members) {
844               // The total members we check
845               var newServers = 0;
846               // Iterate over all existing members
847               for(var i = 0, jlen = members.length; i < jlen; i++) {
848                 // Get a member
849                 var member = members[i];
850                 // If the node is healthy and it does not exist in the current replicaset, add it to the
851                 // current setup
852                 if(null != self._state && 0 != member['health'] && null == self._state['addresses'][member['name']]) {                  
853                   // We need to add a server to the connection, this means going through the notions of establishing
854                   // A completely new connection
855                   // Found a new server
856                   newServers = newServers + 1;
857
858                   // Split the server string
859                   var parts = member.name.split(/:/);
860                   if(parts.length == 1) {
861                     parts = [parts[0], Connection.DEFAULT_PORT];
862                   }
863                   
864                   // Default empty socket options object
865                   var socketOptions = {};
866                   // If a socket option object exists clone it
867                   if(self.socketOptions != null) {
868                     var keys = Object.keys(self.socketOptions);
869                     for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = self.socketOptions[keys[i]];
870                   }
871                   
872                   // Add host information to socket options
873                   socketOptions['host'] = parts[0];
874                   socketOptions['port'] = parseInt(parts[1]);
875                   
876                   // Create a new server instance
877                   var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions
878                                   , logger:self.logger, ssl:self.ssl, poolSize:self.poolSize});
879                   // Set the replicaset instance
880                   newServer.replicasetInstance = self;              
881                   
882                   // Add handlers
883                   newServer.on("close", self.closeHandler);
884                   newServer.on("timeout", self.timeoutHandler);
885                   newServer.on("error", self.errorHandler);
886                   
887                   // Add a new server to the total number of servers that need to initialized before we are done
888                   var newServerCallback = self.connectionHandler(newServer);
889                   
890                   // Let's set up a new server instance
891                   newServer.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result) {
892                     // Remove from number of newServers
893                     newServers = newServers - 1;
894                     // Call the setup
895                     newServerCallback(err, result);
896                     // If we have 0 new servers let's go back to rechecking
897                     if(newServers <= 0) {
898                       setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);                      
899                     }
900                   });
901                 }
902               }
903               
904               // If we have no new servers check status again
905               if(newServers == 0) {
906                 setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
907               }
908             }            
909           }
910         });        
911       }      
912     } catch(err) {
913       setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
914     }
915   };    
916 }
917
/**
 * Checks out a write connection from the current primary.
 *
 * @return {Object} the connection returned by the primary's checkoutWriter, or null when no primary is known.
 * @ignore
 */
ReplSet.prototype.checkoutWriter = function() {
  // Without a primary there is nothing to write to
  if(this._state.master == null) return null;
  // Delegate to the primary server
  return this._state.master.checkoutWriter();
};
927
/**
 * Checks out a connection to read from.
 *
 * Selection order:
 *  1. If secondary reads are requested (readSecondary, READ_SECONDARY or
 *     READ_SECONDARY_ONLY) and at least one secondary is known, use the
 *     strategy instance when present, otherwise rotate through the secondaries.
 *  2. READ_SECONDARY_ONLY without any secondary yields null.
 *  3. An object read preference is treated as a set of tags; the first tag with
 *     registered servers provides a randomly picked server, falling back to
 *     the writer connection when no tag matches.
 *  4. Everything else reads from the writer connection.
 *
 * @return {Object} the checked out connection or null.
 * @ignore
 */
ReplSet.prototype.checkoutReader = function() {
  var preference = this._readPreference;
  var secondaryRequested = this.readSecondary == true
    || preference == Server.READ_SECONDARY
    || preference == Server.READ_SECONDARY_ONLY;

  // Secondary reads with at least one secondary available
  if(secondaryRequested && Object.keys(this._state.secondaries).length > 0) {
    // A strategy instance decides on its own which secondary to use
    if(this.strategyInstance != null) {
      return this.strategyInstance.checkoutSecondary();
    }

    // No strategy, rotate through the secondaries
    var secondaryNames = Object.keys(this._state.secondaries);
    this._currentServerChoice = this._currentServerChoice % secondaryNames.length;
    var chosenName = secondaryNames[this._currentServerChoice++];
    return this._state.secondaries[chosenName].checkoutReader();
  }

  // Secondary only, but there is no secondary to read from
  if(preference == Server.READ_SECONDARY_ONLY && Object.keys(this._state.secondaries).length == 0) {
    return null;
  }

  // Tag based read preference
  if(preference != null && typeof preference === 'object') {
    var tagNames = Object.keys(preference);
    var taggedServer = null;

    // Use the first tag that has servers registered for the requested value
    for(var t = 0; t < tagNames.length; t++) {
      var tagName = tagNames[t];
      var tagValue = preference[tagName];
      var serversForTag = this._state.byTags[tagName];

      if(serversForTag != null
        && serversForTag[tagValue] != null
        && Array.isArray(serversForTag[tagValue])
        && serversForTag[tagValue].length > 0) {
        // Random pick among the servers carrying the tag
        var candidates = serversForTag[tagValue];
        taggedServer = candidates[Math.floor(Math.random() * candidates.length)];
        break;
      }
    }

    // Fall back to the writer when no tagged server was found
    if(taggedServer != null) return taggedServer.checkoutReader();
    return this.checkoutWriter();
  }

  // Default, read from the writer connection
  return this.checkoutWriter();
};
980
/**
 * Builds a flat list of all raw connections: the primary's pool connections
 * first (when a primary is known) followed, if secondary reads are enabled,
 * by the pool connections of every secondary.
 *
 * @return {Array} all raw connections, empty when there is nothing to report.
 * @ignore
 */
ReplSet.prototype.allRawConnections = function() {
  var allConnections = [];

  // The primary can be missing (no primary elected yet, secondary only setups),
  // in which case it simply contributes no connections
  var master = this._state.master;
  if(master != null) {
    allConnections = allConnections.concat(master.connectionPool.getAllConnections());
  }

  // If we have read secondary let's add all secondary servers
  if(this.readSecondary) {
    var keys = Object.keys(this._state.secondaries);
    // For each of the secondaries grab the connections
    for(var i = 0; i < keys.length; i++) {
      var secondaryPoolConnections = this._state.secondaries[keys[i]].connectionPool.getAllConnections();
      allConnections = allConnections.concat(secondaryPoolConnections);
    }
  }

  // Return all the connections
  return allConnections;
};
1008
/**
 * Stores the record query stats flag on the replicaset and forwards it to the
 * server instances: the servers registered in `_state.addresses` when that
 * map exists, otherwise the servers in the `servers` array (if it is an array).
 *
 * @param {Boolean} enable the flag value to set and forward.
 * @ignore
 */
ReplSet.prototype.enableRecordQueryStats = function(enable) {
  // Remember the setting on the replicaset itself
  this.recordQueryStats = enable;

  var hasAddresses = this._state != null && this._state.addresses != null;

  if(hasAddresses) {
    // Forward to every server known by address
    var addresses = this._state.addresses;
    Object.keys(addresses).forEach(function(address) {
      addresses[address].enableRecordQueryStats(enable);
    });
  } else if(Array.isArray(this.servers)) {
    // No address map, fall back to the servers array
    this.servers.forEach(function(server) {
      server.enableRecordQueryStats(enable);
    });
  }
};
1029
/**
 * Alias for close, the callback is handed over unchanged.
 *
 * @param {Function} [callback] passed on to close.
 * @ignore
 */
ReplSet.prototype.disconnect = function(callback) {
  var replSet = this;
  // Closing does all the work
  replSet.close(callback);
};
1036
/**
 * Closes every server registered in `_state.addresses`.
 *
 * Marks the replicaset as disconnected and removes all listeners registered
 * on it. Connected servers are closed, servers that are not connected are
 * only counted as done. The internal state is reset from the close callback
 * of a connected server once the last outstanding server is accounted for;
 * servers that are not connected never trigger the reset.
 *
 * @param {Function} [callback] called with (null, null) when there are no
 *   servers at all, otherwise with (null) once every server is accounted for.
 * @ignore
 */
ReplSet.prototype.close = function(callback) {
  var self = this;
  var hasCallback = typeof callback === 'function';

  // Set server status as disconnected
  this._serverState = 'disconnected';

  // Collect all the server instances
  var servers = [];
  if(this._state['addresses'] != null) {
    var addresses = this._state.addresses;
    servers = Object.keys(addresses).map(function(address) {
      return addresses[address];
    });
  }

  // Servers we are still waiting on
  var pending = servers.length;

  // Remove all the listeners
  self.removeAllListeners();

  // Special case where there are no servers
  if(servers.length == 0 && hasCallback) return callback(null, null);

  // Close the servers
  servers.forEach(function(server) {
    if(!server.isConnected()) {
      // Nothing to close, just account for the server
      pending = pending - 1;
      if(pending == 0 && hasCallback) {
        callback(null);
      }
      return;
    }

    server.close(function() {
      pending = pending - 1;
      if(pending != 0) return;

      // Last server is done, clear out state
      self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]};

      // And perform the call back
      if(hasCallback) {
        callback(null);
      }
    });
  });
};
1090
/**
 * Auto reconnect property, always reported as true for a replicaset.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "autoReconnect", {
  enumerable: true,
  get: function() {
    return true;
  }
});
1100
/**
 * Read preference property. An explicitly set preference wins; without one
 * the value is derived from the readSecondary flag.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "readPreference", {
  enumerable: true,
  get: function() {
    // Explicit preference set
    if(this._readPreference != null) return this._readPreference;
    // Derive from the read secondary flag
    return this.readSecondary ? Server.READ_SECONDARY : Server.READ_PRIMARY;
  }
});
1116
/**
 * Db instances property, taken from the first server returned by
 * allServerInstances. Yields an empty array when no server instance is
 * available, so reading the property never throws.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "dbInstances", {enumerable:true
  , get: function() {
    var servers = this.allServerInstances();
    // No server known (yet or anymore), so there are no db instances either
    if(servers == null || servers.length == 0) return [];
    return servers[0].dbInstances;
  }
});
1127
/**
 * Host of the current primary, kept for compatibility with server.js.
 * Undefined when there is no primary.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "host", {
  enumerable: true,
  get: function() {
    var primary = this.primary;
    return primary != null ? primary.host : undefined;
  }
});
1137
/**
 * Port of the current primary, kept for compatibility with server.js.
 * Undefined when there is no primary.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "port", {
  enumerable: true,
  get: function() {
    var primary = this.primary;
    return primary != null ? primary.port : undefined;
  }
});
1147
/**
 * Read property: the first entry of the secondaries list, or null when the
 * list is empty.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "read", {
  enumerable: true,
  get: function() {
    var secondaries = this.secondaries;
    if(secondaries.length > 0) return secondaries[0];
    return null;
  }
});
1157
/**
 * List of secondaries, the values of `_state.secondaries` as a new array.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "secondaries", {
  enumerable: true,
  get: function() {
    var secondariesByName = this._state.secondaries;
    // Convert the secondaries map to an array
    return Object.keys(secondariesByName).map(function(name) {
      return secondariesByName[name];
    });
  }
});
1173
/**
 * List of all secondaries including passives: the secondaries list followed
 * by the passives list, as a new array.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "allSecondaries", {
  enumerable: true,
  get: function() {
    var secondaries = this.secondaries;
    var passives = this.passives;
    return secondaries.concat(passives);
  }
});
1183
/**
 * List of arbiters, the values of `_state.arbiters` as a new array.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "arbiters", {
  enumerable: true,
  get: function() {
    var arbitersByName = this._state.arbiters;
    // Convert the arbiters map to an array
    return Object.keys(arbitersByName).map(function(name) {
      return arbitersByName[name];
    });
  }
});
1199
/**
 * List of passives, the values of `_state.passives` as a new array.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "passives", {
  enumerable: true,
  get: function() {
    var passivesByName = this._state.passives;
    // Convert the passives map to an array
    return Object.keys(passivesByName).map(function(name) {
      return passivesByName[name];
    });
  }
});
1215
/**
 * Master connection property: `_state.master`, or null when the replicaset
 * has no state object.
 * @ignore
 */
Object.defineProperty(ReplSet.prototype, "primary", {
  enumerable: true,
  get: function() {
    if(this._state == null) return null;
    return this._state.master;
  }
});
1225
/**
 * @ignore
 */
// Backward compatibility: expose the ReplSet constructor under the
// ReplSetServers export name as well
exports.ReplSetServers = ReplSet;