diff options
author | Rajesh Joseph <rjoseph@redhat.com> | 2016-12-13 15:28:42 +0530 |
---|---|---|
committer | Raghavendra G <rgowdapp@redhat.com> | 2016-12-21 20:49:19 -0800 |
commit | af6769675acbbfd780fa2ece8587502d6d579372 (patch) | |
tree | e464f44c08163b9ba84e3d91d1e9d71efd5c04dc /rpc/rpc-transport | |
parent | 8b42e1b5688f8600086ecc0e33ac4abf5e7c2772 (diff) |
socket: socket disconnect should wait for poller thread exit
When SSL is enabled, or if the "transport.socket.own-thread" option is set,
then socket_poller is run as a separate thread. Currently, during a
disconnect or PARENT_DOWN scenario, we don't wait for this thread
to terminate. PARENT_DOWN will disconnect the socket layer and
clean up the resources used by socket_poller.
Therefore, before disconnecting, we should wait for the poller thread to exit.
Change-Id: I71f984b47d260ffd979102f180a99a0bed29f0d6
BUG: 1404181
Signed-off-by: Rajesh Joseph <rjoseph@redhat.com>
Reviewed-on: http://review.gluster.org/16141
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Kaushal M <kaushal@redhat.com>
Reviewed-by: Raghavendra Talur <rtalur@redhat.com>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r-- | rpc/rpc-transport/rdma/src/rdma.c | 19 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 98 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 6 |
3 files changed, 97 insertions, 26 deletions
diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index 551ac072079..d2f04bd6d0c 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -51,7 +51,7 @@ static int32_t gf_rdma_teardown (rpc_transport_t *this); static int32_t -gf_rdma_disconnect (rpc_transport_t *this); +gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait); static void gf_rdma_cm_handle_disconnect (rpc_transport_t *this); @@ -1209,7 +1209,7 @@ gf_rdma_cm_handle_connect_init (struct rdma_cm_event *event) } if (ret < 0) { - gf_rdma_disconnect (this); + gf_rdma_disconnect (this, _gf_false); } return ret; @@ -3014,7 +3014,7 @@ gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) RDMA_MSG_WRITE_PEER_FAILED, "sending request to peer (%s) failed", this->peerinfo.identifier); - rpc_transport_disconnect (this); + rpc_transport_disconnect (this, _gf_false); } out: @@ -3051,7 +3051,7 @@ gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) RDMA_MSG_WRITE_PEER_FAILED, "sending request to peer (%s) failed", this->peerinfo.identifier); - rpc_transport_disconnect (this); + rpc_transport_disconnect (this, _gf_false); } out: @@ -4095,7 +4095,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc) out: if (ret == -1) { - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, _gf_false); } return; @@ -4216,7 +4216,8 @@ gf_rdma_recv_completion_proc (void *data) if (peer) { ibv_ack_cq_events (event_cq, num_wr); rpc_transport_unref (peer->trans); - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, + _gf_false); } if (post) { @@ -4292,7 +4293,7 @@ gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc) } if (peer) { - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, _gf_false); } return; @@ -4343,7 +4344,7 @@ gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer, ret = 
gf_rdma_pollin_notify (peer, post); if ((ret == -1) && (peer != NULL)) { - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, _gf_false); } out: @@ -4657,7 +4658,7 @@ gf_rdma_init (rpc_transport_t *this) static int32_t -gf_rdma_disconnect (rpc_transport_t *this) +gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait) { gf_rdma_private_t *priv = NULL; int32_t ret = 0; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 36388548937..d05dc4189aa 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2344,7 +2344,7 @@ out: return ret; } -static int socket_disconnect (rpc_transport_t *this); +static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait); /* reads rpc_requests during pollin */ static int @@ -2375,7 +2375,7 @@ socket_event_handler (int fd, int idx, void *data, EINPROGRESS or ENOENT, so nothing more to do, fail reading/writing anything even if poll_in or poll_out is set */ - ret = socket_disconnect (this); + ret = socket_disconnect (this, _gf_false); /* Force ret to be -1, as we are officially done with this socket */ @@ -2424,6 +2424,13 @@ socket_poller (void *ctx) * conditionally */ THIS = this->xl; + GF_REF_GET (priv); + + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting " + "because socket state is OT_PLEASE_DIE"); + goto err; + } priv->ot_state = OT_RUNNING; @@ -2494,6 +2501,13 @@ socket_poller (void *ctx) break; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_DEBUG, + "OT_PLEASE_DIE on %p (exiting socket_poller)", + this); + break; + } + if (pfd[1].revents & POLL_MASK_INPUT) { ret = socket_event_poll_in(this); if (ret >= 0) { @@ -2507,7 +2521,6 @@ socket_poller (void *ctx) gf_log (this->name, GF_LOG_TRACE, "OT_IDLE on %p (input request)", this); - priv->ot_state = OT_IDLE; break; } } @@ -2524,7 +2537,6 @@ socket_poller (void *ctx) gf_log 
(this->name, GF_LOG_TRACE, "OT_IDLE on %p (output request)", this); - priv->ot_state = OT_IDLE; break; } } @@ -2561,22 +2573,24 @@ socket_poller (void *ctx) err: /* All (and only) I/O errors should come here. */ pthread_mutex_lock(&priv->lock); + { + __socket_teardown_connection (this); + sys_close (priv->sock); + priv->sock = -1; - __socket_teardown_connection (this); - sys_close (priv->sock); - priv->sock = -1; - - sys_close (priv->pipe[0]); - sys_close (priv->pipe[1]); - priv->pipe[0] = -1; - priv->pipe[1] = -1; - - priv->ot_state = OT_IDLE; + sys_close (priv->pipe[0]); + sys_close (priv->pipe[1]); + priv->pipe[0] = -1; + priv->pipe[1] = -1; + priv->ot_state = OT_IDLE; + } pthread_mutex_unlock(&priv->lock); rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + GF_REF_PUT (priv); + rpc_transport_unref (this); return NULL; @@ -2848,16 +2862,39 @@ out: static int -socket_disconnect (rpc_transport_t *this) +socket_disconnect (rpc_transport_t *this, gf_boolean_t wait) { - socket_private_t *priv = NULL; - int ret = -1; + socket_private_t *priv = NULL; + int ret = -1; + char a_byte = 'r'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + if (wait && priv->own_thread) { + pthread_mutex_lock (&priv->cond_lock); + { + GF_REF_PUT (priv); + /* Change the state to OT_PLEASE_DIE so that + * socket_poller can exit. 
*/ + priv->ot_state = OT_PLEASE_DIE; + /* Write something into the pipe so that poller + * thread can wake up.*/ + if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { + gf_log (this->name, GF_LOG_WARNING, + "write error on pipe"); + } + + /* Wait for socket_poller to exit */ + if (!priv->own_thread_done) + pthread_cond_wait (&priv->cond, + &priv->cond_lock); + } + pthread_mutex_unlock (&priv->cond_lock); + } + pthread_mutex_lock (&priv->lock); { ret = __socket_disconnect (this); @@ -2937,6 +2974,7 @@ socket_connect (rpc_transport_t *this, int port) pthread_mutex_lock (&priv->lock); { + priv->own_thread_done = _gf_false; if (priv->sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, "connect () called on transport " @@ -3805,6 +3843,26 @@ init_openssl_mt (void) SSL_load_error_strings(); } +void +socket_poller_mayday (void *data) +{ + socket_private_t *priv = (socket_private_t *)data; + + if (priv == NULL) + return; + + pthread_mutex_lock (&priv->cond_lock); + { + /* Signal waiting threads before exiting from socket_poller */ + if (!priv->own_thread_done) { + gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED"); + pthread_cond_signal (&priv->cond); + priv->own_thread_done = _gf_true; + } + } + pthread_mutex_unlock (&priv->cond_lock); +} + static int socket_init (rpc_transport_t *this) { @@ -3835,6 +3893,10 @@ socket_init (rpc_transport_t *this) memset(priv,0,sizeof(*priv)); pthread_mutex_init (&priv->lock, NULL); + pthread_mutex_init (&priv->cond_lock, NULL); + pthread_cond_init (&priv->cond, NULL); + + GF_REF_INIT (priv, socket_poller_mayday); priv->sock = -1; priv->idx = -1; @@ -4265,6 +4327,8 @@ fini (rpc_transport_t *this) "transport %p destroyed", this); pthread_mutex_destroy (&priv->lock); + pthread_mutex_destroy (&priv->cond_lock); + pthread_cond_destroy (&priv->cond); if (priv->ssl_private_key) { GF_FREE(priv->ssl_private_key); } diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 7c7005b59e7..8528bdeba8d 
100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -27,6 +27,7 @@ #include "dict.h" #include "mem-pool.h" #include "globals.h" +#include "refcount.h" #ifndef MAX_IOVEC #define MAX_IOVEC 16 @@ -215,6 +216,8 @@ typedef struct { }; struct gf_sock_incoming incoming; pthread_mutex_t lock; + pthread_mutex_t cond_lock; + pthread_cond_t cond; int windowsize; char lowlat; char nodelay; @@ -239,10 +242,13 @@ typedef struct { pthread_t thread; int pipe[2]; gf_boolean_t own_thread; + gf_boolean_t own_thread_done; ot_state_t ot_state; uint32_t ot_gen; gf_boolean_t is_server; int log_ctr; + GF_REF_DECL; /* refcount to keep track of socket_poller + threads */ } socket_private_t; |