diff options
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt-ping.c | 2 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 5 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 2 | ||||
-rw-r--r-- | rpc/rpc-transport/rdma/src/rdma.c | 19 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 98 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 6 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-handshake.c | 7 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.c | 3 | ||||
-rw-r--r-- | xlators/protocol/client/src/client-handshake.c | 4 | ||||
-rw-r--r-- | xlators/protocol/server/src/server.c | 3 |
12 files changed, 116 insertions, 41 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.c b/rpc/rpc-lib/src/rpc-clnt-ping.c index 3eb7e90cb01..e042121ad47 100644 --- a/rpc/rpc-lib/src/rpc-clnt-ping.c +++ b/rpc/rpc-lib/src/rpc-clnt-ping.c @@ -159,7 +159,7 @@ rpc_clnt_ping_timer_expired (void *rpc_ptr) trans->peerinfo.identifier, conn->ping_timeout); - rpc_transport_disconnect (conn->trans); + rpc_transport_disconnect (conn->trans, _gf_false); } out: diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index b868f56bdb3..d39b5236b91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1854,7 +1854,7 @@ rpc_clnt_disable (struct rpc_clnt *rpc) pthread_mutex_unlock (&conn->lock); if (trans) { - rpc_transport_disconnect (trans); + rpc_transport_disconnect (trans, _gf_true); } if (unref) @@ -1913,7 +1913,7 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) pthread_mutex_unlock (&conn->lock); if (trans) { - rpc_transport_disconnect (trans); + rpc_transport_disconnect (trans, _gf_true); } if (unref) rpc_clnt_unref (rpc); diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 6ee5e15ede4..33e94450d9c 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -435,13 +435,14 @@ fail: int32_t -rpc_transport_disconnect (rpc_transport_t *this) +rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait) { int32_t ret = -1; GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); - ret = this->ops->disconnect (this); + ret = this->ops->disconnect (this, wait); + fail: return ret; } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 4e7a8c46fae..717c40af13a 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -224,7 +224,7 @@ struct rpc_transport_ops { rpc_transport_reply_t *reply); int32_t (*connect) (rpc_transport_t *this, int port); int32_t (*listen) (rpc_transport_t *this); - int32_t (*disconnect) (rpc_transport_t *this); + int32_t (*disconnect) (rpc_transport_t *this, gf_boolean_t wait); int32_t (*get_peername) (rpc_transport_t *this, char *hostname, int hostlen); int32_t (*get_peeraddr) (rpc_transport_t *this, char *peeraddr, @@ -248,7 +248,7 @@ int32_t rpc_transport_connect (rpc_transport_t *this, int port); int32_t -rpc_transport_disconnect (rpc_transport_t *this); +rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait); int32_t rpc_transport_destroy (rpc_transport_t *this); diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index c792909cb87..52b57205f6d 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1654,7 +1654,7 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name) ret = 0; out: if (!listener && trans) { - rpc_transport_disconnect (trans); + rpc_transport_disconnect (trans, _gf_true); } return ret; diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index 551ac072079..d2f04bd6d0c 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -51,7 +51,7 @@ static int32_t gf_rdma_teardown (rpc_transport_t *this); static int32_t -gf_rdma_disconnect (rpc_transport_t *this); +gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait); static void gf_rdma_cm_handle_disconnect (rpc_transport_t *this); @@ -1209,7 +1209,7 @@ gf_rdma_cm_handle_connect_init (struct rdma_cm_event *event) } if (ret < 0) { - gf_rdma_disconnect (this); + gf_rdma_disconnect (this, _gf_false); } return ret; @@ -3014,7 +3014,7 @@ gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) RDMA_MSG_WRITE_PEER_FAILED, "sending request to peer (%s) failed", this->peerinfo.identifier); - rpc_transport_disconnect (this); + rpc_transport_disconnect (this, _gf_false); } out: @@ -3051,7 +3051,7 @@ gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) RDMA_MSG_WRITE_PEER_FAILED, "sending request to peer (%s) failed", this->peerinfo.identifier); - rpc_transport_disconnect (this); + rpc_transport_disconnect (this, _gf_false); } out: @@ -4095,7 +4095,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc) out: if (ret == -1) { - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, _gf_false); } return; @@ -4216,7 +4216,8 @@ gf_rdma_recv_completion_proc (void *data) if (peer) { ibv_ack_cq_events (event_cq, num_wr); rpc_transport_unref (peer->trans); - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, + _gf_false); } if (post) { @@ -4292,7 +4293,7 @@ gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc) } if (peer) { - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, _gf_false); } return; @@ -4343,7 +4344,7 @@ gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer, ret = gf_rdma_pollin_notify (peer, post); if ((ret == -1) && (peer != NULL)) { - rpc_transport_disconnect (peer->trans); + rpc_transport_disconnect (peer->trans, _gf_false); } out: @@ -4657,7 +4658,7 @@ gf_rdma_init (rpc_transport_t *this) static int32_t -gf_rdma_disconnect (rpc_transport_t *this) +gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait) { gf_rdma_private_t *priv = NULL; int32_t ret = 0; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 36388548937..d05dc4189aa 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2344,7 +2344,7 @@ out: return ret; } -static int socket_disconnect (rpc_transport_t *this); +static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait); /* reads rpc_requests during pollin */ static int @@ -2375,7 +2375,7 @@ socket_event_handler (int fd, int idx, void *data, EINPROGRESS or ENOENT, so nothing more to do, fail reading/writing anything even if poll_in or poll_out is set */ - ret = socket_disconnect (this); + ret = socket_disconnect (this, _gf_false); /* Force ret to be -1, as we are officially done with this socket */ @@ -2424,6 +2424,13 @@ socket_poller (void *ctx) * conditionally */ THIS = this->xl; + GF_REF_GET (priv); + + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting " + "because socket state is OT_PLEASE_DIE"); + goto err; + } priv->ot_state = OT_RUNNING; @@ -2494,6 +2501,13 @@ socket_poller (void *ctx) break; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_DEBUG, + "OT_PLEASE_DIE on %p (exiting socket_poller)", + this); + break; + } + if (pfd[1].revents & POLL_MASK_INPUT) { ret = socket_event_poll_in(this); if (ret >= 0) { @@ -2507,7 +2521,6 @@ socket_poller (void *ctx) gf_log (this->name, GF_LOG_TRACE, "OT_IDLE on %p (input request)", this); - priv->ot_state = OT_IDLE; break; } } @@ -2524,7 +2537,6 @@ socket_poller (void *ctx) gf_log (this->name, GF_LOG_TRACE, "OT_IDLE on %p (output request)", this); - priv->ot_state = OT_IDLE; break; } } @@ -2561,22 +2573,24 @@ socket_poller (void *ctx) err: /* All (and only) I/O errors should come here. */ pthread_mutex_lock(&priv->lock); + { + __socket_teardown_connection (this); + sys_close (priv->sock); + priv->sock = -1; - __socket_teardown_connection (this); - sys_close (priv->sock); - priv->sock = -1; - - sys_close (priv->pipe[0]); - sys_close (priv->pipe[1]); - priv->pipe[0] = -1; - priv->pipe[1] = -1; - - priv->ot_state = OT_IDLE; + sys_close (priv->pipe[0]); + sys_close (priv->pipe[1]); + priv->pipe[0] = -1; + priv->pipe[1] = -1; + priv->ot_state = OT_IDLE; + } pthread_mutex_unlock(&priv->lock); rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + GF_REF_PUT (priv); + rpc_transport_unref (this); return NULL; @@ -2848,16 +2862,39 @@ out: static int -socket_disconnect (rpc_transport_t *this) +socket_disconnect (rpc_transport_t *this, gf_boolean_t wait) { - socket_private_t *priv = NULL; - int ret = -1; + socket_private_t *priv = NULL; + int ret = -1; + char a_byte = 'r'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + if (wait && priv->own_thread) { + pthread_mutex_lock (&priv->cond_lock); + { + GF_REF_PUT (priv); + /* Change the state to OT_PLEASE_DIE so that + * socket_poller can exit. */ + priv->ot_state = OT_PLEASE_DIE; + /* Write something into the pipe so that poller + * thread can wake up.*/ + if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { + gf_log (this->name, GF_LOG_WARNING, + "write error on pipe"); + } + + /* Wait for socket_poller to exit */ + if (!priv->own_thread_done) + pthread_cond_wait (&priv->cond, + &priv->cond_lock); + } + pthread_mutex_unlock (&priv->cond_lock); + } + pthread_mutex_lock (&priv->lock); { ret = __socket_disconnect (this); @@ -2937,6 +2974,7 @@ socket_connect (rpc_transport_t *this, int port) pthread_mutex_lock (&priv->lock); { + priv->own_thread_done = _gf_false; if (priv->sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, "connect () called on transport " @@ -3805,6 +3843,26 @@ init_openssl_mt (void) SSL_load_error_strings(); } +void +socket_poller_mayday (void *data) +{ + socket_private_t *priv = (socket_private_t *)data; + + if (priv == NULL) + return; + + pthread_mutex_lock (&priv->cond_lock); + { + /* Signal waiting threads before exiting from socket_poller */ + if (!priv->own_thread_done) { + gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED"); + pthread_cond_signal (&priv->cond); + priv->own_thread_done = _gf_true; + } + } + pthread_mutex_unlock (&priv->cond_lock); +} + static int socket_init (rpc_transport_t *this) { @@ -3835,6 +3893,10 @@ socket_init (rpc_transport_t *this) memset(priv,0,sizeof(*priv)); pthread_mutex_init (&priv->lock, NULL); + pthread_mutex_init (&priv->cond_lock, NULL); + pthread_cond_init (&priv->cond, NULL); + + GF_REF_INIT (priv, socket_poller_mayday); priv->sock = -1; priv->idx = -1; @@ -4265,6 +4327,8 @@ fini (rpc_transport_t *this) "transport %p destroyed", this); pthread_mutex_destroy (&priv->lock); + pthread_mutex_destroy (&priv->cond_lock); + pthread_cond_destroy (&priv->cond); if (priv->ssl_private_key) { GF_FREE(priv->ssl_private_key); } diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 7c7005b59e7..8528bdeba8d 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -27,6 +27,7 @@ #include "dict.h" #include "mem-pool.h" #include "globals.h" +#include "refcount.h" #ifndef MAX_IOVEC #define MAX_IOVEC 16 @@ -215,6 +216,8 @@ typedef struct { }; struct gf_sock_incoming incoming; pthread_mutex_t lock; + pthread_mutex_t cond_lock; + pthread_cond_t cond; int windowsize; char lowlat; char nodelay; @@ -239,10 +242,13 @@ typedef struct { pthread_t thread; int pipe[2]; gf_boolean_t own_thread; + gf_boolean_t own_thread_done; ot_state_t ot_state; uint32_t ot_gen; gf_boolean_t is_server; int log_ctr; + GF_REF_DECL; /* refcount to keep track of socket_poller + threads */ } socket_private_t; diff --git a/xlators/mgmt/glusterd/src/glusterd-handshake.c b/xlators/mgmt/glusterd/src/glusterd-handshake.c index dc772ca6f51..550ee2c10b3 100644 --- a/xlators/mgmt/glusterd/src/glusterd-handshake.c +++ b/xlators/mgmt/glusterd/src/glusterd-handshake.c @@ -1824,7 +1824,7 @@ __glusterd_mgmt_hndsk_version_ack_cbk (struct rpc_req *req, struct iovec *iov, out: if (ret != 0 && peerinfo) - rpc_transport_disconnect (peerinfo->rpc->conn.trans); + rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false); rcu_read_unlock (); @@ -1949,7 +1949,8 @@ out: frame->local = NULL; STACK_DESTROY (frame->root); if (peerinfo) - rpc_transport_disconnect (peerinfo->rpc->conn.trans); + rpc_transport_disconnect (peerinfo->rpc->conn.trans, + _gf_false); } rcu_read_unlock (); @@ -2218,7 +2219,7 @@ __glusterd_peer_dump_version_cbk (struct rpc_req *req, struct iovec *iov, out: if (ret != 0 && peerinfo) - rpc_transport_disconnect (peerinfo->rpc->conn.trans); + rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false); rcu_read_unlock (); diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index fedfb746c50..a1c9132feda 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -4059,7 +4059,8 @@ gd_check_and_update_rebalance_info (glusterd_volinfo_t *old_volinfo, //Disconnect from rebalance process if (glusterd_defrag_rpc_get (old->defrag)) { - rpc_transport_disconnect (old->defrag->rpc->conn.trans); + rpc_transport_disconnect (old->defrag->rpc->conn.trans, + _gf_false); glusterd_defrag_rpc_put (old->defrag); } diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c index c28fa5dd7cd..adccd8d4e51 100644 --- a/xlators/protocol/client/src/client-handshake.c +++ b/xlators/protocol/client/src/client-handshake.c @@ -1558,7 +1558,7 @@ out: if (conf) { /* Need this to connect the same transport on different port */ /* ie, glusterd to glusterfsd */ - rpc_transport_disconnect (conf->rpc->conn.trans); + rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false); } return ret; @@ -1677,7 +1677,7 @@ out: STACK_DESTROY (frame->root); if (ret != 0) - rpc_transport_disconnect (conf->rpc->conn.trans); + rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false); return ret; } diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index a99c1470276..af3adb36ef2 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -897,7 +897,8 @@ reconfigure (xlator_t *this, dict_t *options) "unauthorized client, hence " "terminating the connection %s", xprt->peerinfo.identifier); - rpc_transport_disconnect(xprt); + rpc_transport_disconnect(xprt, + _gf_false); } } } |