Patches for Berkeley DB version 4.6.21

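  1. [#15692] Fixes a race condition between a checkpoint and DB->close that can cause the checkpoint thread to self-deadlock: the checkpoint thread could try to re-acquire an mpool file mutex it already holds while closing the file.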
  2. To fix item 1, apply the following patch to the db-4.6.21 release.
    *** dbinc/mp.h	2007-09-28 01:28:25.000000000 +1000
    --- dbinc/mp.h	2008-02-14 01:22:09.000000000 +1100
    ***************
    *** 639,644 ****
    --- 639,647 ----
       */
      #define	MP_TRUNC_RECOVER	0x01
      
    + /* Private flags to DB_MPOOLFILE->close. */
    + #define	DB_MPOOL_NOLOCK		0x002	/* Already have mpf locked. */
    + 
      #if defined(__cplusplus)
      }
      #endif
    *** mp/mp_fopen.c	2007-05-18 03:18:01.000000000 +1000
    --- mp/mp_fopen.c	2008-02-12 16:09:42.000000000 +1100
    ***************
    *** 888,894 ****
      	 * when we try to flush them.
      	 */
      	deleted = 0;
    ! 	MUTEX_LOCK(dbenv, mfp->mutex);
      	if (F_ISSET(dbmfp, MP_MULTIVERSION))
      		--mfp->multiversion;
      	if (--mfp->mpf_cnt == 0 || LF_ISSET(DB_MPOOL_DISCARD)) {
    --- 888,895 ----
      	 * when we try to flush them.
      	 */
      	deleted = 0;
    ! 	if (!LF_ISSET(DB_MPOOL_NOLOCK))
    ! 		MUTEX_LOCK(dbenv, mfp->mutex);
      	if (F_ISSET(dbmfp, MP_MULTIVERSION))
      		--mfp->multiversion;
      	if (--mfp->mpf_cnt == 0 || LF_ISSET(DB_MPOOL_DISCARD)) {
    ***************
    *** 909,921 ****
      			}
      		}
      		if (mfp->block_cnt == 0) {
      			if ((t_ret =
      			    __memp_mf_discard(dbmp, mfp)) != 0 && ret == 0)
      				ret = t_ret;
      			deleted = 1;
      		}
      	}
    ! 	if (!deleted)
      		MUTEX_UNLOCK(dbenv, mfp->mutex);
      
      done:	/* Discard the DB_MPOOLFILE structure. */
    --- 910,928 ----
      			}
      		}
      		if (mfp->block_cnt == 0) {
    + 			/*
    + 			 * We should never discard this mp file if our caller
    + 			 * is holding the lock on it.  See comment in
    + 			 * __memp_sync_file.
    + 			 */
    + 			DB_ASSERT(dbenv, !LF_ISSET(DB_MPOOL_NOLOCK));
      			if ((t_ret =
      			    __memp_mf_discard(dbmp, mfp)) != 0 && ret == 0)
      				ret = t_ret;
      			deleted = 1;
      		}
      	}
    ! 	if (!deleted && !LF_ISSET(DB_MPOOL_NOLOCK))
      		MUTEX_UNLOCK(dbenv, mfp->mutex);
      
      done:	/* Discard the DB_MPOOLFILE structure. */
    *** mp/mp_sync.c	2007-06-02 04:32:44.000000000 +1000
    --- mp/mp_sync.c	2008-02-12 16:09:42.000000000 +1100
    ***************
    *** 755,761 ****
      	 * This is important since we are called with the hash bucket
      	 * locked.  The mfp will get freed via the cleanup pass.
      	 */
    ! 	if (dbmfp != NULL && (t_ret = __memp_fclose(dbmfp, 0)) != 0 && ret == 0)
      		ret = t_ret;
      
      	--mfp->mpf_cnt;
    --- 755,762 ----
      	 * This is important since we are called with the hash bucket
      	 * locked.  The mfp will get freed via the cleanup pass.
      	 */
    ! 	if (dbmfp != NULL &&
    ! 	    (t_ret = __memp_fclose(dbmfp, DB_MPOOL_NOLOCK)) != 0 && ret == 0)
      		ret = t_ret;
      
      	--mfp->mpf_cnt;
    
    

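  3. [#16178] Fixes a miscalculation of the first mutex assigned to each cache region after the first. Each hash bucket uses two mutexes, but only one per bucket was skipped for earlier regions, so regions could end up sharing mutexes. This issue could cause applications with multiple cache regions to see undefined behavior in rare cases under load.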
  4. To fix item 3, apply the following patch to the db-4.6.21 release.
    *** mp/mp_region.c	2007-05-18 03:18:01.000000000 +1000
    --- mp/mp_region.c	2008-06-24 13:15:56.000000000 +1000
    ***************
    *** 249,256 ****
      		mtx_base = htab[0].mtx_hash;
      	}
      
      	if (mtx_base != MUTEX_INVALID)
    ! 		mtx_base += reginfo_off * htab_buckets;
      
      	/* Allocate hash table space and initialize it. */
      	if ((ret = __env_alloc(infop,
    --- 249,262 ----
      		mtx_base = htab[0].mtx_hash;
      	}
      
    + 	/*
    + 	 * We preallocated all of the mutexes in a block, so for regions after
    + 	 * the first, we skip mutexes in use in earlier regions.  Each region
    + 	 * has the same number of buckets and there are two mutexes per hash
    + 	 * bucket (the bucket mutex and the I/O mutex).
    + 	 */
      	if (mtx_base != MUTEX_INVALID)
    ! 		mtx_base += reginfo_off * htab_buckets * 2;
      
      	/* Allocate hash table space and initialize it. */
      	if ((ret = __env_alloc(infop,
    

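  5. [#16406] Allows replication clients to open a sequence. On a client, creating or upgrading a durable sequence, or calling DB_SEQUENCE->get on one, still fails with a read-only error.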
  6. To fix item 5, apply the following patch to the db-4.6.21 release.
    *** sequence/sequence.c
    --- sequence/sequence.c
    ***************
    *** 196,202 ****
      	if ((ret = __db_get_flags(dbp, &tflags)) != 0)
      		goto err;
      
    ! 	if (DB_IS_READONLY(dbp)) {
      		ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open");
      		goto err;
      	}
    --- 196,206 ----
      	if ((ret = __db_get_flags(dbp, &tflags)) != 0)
      		goto err;
      
    ! 	/*
    ! 	 * We can let replication clients open sequences, but must
    ! 	 * check later that they do not update them.
    ! 	 */
    ! 	if (F_ISSET(dbp, DB_AM_RDONLY)) {
      		ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open");
      		goto err;
      	}
    ***************
    *** 252,257 ****
    --- 256,266 ----
      		if ((ret != DB_NOTFOUND && ret != DB_KEYEMPTY) ||
      		    !LF_ISSET(DB_CREATE))
      			goto err;
    + 		if (IS_REP_CLIENT(dbenv) &&
    + 		    !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
    + 			ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
    + 			goto err;
    + 		}
      		ret = 0;
      
      		rp = &seq->seq_record;
    ***************
    *** 304,310 ****
      	 */
      	rp = seq->seq_data.data;
      	if (rp->seq_version == DB_SEQUENCE_OLDVER) {
    ! oldver:		rp->seq_version = DB_SEQUENCE_VERSION;
      		if (__db_isbigendian()) {
      			if (IS_DB_AUTO_COMMIT(dbp, txn)) {
      				if ((ret =
    --- 313,324 ----
      	 */
      	rp = seq->seq_data.data;
      	if (rp->seq_version == DB_SEQUENCE_OLDVER) {
    ! oldver:		if (IS_REP_CLIENT(dbenv) &&
    ! 		    !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
    ! 			ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
    ! 			goto err;
    ! 		}
    ! 		rp->seq_version = DB_SEQUENCE_VERSION;
      		if (__db_isbigendian()) {
      			if (IS_DB_AUTO_COMMIT(dbp, txn)) {
      				if ((ret =
    ***************
    *** 713,718 ****
    --- 727,738 ----
      
      	MUTEX_LOCK(dbenv, seq->mtx_seq);
      
    + 	if (handle_check && IS_REP_CLIENT(dbenv) &&
    + 	    !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
    + 		ret = __db_rdonly(dbenv, "DB_SEQUENCE->get");
    + 		goto err;
    + 	}
    + 
      	if (rp->seq_min + delta > rp->seq_max) {
      		__db_errx(dbenv, "Sequence overflow");
      		ret = EINVAL;
    

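  7. [#15788] Fixes a bug that could slow internal initialization under Replication Manager, as evidenced by "queue limit exceeded" messages in verbose replication diagnostic output. With the fix, a message thread sending to a single remote site waits a limited time for that connection's outbound queue to drain, instead of immediately dropping the message.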
  8. To fix item 7, apply the following patch to the db-4.6.21 release.
    *** dbinc/repmgr.h	2007-10-31 10:23:52.000000000 -0700
    --- dbinc/repmgr.h	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 36,41 ****
    --- 36,55 ----
      #endif
      
      /*
    +  * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
    +  * a queue per connection, waiting for TCP buffer space to become available in
    +  * the kernel.  Rather than exceeding this limit, we simply discard additional
    +  * messages (since this is always allowed by the replication protocol).
    +  *    As a special dispensation, if a message is destined for a specific remote
    +  * site (i.e., it's not a broadcast), then we first try blocking the sending
    +  * thread, waiting for space to become available (though we only wait a limited
    +  * time).  This is so as to be able to handle the immediate flood of (a
    +  * potentially large number of) outgoing messages that replication generates, in
    +  * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
    +  */
    + #define	OUT_QUEUE_LIMIT	10
    + 
    + /*
       * The system value is available from sysconf(_SC_HOST_NAME_MAX).
       * Historically, the maximum host name was 256.
       */
    ***************
    *** 47,52 ****
    --- 61,71 ----
      #define	MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
      typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
      
    + /* Default timeout values, in microseconds. */
    + #define	DB_REPMGR_DEFAULT_ACK_TIMEOUT		(1 * US_PER_SEC)
    + #define	DB_REPMGR_DEFAULT_CONNECTION_RETRY	(30 * US_PER_SEC)
    + #define	DB_REPMGR_DEFAULT_ELECTION_RETRY	(10 * US_PER_SEC)
    + 
      struct __repmgr_connection;
          typedef struct __repmgr_connection REPMGR_CONNECTION;
      struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
    ***************
    *** 171,178 ****
      #ifdef DB_WIN32
      	WSAEVENT event_object;
      #endif
    ! #define	CONN_CONNECTING	0x01	/* nonblocking connect in progress */
    ! #define	CONN_DEFUNCT	0x02	/* socket close pending */
      	u_int32_t flags;
      
      	/*
    --- 190,198 ----
      #ifdef DB_WIN32
      	WSAEVENT event_object;
      #endif
    ! #define	CONN_CONGESTED	0x01	/* msg thread wait has exceeded timeout */
    ! #define	CONN_CONNECTING	0x02	/* nonblocking connect in progress */
    ! #define	CONN_DEFUNCT	0x04	/* socket close pending */
      	u_int32_t flags;
      
      	/*
    ***************
    *** 180,189 ****
      	 * send() function's thread.  But if TCP doesn't have enough network
      	 * buffer space for us when we first try it, we instead allocate some
      	 * memory, and copy the message, and then send it as space becomes
    ! 	 * available in our main select() thread.
      	 */
      	OUT_Q_HEADER outbound_queue;
      	int out_queue_length;
      
      	/*
      	 * Input: while we're reading a message, we keep track of what phase
    --- 200,215 ----
      	 * send() function's thread.  But if TCP doesn't have enough network
      	 * buffer space for us when we first try it, we instead allocate some
      	 * memory, and copy the message, and then send it as space becomes
    ! 	 * available in our main select() thread.  In some cases, if the queue
    ! 	 * gets too long we wait until it's drained, and then append to it.
    ! 	 * This condition variable's associated mutex is the normal per-repmgr
    ! 	 * db_rep->mutex, because that mutex is always held anyway whenever the
    ! 	 * output queue is consulted.
      	 */
      	OUT_Q_HEADER outbound_queue;
      	int out_queue_length;
    + 	cond_var_t drained;
    + 	int blockers;		/* ref count of msg threads waiting on us */
      
      	/*
      	 * Input: while we're reading a message, we keep track of what phase
    *** dbinc_auto/int_def.in	2007-10-31 10:23:52.000000000 -0700
    --- dbinc_auto/int_def.in	2007-10-31 10:23:52.000000000 -0700
    ***************
    *** 1420,1425 ****
    --- 1420,1428 ----
      #define	__repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
      #define	__repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
      #define	__repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
    + #define	__repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
    + #define	__repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
    + #define	__repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
      #define	__repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
      #define	__repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
      #define	__repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
    *** dbinc_auto/repmgr_ext.h	2007-10-31 10:23:52.000000000 -0700
    --- dbinc_auto/repmgr_ext.h	2007-10-31 10:23:52.000000000 -0700
    ***************
    *** 21,30 ****
      int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
      void __repmgr_stash_generation __P((DB_ENV *));
      int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
    ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
      int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
    ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
    ! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
      int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
      int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
      int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
    --- 21,30 ----
      int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
      void __repmgr_stash_generation __P((DB_ENV *));
      int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
    ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
      int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
    ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
    ! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
      int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
      int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
      int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
    ***************
    *** 39,44 ****
    --- 39,47 ----
      int __repmgr_wake_waiting_senders __P((DB_ENV *));
      int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
      void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
    + int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
    + int __repmgr_alloc_cond __P((cond_var_t *));
    + int __repmgr_free_cond __P((cond_var_t *));
      int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
      int __repmgr_close_sync __P((DB_ENV *));
      int __repmgr_net_init __P((DB_ENV *, DB_REP *));
    *** repmgr/repmgr_method.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_method.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 196,204 ****
      	int ret;
      
      	/* Set some default values. */
    ! 	db_rep->ack_timeout = 1 * US_PER_SEC;			/*  1 second */
    ! 	db_rep->connection_retry_wait = 30 * US_PER_SEC;	/* 30 seconds */
    ! 	db_rep->election_retry_wait = 10 * US_PER_SEC;		/* 10 seconds */
      	db_rep->config_nsites = 0;
      	db_rep->peer = DB_EID_INVALID;
      	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
    --- 196,204 ----
      	int ret;
      
      	/* Set some default values. */
    ! 	db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
    ! 	db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
    ! 	db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
      	db_rep->config_nsites = 0;
      	db_rep->peer = DB_EID_INVALID;
      	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
    ***************
    *** 238,243 ****
    --- 238,244 ----
      	DB_ENV *dbenv;
      {
      	DB_REP *db_rep;
    + 	REPMGR_CONNECTION *conn;
      	int ret;
      
      	db_rep = dbenv->rep_handle;
    ***************
    *** 254,259 ****
    --- 255,266 ----
      
      	if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
      		goto unlock;
    + 
    + 	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
    + 		if (conn->blockers > 0 &&
    + 		    ((ret = __repmgr_signal(&conn->drained)) != 0))
    + 			goto unlock;
    + 	}
      	UNLOCK_MUTEX(db_rep->mutex);
      
      	return (__repmgr_wake_main_thread(dbenv));
    *** repmgr/repmgr_msg.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_msg.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 183,192 ****
      
      /*
       * Acknowledges a message.
    -  *
    -  * !!!
    -  * Note that this cannot be called from the select() thread, in case we call
    -  * __repmgr_bust_connection(..., FALSE).
       */
      static int
      ack_message(dbenv, generation, lsn)
    --- 183,188 ----
    ***************
    *** 227,235 ****
      		rec2.size = 0;
      
      		conn = site->ref.conn;
      		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
    ! 		    &control2, &rec2)) == DB_REP_UNAVAIL)
    ! 			ret = __repmgr_bust_connection(dbenv, conn, FALSE);
      	}
      
      	UNLOCK_MUTEX(db_rep->mutex);
    --- 223,236 ----
      		rec2.size = 0;
      
      		conn = site->ref.conn;
    + 		/*
    + 		 * It's hard to imagine anyone would care about a lost ack if
    + 		 * the path to the master is so congested as to need blocking;
    + 		 * so pass "blockable" argument as FALSE.
    + 		 */
      		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
    ! 		    &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
    ! 			ret = __repmgr_bust_connection(dbenv, conn);
      	}
      
      	UNLOCK_MUTEX(db_rep->mutex);
    *** repmgr/repmgr_net.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_net.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 63,69 ****
      static void setup_sending_msg
          __P((struct sending_msg *, u_int, const DBT *, const DBT *));
      static int __repmgr_send_internal
    !     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
      static int enqueue_msg
          __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
      static int flatten __P((DB_ENV *, struct sending_msg *));
    --- 63,69 ----
      static void setup_sending_msg
          __P((struct sending_msg *, u_int, const DBT *, const DBT *));
      static int __repmgr_send_internal
    !     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
      static int enqueue_msg
          __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
      static int flatten __P((DB_ENV *, struct sending_msg *));
    ***************
    *** 73,85 ****
       * __repmgr_send --
       *	The send function for DB_ENV->rep_set_transport.
       *
    -  * !!!
    -  * This is only ever called as the replication transport call-back, which means
    -  * it's either on one of our message processing threads or an application
    -  * thread.  It mustn't be called from the select() thread, because we might call
    -  * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
    -  * select() thread.
    -  *
       * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
       * PUBLIC:     const DB_LSN *, int, u_int32_t));
       */
    --- 73,78 ----
    ***************
    *** 126,134 ****
      		}
      
      		conn = site->ref.conn;
      		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
    ! 		    control, rec)) == DB_REP_UNAVAIL &&
    ! 		    (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
      			ret = t_ret;
      		if (ret != 0)
      			goto out;
    --- 119,128 ----
      		}
      
      		conn = site->ref.conn;
    + 		/* Pass the "blockable" argument as TRUE. */
      		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
    ! 		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
    ! 		    (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
      			ret = t_ret;
      		if (ret != 0)
      			goto out;
    ***************
    *** 222,228 ****
      	if (site->state != SITE_CONNECTED)
      		return (NULL);
      
    ! 	if (F_ISSET(site->ref.conn, CONN_CONNECTING))
      		return (NULL);
      	return (site);
      }
    --- 216,222 ----
      	if (site->state != SITE_CONNECTED)
      		return (NULL);
      
    ! 	if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
      		return (NULL);
      	return (site);
      }
    ***************
    *** 235,244 ****
       *
       * !!!
       * Caller must hold dbenv->mutex.
    -  *
    -  * !!!
    -  * Note that this cannot be called from the select() thread, in case we call
    -  * __repmgr_bust_connection(..., FALSE).
       */
      static int
      __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
    --- 229,234 ----
    ***************
    *** 268,281 ****
      		    !IS_VALID_EID(conn->eid))
      			continue;
      
    ! 		if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
      			site = SITE_FROM_EID(conn->eid);
      			nsites++;
      			if (site->priority > 0)
      				npeers++;
      		} else if (ret == DB_REP_UNAVAIL) {
    ! 			if ((ret = __repmgr_bust_connection(
    ! 			     dbenv, conn, FALSE)) != 0)
      				return (ret);
      		} else
      			return (ret);
    --- 258,277 ----
      		    !IS_VALID_EID(conn->eid))
      			continue;
      
    ! 		/*
    ! 		 * Broadcast messages are either application threads committing
    ! 		 * transactions, or replication status message that we can
    ! 		 * afford to lose.  So don't allow blocking for them (pass
    ! 		 * "blockable" argument as FALSE).
    ! 		 */
    ! 		if ((ret = __repmgr_send_internal(dbenv,
    ! 		    conn, &msg, FALSE)) == 0) {
      			site = SITE_FROM_EID(conn->eid);
      			nsites++;
      			if (site->priority > 0)
      				npeers++;
      		} else if (ret == DB_REP_UNAVAIL) {
    ! 			if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
      				return (ret);
      		} else
      			return (ret);
    ***************
    *** 301,339 ****
       * intersperse writes that are part of two single messages.
       *
       * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
    !  * PUBLIC:    u_int, const DBT *, const DBT *));
       */
      int
    ! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
      	u_int msg_type;
      	const DBT *control, *rec;
      {
      	struct sending_msg msg;
      
      	setup_sending_msg(&msg, msg_type, control, rec);
    ! 	return (__repmgr_send_internal(dbenv, conn, &msg));
      }
      
      /*
       * Attempts a "best effort" to send a message on the given site.  If there is an
    !  * excessive backlog of message already queued on the connection, we simply drop
    !  * this message, and still return 0 even in this case.
       */
      static int
    ! __repmgr_send_internal(dbenv, conn, msg)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
      	struct sending_msg *msg;
      {
    ! #define	OUT_QUEUE_LIMIT 10	/* arbitrary, for now */
      	REPMGR_IOVECS iovecs;
      	SITE_STRING_BUFFER buffer;
      	int ret;
      	size_t nw;
      	size_t total_written;
      
      	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
      	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
      		/*
    --- 297,355 ----
       * intersperse writes that are part of two single messages.
       *
       * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
    !  * PUBLIC:    u_int, const DBT *, const DBT *, int));
       */
      int
    ! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
      	u_int msg_type;
      	const DBT *control, *rec;
    + 	int blockable;
      {
      	struct sending_msg msg;
      
      	setup_sending_msg(&msg, msg_type, control, rec);
    ! 	return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
      }
      
      /*
       * Attempts a "best effort" to send a message on the given site.  If there is an
    !  * excessive backlog of message already queued on the connection, what shall we
    !  * do?  If the caller doesn't mind blocking, we'll wait (a limited amount of
    !  * time) for the queue to drain.  Otherwise we'll simply drop the message.  This
    !  * is always allowed by the replication protocol.  But in the case of a
    !  * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
    !  * almost always get a flood of messages that instantly fills our queue, so
    !  * blocking improves performance (by avoiding the need for the client to
    !  * re-request).
    !  *
    !  * How long shall we wait?  We could of course create a new timeout
    !  * configuration type, so that the application could set it directly.  But that
    !  * would start to overwhelm the user with too many choices to think about.  We
    !  * already have an ACK timeout, which is the user's estimate of how long it
    !  * should take to send a message to the client, have it be processed, and return
    !  * a message back to us.  We multiply that by the queue size, because that's how
    !  * many messages have to be swallowed up by the client before we're able to
    !  * start sending again (at least to a rough approximation).
       */
      static int
    ! __repmgr_send_internal(dbenv, conn, msg, blockable)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
      	struct sending_msg *msg;
    + 	int blockable;
      {
    ! 	DB_REP *db_rep;
      	REPMGR_IOVECS iovecs;
      	SITE_STRING_BUFFER buffer;
    + 	db_timeout_t drain_to;
      	int ret;
      	size_t nw;
      	size_t total_written;
      
    + 	db_rep = dbenv->rep_handle;
    + 
      	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
      	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
      		/*
    ***************
    *** 344,358 ****
      		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
      		    __repmgr_format_eid_loc(dbenv->rep_handle,
      		    conn->eid, buffer)));
      		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
      			return (enqueue_msg(dbenv, conn, msg, 0));
      		else {
      			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
      			STAT(dbenv->rep_handle->
      			    region->mstat.st_msgs_dropped++);
    ! 			return (0);
      		}
      	}
      
      	/*
      	 * Send as much data to the site as we can, without blocking.  Keep
    --- 360,393 ----
      		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
      		    __repmgr_format_eid_loc(dbenv->rep_handle,
      		    conn->eid, buffer)));
    + 		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
    + 		    blockable && !F_ISSET(conn, CONN_CONGESTED)) {
    + 			RPRINT(dbenv, (dbenv,
    + 			    "block msg thread, await queue space"));
    + 
    + 			if ((drain_to = db_rep->ack_timeout) == 0)
    + 				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
    + 			conn->blockers++;
    + 			ret = __repmgr_await_drain(dbenv,
    + 			    conn, drain_to * OUT_QUEUE_LIMIT);
    + 			conn->blockers--;
    + 			if (db_rep->finished)
    + 				return (DB_TIMEOUT);
    + 			if (ret != 0)
    + 				return (ret);
    + 			if (STAILQ_EMPTY(&conn->outbound_queue))
    + 				goto empty;
    + 		}
      		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
      			return (enqueue_msg(dbenv, conn, msg, 0));
      		else {
      			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
      			STAT(dbenv->rep_handle->
      			    region->mstat.st_msgs_dropped++);
    ! 			return (blockable ? DB_TIMEOUT : 0);
      		}
      	}
    + empty:
      
      	/*
      	 * Send as much data to the site as we can, without blocking.  Keep
    ***************
    *** 498,521 ****
      
      /*
       * Abandons a connection, to recover from an error.  Upon entry the conn struct
    !  * must be on the connections list.
    !  *
    !  * If the 'do_close' flag is true, we do the whole job; the clean-up includes
    !  * removing the struct from the list and freeing all its memory, so upon return
    !  * the caller must not refer to it any further.  Otherwise, we merely mark the
    !  * connection for clean-up later by the main thread.
       *
       * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
    !  * PUBLIC:     REPMGR_CONNECTION *, int));
       *
       * !!!
       * Caller holds mutex.
       */
      int
    ! __repmgr_bust_connection(dbenv, conn, do_close)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
    - 	int do_close;
      {
      	DB_REP *db_rep;
      	int connecting, ret, eid;
    --- 533,553 ----
      
      /*
       * Abandons a connection, to recover from an error.  Upon entry the conn struct
    !  * must be on the connections list.  For now, just mark it as unusable; it will
    !  * be fully cleaned up in the top-level select thread, as soon as possible.
       *
       * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
    !  * PUBLIC:     REPMGR_CONNECTION *));
       *
       * !!!
       * Caller holds mutex.
    +  *
    +  * Must be idempotent
       */
      int
    ! __repmgr_bust_connection(dbenv, conn)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
      {
      	DB_REP *db_rep;
      	int connecting, ret, eid;
    ***************
    *** 526,537 ****
      	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
      	eid = conn->eid;
      	connecting = F_ISSET(conn, CONN_CONNECTING);
    ! 	if (do_close)
    ! 		__repmgr_cleanup_connection(dbenv, conn);
    ! 	else {
    ! 		F_SET(conn, CONN_DEFUNCT);
    ! 		conn->eid = -1;
    ! 	}
      
      	/*
      	 * When we first accepted the incoming connection, we set conn->eid to
    --- 558,566 ----
      	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
      	eid = conn->eid;
      	connecting = F_ISSET(conn, CONN_CONNECTING);
    ! 
    ! 	F_SET(conn, CONN_DEFUNCT);
    ! 	conn->eid = -1;
      
      	/*
      	 * When we first accepted the incoming connection, we set conn->eid to
    ***************
    *** 557,563 ****
      			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
      				return (ret);
      		}
    ! 	} else if (!do_close) {
      		/*
      		 * One way or another, make sure the main thread is poked, so
      		 * that we do the deferred clean-up.
    --- 586,592 ----
      			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
      				return (ret);
      		}
    ! 	} else {
      		/*
      		 * One way or another, make sure the main thread is poked, so
      		 * that we do the deferred clean-up.
    ***************
    *** 568,577 ****
      }
      
      /*
    !  * PUBLIC: void __repmgr_cleanup_connection
       * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
       */
    ! void
      __repmgr_cleanup_connection(dbenv, conn)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
    --- 597,610 ----
      }
      
      /*
    !  * PUBLIC: int __repmgr_cleanup_connection
       * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
    +  *
    +  * !!!
    +  * Idempotent.  This can be called repeatedly as blocking message threads (of
    +  * which there could be multiples) wake up in case of error on the connection.
       */
    ! int
      __repmgr_cleanup_connection(dbenv, conn)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
    ***************
    *** 580,596 ****
      	QUEUED_OUTPUT *out;
      	REPMGR_FLAT *msg;
      	DBT *dbt;
      
      	db_rep = dbenv->rep_handle;
      
    ! 	TAILQ_REMOVE(&db_rep->connections, conn, entries);
      	if (conn->fd != INVALID_SOCKET) {
    ! 		(void)closesocket(conn->fd);
      #ifdef DB_WIN32
    ! 		(void)WSACloseEvent(conn->event_object);
      #endif
      	}
      
      	/*
      	 * Deallocate any input and output buffers we may have.
      	 */
    --- 613,643 ----
      	QUEUED_OUTPUT *out;
      	REPMGR_FLAT *msg;
      	DBT *dbt;
    + 	int ret;
      
      	db_rep = dbenv->rep_handle;
      
    ! 	DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
    ! 	
      	if (conn->fd != INVALID_SOCKET) {
    ! 		ret = closesocket(conn->fd);
    ! 		conn->fd = INVALID_SOCKET;
    ! 		if (ret == SOCKET_ERROR) {
    ! 			ret = net_errno;
    ! 			__db_err(dbenv, ret, "closing socket");
    ! 		}
      #ifdef DB_WIN32
    ! 		if (!WSACloseEvent(conn->event_object) && ret != 0)
    ! 			ret = net_errno;
      #endif
    + 		if (ret != 0)
    + 			return (ret);
      	}
      
    + 	if (conn->blockers > 0)
    + 		return (__repmgr_signal(&conn->drained));
    + 
    + 	TAILQ_REMOVE(&db_rep->connections, conn, entries);
      	/*
      	 * Deallocate any input and output buffers we may have.
      	 */
    ***************
    *** 614,620 ****
    --- 661,669 ----
      		__os_free(dbenv, out);
      	}
      
    + 	ret = __repmgr_free_cond(&conn->drained);
      	__os_free(dbenv, conn);
    + 	return (ret);
      }
      
      static int
    ***************
    *** 1063,1069 ****
      
      	while (!TAILQ_EMPTY(&db_rep->connections)) {
      		conn = TAILQ_FIRST(&db_rep->connections);
    ! 		__repmgr_cleanup_connection(dbenv, conn);
      	}
      
      	for (i = 0; i < db_rep->site_cnt; i++) {
    --- 1112,1118 ----
      
      	while (!TAILQ_EMPTY(&db_rep->connections)) {
      		conn = TAILQ_FIRST(&db_rep->connections);
    ! 		(void)__repmgr_cleanup_connection(dbenv, conn);
      	}
      
      	for (i = 0; i < db_rep->site_cnt; i++) {
    *** repmgr/repmgr_posix.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_posix.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 21,26 ****
    --- 21,28 ----
      size_t __repmgr_guesstimated_max = (128 * 1024);
      #endif
      
    + static int __repmgr_conn_work __P((DB_ENV *,
    +     REPMGR_CONNECTION *, fd_set *, fd_set *, int));
      static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
      
      /*
    ***************
    *** 189,194 ****
    --- 191,284 ----
      }
      
      /*
    +  * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
    +  * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
    +  *
    +  * Waits for space to become available on the connection's output queue.
    +  * Various ways we can exit:
    +  *
    +  * 1. queue becomes non-full
    +  * 2. exceed time limit
    +  * 3. connection becomes defunct (due to error in another thread)
    +  * 4. repmgr is shutting down
    +  * 5. any unexpected system resource failure
    +  *
    +  * In cases #3 and #5 we return an error code.  Caller is responsible for
    +  * distinguishing the remaining cases if desired.
    +  * 
    +  * !!!
    +  * Caller must hold repmgr->mutex.
    +  */
    + int
    + __repmgr_await_drain(dbenv, conn, timeout)
    + 	DB_ENV *dbenv;
    + 	REPMGR_CONNECTION *conn;
    + 	db_timeout_t timeout;
    + {
    + 	DB_REP *db_rep;
    + 	struct timespec deadline;
    + 	int ret;
    + 
    + 	db_rep = dbenv->rep_handle;
    + 
    + 	__repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
    + 
    + 	ret = 0;
    + 	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
    + 		ret = pthread_cond_timedwait(&conn->drained,
    + 		    &db_rep->mutex, &deadline);
    + 		switch (ret) {
    + 		case 0:
    + 			if (db_rep->finished)
    + 				goto out; /* #4. */
    + 			/*
    + 			 * Another thread could have stumbled into an error on
    + 			 * the socket while we were waiting.
    + 			 */
    + 			if (F_ISSET(conn, CONN_DEFUNCT)) {
    + 				ret = DB_REP_UNAVAIL; /* #3. */
    + 				goto out;
    + 			}
    + 			break;
    + 		case ETIMEDOUT:
    + 			F_SET(conn, CONN_CONGESTED);
    + 			ret = 0;
    + 			goto out; /* #2. */
    + 		default:
    + 			goto out; /* #5. */
    + 		}
    + 	}
    + 	/* #1. */
    + 
    + out:
    + 	return (ret);
    + }
    + 
    + /*
    +  * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
    +  *
    +  * Initialize a condition variable (in allocated space).
    +  */
    + int
    + __repmgr_alloc_cond(c)
    + 	cond_var_t *c;
    + {
    + 	return (pthread_cond_init(c, NULL));
    + }
    + 
    + /*
    +  * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
    +  *
    +  * Clean up a previously initialized condition variable.
    +  */
    + int
    + __repmgr_free_cond(c)
    + 	cond_var_t *c;
    + {
    + 	return (pthread_cond_destroy(c));
    + }
    + 
    + /*
       * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
       *
       * Allocate/initialize all data necessary for thread synchronization.  This
    ***************
    *** 443,449 ****
      	REPMGR_RETRY *retry;
      	db_timespec timeout;
      	fd_set reads, writes;
    ! 	int ret, flow_control, maxfd, nready;
      	u_int8_t buf[10];	/* arbitrary size */
      
      	flow_control = FALSE;
    --- 533,539 ----
      	REPMGR_RETRY *retry;
      	db_timespec timeout;
      	fd_set reads, writes;
    ! 	int ret, flow_control, maxfd;
      	u_int8_t buf[10];	/* arbitrary size */
      
      	flow_control = FALSE;
    ***************
    *** 477,482 ****
    --- 567,575 ----
      		 * each one.
      		 */
      		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
    + 			if (F_ISSET(conn, CONN_DEFUNCT))
    + 				continue;
    + 			
      			if (F_ISSET(conn, CONN_CONNECTING)) {
      				FD_SET((u_int)conn->fd, &reads);
      				FD_SET((u_int)conn->fd, &writes);
    ***************
    *** 533,616 ****
      				return (ret);
      			}
      		}
    - 		nready = ret;
    - 
      		LOCK_MUTEX(db_rep->mutex);
      
    - 		/*
    - 		 * The first priority thing we must do is to clean up any
    - 		 * pending defunct connections.  Otherwise, if they have any
    - 		 * lingering pending input, we get very confused if we try to
    - 		 * process it.
    - 		 *
    - 		 * The TAILQ_FOREACH macro would be suitable here, except that
    - 		 * it doesn't allow unlinking the current element, which is
    - 		 * needed for cleanup_connection.
    - 		 */
    - 		for (conn = TAILQ_FIRST(&db_rep->connections);
    - 		     conn != NULL;
    - 		     conn = next) {
    - 			next = TAILQ_NEXT(conn, entries);
    - 			if (F_ISSET(conn, CONN_DEFUNCT))
    - 				__repmgr_cleanup_connection(dbenv, conn);
    - 		}
    - 
      		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
      			goto out;
    - 		if (nready == 0)
    - 			continue;
      
      		/*
    ! 		 * Traverse the linked list.  (Again, like TAILQ_FOREACH, except
    ! 		 * that we need the ability to unlink an element along the way.)
      		 */
      		for (conn = TAILQ_FIRST(&db_rep->connections);
      		     conn != NULL;
      		     conn = next) {
      			next = TAILQ_NEXT(conn, entries);
    ! 			if (F_ISSET(conn, CONN_CONNECTING)) {
    ! 				if (FD_ISSET((u_int)conn->fd, &reads) ||
    ! 				    FD_ISSET((u_int)conn->fd, &writes)) {
    ! 					if ((ret = finish_connecting(dbenv,
    ! 					    conn)) == DB_REP_UNAVAIL) {
    ! 						if ((ret =
    ! 						    __repmgr_bust_connection(
    ! 						    dbenv, conn, TRUE)) != 0)
    ! 							goto out;
    ! 					} else if (ret != 0)
    ! 						goto out;
    ! 				}
    ! 				continue;
    ! 			}
    ! 
    ! 			/*
    ! 			 * Here, the site is connected, and the FD_SET's are
    ! 			 * valid.
    ! 			 */
    ! 			if (FD_ISSET((u_int)conn->fd, &writes)) {
    ! 				if ((ret = __repmgr_write_some(
    ! 				    dbenv, conn)) == DB_REP_UNAVAIL) {
    ! 					if ((ret =
    ! 					    __repmgr_bust_connection(dbenv,
    ! 					    conn, TRUE)) != 0)
    ! 						goto out;
    ! 					continue;
    ! 				} else if (ret != 0)
    ! 					goto out;
    ! 			}
    ! 
    ! 			if (!flow_control &&
    ! 			    FD_ISSET((u_int)conn->fd, &reads)) {
    ! 				if ((ret = __repmgr_read_from_site(dbenv, conn))
    ! 				    == DB_REP_UNAVAIL) {
    ! 					if ((ret =
    ! 					    __repmgr_bust_connection(dbenv,
    ! 					    conn, TRUE)) != 0)
    ! 						goto out;
    ! 					continue;
    ! 				} else if (ret != 0)
    ! 					goto out;
    ! 			}
      		}
      
      		/*
    --- 626,650 ----
      				return (ret);
      			}
      		}
      		LOCK_MUTEX(db_rep->mutex);
      
      		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
      			goto out;
      
      		/*
    ! 		 * Examine each connection to see what work needs to be done.
    ! 		 *
    ! 		 * The TAILQ_FOREACH macro would be suitable here, except that
    ! 		 * it doesn't allow unlinking the current element, which
    ! 		 * __repmgr_cleanup_connection may do.
      		 */
      		for (conn = TAILQ_FIRST(&db_rep->connections);
      		     conn != NULL;
      		     conn = next) {
      			next = TAILQ_NEXT(conn, entries);
    ! 			if ((ret = __repmgr_conn_work(dbenv,
    ! 			    conn, &reads, &writes, flow_control)) != 0)
    ! 				goto out;
      		}
      
      		/*
    ***************
    *** 637,642 ****
    --- 671,719 ----
      }
      
      static int
    + __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
    + 	DB_ENV *dbenv;
    + 	REPMGR_CONNECTION *conn;
    + 	fd_set *reads, *writes;
    + 	int flow_control;
    + {
    + 	int ret;
    + 	u_int fd;
    + 
    + 	if (F_ISSET(conn, CONN_DEFUNCT)) {
    + 		/*
    + 		 * Deferred clean-up of a connection that another thread marked
    + 		 * defunct (after an error) while we were sleeping in select().
    + 		 */
    + 		return (__repmgr_cleanup_connection(dbenv, conn));
    + 	}
    + 
    + 	ret = 0;
    + 	fd = (u_int)conn->fd;
    + 	
    + 	if (F_ISSET(conn, CONN_CONNECTING)) {
    + 		if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
    + 			ret = finish_connecting(dbenv, conn);
    + 	} else {
    + 		/*
    + 		 * Here, the site is connected, and the FD_SET's are valid.
    + 		 */
    + 		if (FD_ISSET(fd, writes))
    + 			ret = __repmgr_write_some(dbenv, conn);
    + 				
    + 		if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
    + 			ret = __repmgr_read_from_site(dbenv, conn);
    + 	}
    + 
    + 	if (ret == DB_REP_UNAVAIL) {
    + 		if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
    + 			return (ret);
    + 		ret = __repmgr_cleanup_connection(dbenv, conn);
    + 	}
    + 	return (ret);
    + }
    + 
    + static int
      finish_connecting(dbenv, conn)
      	DB_ENV *dbenv;
      	REPMGR_CONNECTION *conn;
    ***************
    *** 657,662 ****
    --- 734,740 ----
      		goto err_rpt;
      	}
      
    + 	DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
      	F_CLR(conn, CONN_CONNECTING);
      	return (__repmgr_send_handshake(dbenv, conn));
      
    ***************
    *** 671,690 ****
      	    "connecting to %s", __repmgr_format_site_loc(site, buffer));
      
      	/* If we've exhausted the list of possible addresses, give up. */
    ! 	if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
      		return (DB_REP_UNAVAIL);
      
      	/*
      	 * This is just like a little mini-"bust_connection", except that we
      	 * don't reschedule for later, 'cuz we're just about to try again right
    ! 	 * now.
      	 *
      	 * !!!
      	 * Which means this must only be called on the select() thread, since
      	 * only there are we allowed to actually close a connection.
      	 */
      	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
    ! 	__repmgr_cleanup_connection(dbenv, conn);
      	ret = __repmgr_connect_site(dbenv, eid);
      	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
      	return (ret);
    --- 749,773 ----
      	    "connecting to %s", __repmgr_format_site_loc(site, buffer));
      
      	/* If we've exhausted the list of possible addresses, give up. */
    ! 	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
    ! 		STAT(db_rep->region->mstat.st_connect_fail++);
      		return (DB_REP_UNAVAIL);
    + 	}
      
      	/*
      	 * This is just like a little mini-"bust_connection", except that we
      	 * don't reschedule for later, 'cuz we're just about to try again right
    ! 	 * now.  (Note that we don't have to worry about message threads
    ! 	 * blocking on a full output queue: that can't happen when we're only
    ! 	 * just connecting.)
      	 *
      	 * !!!
      	 * Which means this must only be called on the select() thread, since
      	 * only there are we allowed to actually close a connection.
      	 */
      	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
    ! 	if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
    ! 		return (ret);
      	ret = __repmgr_connect_site(dbenv, eid);
      	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
      	return (ret);
    *** repmgr/repmgr_sel.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_sel.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 36,45 ****
      
      /*
       * PUBLIC: int __repmgr_accept __P((DB_ENV *));
    -  *
    -  * !!!
    -  * Only ever called in the select() thread, since we may call
    -  * __repmgr_bust_connection(..., TRUE).
       */
      int
      __repmgr_accept(dbenv)
    --- 36,41 ----
    ***************
    *** 133,139 ****
      	case 0:
      		return (0);
      	case DB_REP_UNAVAIL:
    ! 		return (__repmgr_bust_connection(dbenv, conn, TRUE));
      	default:
      		return (ret);
      	}
    --- 129,135 ----
      	case 0:
      		return (0);
      	case DB_REP_UNAVAIL:
    ! 		return (__repmgr_bust_connection(dbenv, conn));
      	default:
      		return (ret);
      	}
    ***************
    *** 254,263 ****
       * starting with the "current" element of its address list and trying as many
       * addresses as necessary until the list is exhausted.
       *
    -  * !!!
    -  * Only ever called in the select() thread, since we may call
    -  * __repmgr_bust_connection(..., TRUE).
    -  *
       * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
       */
      int
    --- 250,255 ----
    ***************
    *** 332,338 ****
      		case 0:
      			break;
      		case DB_REP_UNAVAIL:
    ! 			return (__repmgr_bust_connection(dbenv, con, TRUE));
      		default:
      			return (ret);
      		}
    --- 324,330 ----
      		case 0:
      			break;
      		case DB_REP_UNAVAIL:
    ! 			return (__repmgr_bust_connection(dbenv, con));
      		default:
      			return (ret);
      		}
    ***************
    *** 437,443 ****
      
      	DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
      
    ! 	return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
      }
      
      /*
    --- 429,443 ----
      
      	DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
      
    ! 	/*
    ! 	 * It would of course be disastrous to block the select() thread, so
    ! 	 * pass the "blockable" argument as FALSE.  Fortunately blocking should
    ! 	 * never be necessary here, because the handshake is always the first
    ! 	 * thing we send.  That matters, because it would be almost as
    ! 	 * disastrous if we allowed ourselves to drop a handshake.
    ! 	 */
    ! 	return (__repmgr_send_one(dbenv,
    ! 	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
      }
      
      /*
    ***************
    *** 854,859 ****
    --- 854,872 ----
      			conn->out_queue_length--;
      			if (--msg->ref_count <= 0)
      				__os_free(dbenv, msg);
    + 
    + 			/*
    + 			 * A message has just been removed from the outgoing
    + 			 * queue, freeing up at least one slot.  Wake any message
    + 			 * threads that may be waiting for space.  Clear the
    + 			 * CONGESTED status so that when the queue reaches the
    + 			 * high-water mark again, the filling thread will be
    + 			 * allowed to try waiting again.
    + 			 */
    + 			F_CLR(conn, CONN_CONGESTED);
    + 			if (conn->blockers > 0 &&
    + 			    (ret = __repmgr_signal(&conn->drained)) != 0)
    + 				return (ret);
      		}
      	}
      
    *** repmgr/repmgr_util.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_util.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 103,108 ****
    --- 103,113 ----
      	db_rep = dbenv->rep_handle;
      	if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
      		return (ret);
    + 	if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
    + 		__os_free(dbenv, c);
    + 		return (ret);
    + 	}
    + 	c->blockers = 0;
      
      	c->fd = s;
      	c->flags = flags;
    *** repmgr/repmgr_windows.c	2007-10-31 10:23:52.000000000 -0700
    --- repmgr/repmgr_windows.c	2007-10-31 10:23:53.000000000 -0700
    ***************
    *** 11,16 ****
    --- 11,19 ----
      #define	__INCLUDE_NETWORKING	1
      #include "db_int.h"
      
    + /* Convert time-out from microseconds to milliseconds, rounding up. */
    + #define	DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
    + 
      typedef struct __ack_waiter {
      	HANDLE event;
      	const DB_LSN *lsnp;
    ***************
    *** 120,136 ****
      {
      	DB_REP *db_rep;
      	ACK_WAITER *me;
    ! 	DWORD ret;
    ! 	DWORD timeout;
      
      	db_rep = dbenv->rep_handle;
      
      	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
      		goto err;
      
    - 	/* convert time-out from microseconds to milliseconds, rounding up */
      	timeout = db_rep->ack_timeout > 0 ?
    ! 	    ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
      	me->lsnp = lsnp;
      	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
      	    FALSE)) == WAIT_FAILED) {
    --- 123,137 ----
      {
      	DB_REP *db_rep;
      	ACK_WAITER *me;
    ! 	DWORD ret, timeout;
      
      	db_rep = dbenv->rep_handle;
      
      	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
      		goto err;
      
      	timeout = db_rep->ack_timeout > 0 ?
    ! 	    DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
      	me->lsnp = lsnp;
      	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
      	    FALSE)) == WAIT_FAILED) {
    ***************
    *** 211,216 ****
    --- 212,296 ----
      	db_rep->waiters->first_free = slot;
      }
      
    + /* (See requirements described in repmgr_posix.c.) */
    + int
    + __repmgr_await_drain(dbenv, conn, timeout)
    + 	DB_ENV *dbenv;
    + 	REPMGR_CONNECTION *conn;
    + 	db_timeout_t timeout;
    + {
    + 	DB_REP *db_rep;
    + 	db_timespec deadline, delta, now;
    + 	db_timeout_t t;
    + 	DWORD duration, ret;
    + 	int round_up;
    + 	
    + 	db_rep = dbenv->rep_handle;
    + 	
    + 	__os_gettime(dbenv, &deadline);
    + 	DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
    + 	timespecadd(&deadline, &delta);
    + 
    + 	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
    + 		if (!ResetEvent(conn->drained))
    + 			return (GetLastError());
    + 
    + 		/* How long until the deadline? */
    + 		__os_gettime(dbenv, &now);
    + 		if (timespeccmp(&now, &deadline, >=)) {
    + 			F_SET(conn, CONN_CONGESTED);
    + 			return (0);
    + 		}
    + 		delta = deadline;
    + 		timespecsub(&delta, &now);
    + 		round_up = TRUE;
    + 		DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
    + 		duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
    + 
    + 		ret = SignalObjectAndWait(db_rep->mutex,
    + 		    conn->drained, duration, FALSE);
    + 		LOCK_MUTEX(db_rep->mutex);
    + 		if (ret == WAIT_FAILED)
    + 			return (GetLastError());
    + 		else if (ret == WAIT_TIMEOUT) {
    + 			F_SET(conn, CONN_CONGESTED);
    + 			return (0);
    + 		} else
    + 			DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
    + 		
    + 		if (db_rep->finished)
    + 			return (0);
    + 		if (F_ISSET(conn, CONN_DEFUNCT))
    + 			return (DB_REP_UNAVAIL);
    + 	}
    + 	return (0);
    + }
    + 
    + /*
    +  * Creates a manual-reset event, which is usually our best choice when we may
    +  * have multiple threads waiting on a single event (setting it wakes them all).
    +  */
    + int
    + __repmgr_alloc_cond(c)
    + 	cond_var_t *c;
    + {
    + 	HANDLE event;
    + 
    + 	if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
    + 		return (GetLastError());
    + 	*c = event;
    + 	return (0);
    + }
    + 
    + int
    + __repmgr_free_cond(c)
    + 	cond_var_t *c;
    + {
    + 	if (CloseHandle(*c))
    + 		return (0);
    + 	return (GetLastError());
    + }
    + 
      /*
       * Make resource allocation an all-or-nothing affair, outside of this and the
       * close_sync function.  db_rep->waiters should be non-NULL iff all of these
    ***************
    *** 488,493 ****
    --- 568,576 ----
      		 * don't hurt anything flow-control-wise.
      		 */
      		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
    + 			if (F_ISSET(conn, CONN_DEFUNCT))
    + 				continue;
    + 
      			if (F_ISSET(conn, CONN_CONNECTING) ||
      			    !STAILQ_EMPTY(&conn->outbound_queue) ||
      			    (!flow_control || !IS_VALID_EID(conn->eid))) {
    ***************
    *** 534,541 ****
      		     conn != NULL;
      		     conn = next) {
      			next = TAILQ_NEXT(conn, entries);
    ! 			if (F_ISSET(conn, CONN_DEFUNCT))
    ! 				__repmgr_cleanup_connection(dbenv, conn);
      		}
      
      		/*
    --- 617,626 ----
      		     conn != NULL;
      		     conn = next) {
      			next = TAILQ_NEXT(conn, entries);
    ! 			if (F_ISSET(conn, CONN_DEFUNCT) &&
    ! 			    (ret = __repmgr_cleanup_connection(dbenv,
    ! 			    conn)) != 0)
    ! 				goto unlock;
      		}
      
      		/*
    ***************
    *** 587,597 ****
      	return (ret);
      }
      
    - /*
    -  * !!!
    -  * Only ever called on the select() thread, since we may call
    -  * __repmgr_bust_connection(..., TRUE).
    -  */
      static int
      handle_completion(dbenv, conn)
      	DB_ENV *dbenv;
    --- 672,677 ----
    ***************
    *** 651,660 ****
      		}
      	}
      
    ! 	return (0);
    ! 
    ! err:	if (ret == DB_REP_UNAVAIL)
    ! 		return (__repmgr_bust_connection(dbenv, conn, TRUE));
      	return (ret);
      }
      
    --- 731,742 ----
      		}
      	}
      
    ! err:
    ! 	if (ret == DB_REP_UNAVAIL) {
    ! 		if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
    ! 			return (ret);
    ! 		ret = __repmgr_cleanup_connection(dbenv, conn);
    ! 	}
      	return (ret);
      }
      
    ***************
    *** 708,714 ****
      	}
      
      	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
    ! 	__repmgr_cleanup_connection(dbenv, conn);
      	ret = __repmgr_connect_site(dbenv, eid);
      	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
      	return (ret);
    --- 790,797 ----
      	}
      
      	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
    ! 	if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
    ! 		return (ret);
      	ret = __repmgr_connect_site(dbenv, eid);
      	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
      	return (ret);