diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 131 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 9 |
2 files changed, 102 insertions, 38 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 7657e5d3562..fffc137f665 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -311,6 +311,15 @@ done: } +void +ssl_teardown_connection (socket_private_t *priv) +{ + SSL_shutdown(priv->ssl_ssl); + SSL_clear(priv->ssl_ssl); + SSL_free(priv->ssl_ssl); + priv->ssl_ssl = NULL; +} + ssize_t __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) @@ -581,10 +590,29 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, int +__socket_shutdown (rpc_transport_t *this) +{ + int ret = -1; + socket_private_t *priv = this->private; + + priv->connected = -1; + ret = shutdown (priv->sock, SHUT_RDWR); + if (ret) { + /* its already disconnected.. no need to understand + why it failed to shutdown in normal cases */ + gf_log (this->name, GF_LOG_DEBUG, + "shutdown() returned %d. %s", + ret, strerror (errno)); + } + + return ret; +} + +int __socket_disconnect (rpc_transport_t *this) { - socket_private_t *priv = NULL; int ret = -1; + socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -592,21 +620,7 @@ __socket_disconnect (rpc_transport_t *this) priv = this->private; if (priv->sock != -1) { - priv->connected = -1; - ret = shutdown (priv->sock, SHUT_RDWR); - if (ret) { - /* its already disconnected.. no need to understand - why it failed to shutdown in normal cases */ - gf_log (this->name, GF_LOG_DEBUG, - "shutdown() returned %d. %s", - ret, strerror (errno)); - } - if (priv->ssl_ssl) { - SSL_shutdown(priv->ssl_ssl); - SSL_clear(priv->ssl_ssl); - SSL_free(priv->ssl_ssl); - priv->ssl_ssl = NULL; - } + ret = __socket_shutdown(this); if (priv->own_thread) { /* * Without this, reconnect (= disconnect + connect) @@ -614,8 +628,19 @@ __socket_disconnect (rpc_transport_t *this) */ close(priv->sock); priv->sock = -1; - ++(priv->socket_gen); + /* + * Closing the socket forces an error that will wake + * up the polling thread. Wait for it to notice and + * respond. + */ + if (priv->ot_state == OT_ALIVE) { + priv->ot_state = OT_DYING; + pthread_cond_wait(&priv->ot_event,&priv->lock); + } } + else if (priv->use_ssl) { + ssl_teardown_connection(priv); + } } out: @@ -2226,7 +2251,6 @@ socket_poller (void *ctx) struct pollfd pfd[2] = {{0,},}; gf_boolean_t to_write = _gf_false; int ret = 0; - int orig_gen; if (priv->use_ssl) { if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2246,8 +2270,6 @@ socket_poller (void *ctx) } } - orig_gen = ++(priv->socket_gen); - if (priv->connected == 0) { THIS = this->xl; ret = socket_connect_finish (this); @@ -2265,11 +2287,6 @@ socket_poller (void *ctx) } for (;;) { - if (priv->socket_gen != orig_gen) { - gf_log(this->name,GF_LOG_DEBUG, - "redundant poller exiting"); - return NULL; - } pthread_mutex_lock(&priv->lock); to_write = !list_empty(&priv->ioq); pthread_mutex_unlock(&priv->lock); @@ -2339,13 +2356,57 @@ socket_poller (void *ctx) err: /* All (and only) I/O errors should come here. */ - __socket_disconnect (this); + pthread_mutex_lock(&priv->lock); + if (priv->ot_state == OT_ALIVE) { + /* + * We have to do this if we're here because of an error we + * detected ourselves, but need to avoid a recursive call + * if our death is the result of an external disconnect. + */ + __socket_shutdown(this); + close(priv->sock); + priv->sock = -1; + } + if (priv->ssl_ssl) { + /* + * We're always responsible for this part, but only actually + * have to do it if we got far enough for ssl_ssl to be valid + * (i.e. errors in ssl_setup_connection don't count). + */ + ssl_teardown_connection(priv); + } + priv->ot_state = OT_IDLE; + /* + * We expect there to be only one waiter, but if there do happen to + * be multiple it's probably better to unblock them than to let them + * hang. If there are none, this is a harmless no-op. + */ + pthread_cond_broadcast(&priv->ot_event); + pthread_mutex_unlock(&priv->lock); rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); rpc_transport_unref (this); return NULL; } +void +socket_spawn (rpc_transport_t *this) +{ + socket_private_t *priv = this->private; + + if (priv->ot_state == OT_ALIVE) { + gf_log (this->name, GF_LOG_WARNING, + "refusing to start redundant poller"); + return; + } + + priv->ot_state = OT_ALIVE; + + if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "could not create poll thread"); + } +} int socket_server_event_handler (int fd, int idx, void *data, @@ -2489,12 +2550,7 @@ socket_server_event_handler (int fd, int idx, void *data, gf_log(this->name,GF_LOG_ERROR, "could not create pipe"); } - if (pthread_create(&new_priv->thread, - NULL, socket_poller, - new_trans) != 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create poll thread"); - } + socket_spawn(new_trans); } else { new_priv->idx = @@ -2752,11 +2808,7 @@ socket_connect (rpc_transport_t *this, int port) } this->listener = this; - if (pthread_create(&priv->thread,NULL, - socket_poller, this) != 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create poll thread"); - } + socket_spawn(this); } else { priv->idx = event_register (ctx->event_pool, priv->sock, @@ -3468,6 +3520,11 @@ socket_init (rpc_transport_t *this) SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0); } + if (priv->own_thread) { + priv->ot_state = OT_IDLE; + pthread_cond_init (&priv->ot_event, NULL); + } + out: this->private = priv; return 0; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 78faad9038d..bb342d99869 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -184,6 +184,12 @@ struct gf_sock_incoming { char *ra_buf; }; +typedef enum { + OT_IDLE, /* Uninitialized or termination complete. */ + OT_ALIVE, /* Past pthread_create, no error/disconnect. */ + OT_DYING, /* Disconnect in progress. */ +} ot_state_t; + typedef struct { int32_t sock; int32_t idx; @@ -222,7 +228,8 @@ typedef struct { pthread_t thread; int pipe[2]; gf_boolean_t own_thread; - volatile int socket_gen; + ot_state_t ot_state; + pthread_cond_t ot_event; } socket_private_t; |