diff options
Diffstat (limited to 'rpc')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 131 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 9 | 
2 files changed, 102 insertions, 38 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 7657e5d3562..fffc137f665 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -311,6 +311,15 @@ done:  } +void +ssl_teardown_connection (socket_private_t *priv) +{ +        SSL_shutdown(priv->ssl_ssl); +        SSL_clear(priv->ssl_ssl); +        SSL_free(priv->ssl_ssl); +        priv->ssl_ssl = NULL; +} +  ssize_t  __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) @@ -581,10 +590,29 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count,  int +__socket_shutdown (rpc_transport_t *this) +{ +        int               ret = -1; +        socket_private_t *priv = this->private; + +        priv->connected = -1; +        ret = shutdown (priv->sock, SHUT_RDWR); +        if (ret) { +                /* its already disconnected.. no need to understand +                   why it failed to shutdown in normal cases */ +                gf_log (this->name, GF_LOG_DEBUG, +                        "shutdown() returned %d. %s", +                        ret, strerror (errno)); +        } + +        return ret; +} + +int  __socket_disconnect (rpc_transport_t *this)  { -        socket_private_t *priv = NULL;          int               ret = -1; +        socket_private_t *priv = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -592,21 +620,7 @@ __socket_disconnect (rpc_transport_t *this)          priv = this->private;          if (priv->sock != -1) { -                priv->connected = -1; -                ret = shutdown (priv->sock, SHUT_RDWR); -                if (ret) { -                        /* its already disconnected.. no need to understand -                           why it failed to shutdown in normal cases */ -                        gf_log (this->name, GF_LOG_DEBUG, -                                "shutdown() returned %d. %s", -                                ret, strerror (errno)); -                } -		if (priv->ssl_ssl) { -			SSL_shutdown(priv->ssl_ssl); -			SSL_clear(priv->ssl_ssl); -			SSL_free(priv->ssl_ssl); -                        priv->ssl_ssl = NULL; -		} +                ret = __socket_shutdown(this);  		if (priv->own_thread) {  			/*  			 * Without this, reconnect (= disconnect + connect) @@ -614,8 +628,19 @@ __socket_disconnect (rpc_transport_t *this)  			 */  			close(priv->sock);  			priv->sock = -1; -			++(priv->socket_gen); +                        /* +                         * Closing the socket forces an error that will wake +                         * up the polling thread.  Wait for it to notice and +                         * respond. +                         */ +                        if (priv->ot_state == OT_ALIVE) { +                                priv->ot_state = OT_DYING; +                                pthread_cond_wait(&priv->ot_event,&priv->lock); +                        }  		} +                else if (priv->use_ssl) { +                        ssl_teardown_connection(priv); +                }          }  out: @@ -2226,7 +2251,6 @@ socket_poller (void *ctx)  	struct pollfd     pfd[2] = {{0,},};  	gf_boolean_t      to_write = _gf_false;  	int               ret = 0; -	int               orig_gen;          if (priv->use_ssl) {                  if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2246,8 +2270,6 @@ socket_poller (void *ctx)                  }          } -	orig_gen = ++(priv->socket_gen); -          if (priv->connected == 0) {  		THIS = this->xl;                  ret = socket_connect_finish (this); @@ -2265,11 +2287,6 @@ socket_poller (void *ctx)          }  	for (;;) { -		if (priv->socket_gen != orig_gen) { -			gf_log(this->name,GF_LOG_DEBUG, -			       "redundant poller exiting"); -			return NULL; -		}  		pthread_mutex_lock(&priv->lock);  		to_write = !list_empty(&priv->ioq);  		pthread_mutex_unlock(&priv->lock); @@ -2339,13 +2356,57 @@ socket_poller (void *ctx)  err:  	/* All (and only) I/O errors should come here. */ -	__socket_disconnect (this); +        pthread_mutex_lock(&priv->lock); +        if (priv->ot_state == OT_ALIVE) { +                /* +                 * We have to do this if we're here because of an error we +                 * detected ourselves, but need to avoid a recursive call +                 * if our death is the result of an external disconnect. +                 */ +                __socket_shutdown(this); +                close(priv->sock); +                priv->sock = -1; +        } +        if (priv->ssl_ssl) { +                /* +                 * We're always responsible for this part, but only actually +                 * have to do it if we got far enough for ssl_ssl to be valid +                 * (i.e. errors in ssl_setup_connection don't count). +                 */ +                ssl_teardown_connection(priv); +        } +        priv->ot_state = OT_IDLE; +        /* +         * We expect there to be only one waiter, but if there do happen to +         * be multiple it's probably better to unblock them than to let them +         * hang.  If there are none, this is a harmless no-op. +         */ +        pthread_cond_broadcast(&priv->ot_event); +        pthread_mutex_unlock(&priv->lock);          rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);  	rpc_transport_unref (this);  	return NULL;  } +void +socket_spawn (rpc_transport_t *this) +{ +        socket_private_t        *priv   = this->private; + +        if (priv->ot_state == OT_ALIVE) { +                gf_log (this->name, GF_LOG_WARNING, +                        "refusing to start redundant poller"); +                return; +        } + +        priv->ot_state = OT_ALIVE; + +        if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not create poll thread"); +        } +}  int  socket_server_event_handler (int fd, int idx, void *data, @@ -2489,12 +2550,7 @@ socket_server_event_handler (int fd, int idx, void *data,  						gf_log(this->name,GF_LOG_ERROR,  						       "could not create pipe");  					} -					if (pthread_create(&new_priv->thread, -							NULL, socket_poller, -							new_trans) != 0) { -						gf_log(this->name,GF_LOG_ERROR, -						       "could not create poll thread"); -					} +                                        socket_spawn(new_trans);  				}  				else {  					new_priv->idx = @@ -2752,11 +2808,7 @@ socket_connect (rpc_transport_t *this, int port)  			}                          this->listener = this; -			if (pthread_create(&priv->thread,NULL, -					socket_poller, this) != 0) { -				gf_log(this->name,GF_LOG_ERROR, -				       "could not create poll thread"); -			} +                        socket_spawn(this);  		}  		else {  			priv->idx = event_register (ctx->event_pool, priv->sock, @@ -3468,6 +3520,11 @@ socket_init (rpc_transport_t *this)  		SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0);  	} +        if (priv->own_thread) { +                priv->ot_state = OT_IDLE; +                pthread_cond_init (&priv->ot_event, NULL); +        } +  out:          this->private = priv;          return 0; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 78faad9038d..bb342d99869 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -184,6 +184,12 @@ struct gf_sock_incoming {  	char                *ra_buf;  }; +typedef enum { +        OT_IDLE,        /* Uninitialized or termination complete. */ +        OT_ALIVE,       /* Past pthread_create, no error/disconnect. */ +        OT_DYING,       /* Disconnect in progress. */ +} ot_state_t; +  typedef struct {          int32_t                sock;          int32_t                idx; @@ -222,7 +228,8 @@ typedef struct {  	pthread_t              thread;  	int                    pipe[2];  	gf_boolean_t           own_thread; -	volatile int           socket_gen; +        ot_state_t             ot_state; +        pthread_cond_t         ot_event;  } socket_private_t;  | 
