diff options
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 104 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 1 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 12 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-rpc.c | 5 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-rebalance.c | 1 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.c | 1 |
6 files changed, 117 insertions, 7 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 27e394093cf..a9e43eb42f1 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -121,6 +121,7 @@ call_bail (void *data) struct timespec timeout = {0,}; struct iovec iov = {0,}; char peerid[UNIX_PATH_MAX] = {0}; + gf_boolean_t need_unref = _gf_false; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -153,6 +154,8 @@ call_bail (void *data) timeout.tv_sec = 10; timeout.tv_nsec = 0; + /* Ref rpc as it's added to timer event queue */ + rpc_clnt_ref (clnt); gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, timeout, @@ -163,6 +166,7 @@ call_bail (void *data) gf_log (conn->name, GF_LOG_WARNING, "Cannot create bailout timer for %s", peerid); + need_unref = _gf_true; } } @@ -205,6 +209,9 @@ call_bail (void *data) mem_put (trav); } out: + rpc_clnt_unref (clnt); + if (need_unref) + rpc_clnt_unref (clnt); return; } @@ -230,6 +237,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, if (conn->timer == NULL) { timeout.tv_sec = 10; timeout.tv_nsec = 0; + rpc_clnt_ref (rpc_clnt); conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -389,6 +397,7 @@ rpc_clnt_reconnect (void *conn_ptr) struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; + gf_boolean_t need_unref = _gf_false; conn = conn_ptr; clnt = conn->rpc_clnt; @@ -413,10 +422,16 @@ rpc_clnt_reconnect (void *conn_ptr) "attempting reconnect"); ret = rpc_transport_connect (trans, conn->config.remote_port); + rpc_clnt_ref (clnt); conn->reconnect = gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn); + if (!conn->reconnect) { + need_unref = _gf_true; + gf_log (conn->name, GF_LOG_ERROR, + "Error adding to timer event queue"); + } } else { gf_log (conn->name, GF_LOG_TRACE, "breaking reconnect chain"); @@ -424,6 +439,9 @@ rpc_clnt_reconnect (void *conn_ptr) } pthread_mutex_unlock (&conn->lock); + rpc_clnt_unref (clnt); + if (need_unref) + 
rpc_clnt_unref (clnt); return; } @@ -463,6 +481,8 @@ int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) { struct rpc_clnt *clnt = NULL; + int ret = 0; + gf_boolean_t reconnect_unref = _gf_false; if (!conn) { goto out; @@ -474,13 +494,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) { if (conn->reconnect) { - gf_timer_call_cancel (clnt->ctx, conn->reconnect); + ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; conn->reconnect = NULL; } } pthread_mutex_unlock (&conn->lock); + if (reconnect_unref) + rpc_clnt_unref (clnt); + out: return 0; } @@ -496,6 +521,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) struct saved_frames *saved_frames = NULL; struct rpc_clnt *clnt = NULL; int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; if (!conn) { goto out; @@ -511,7 +538,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) /* bailout logic cleanup */ if (conn->timer) { - gf_timer_call_cancel (clnt->ctx, conn->timer); + ret = gf_timer_call_cancel (clnt->ctx, conn->timer); + if (!ret) + timer_unref = _gf_true; conn->timer = NULL; } @@ -528,6 +557,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) if (unref) rpc_clnt_unref (clnt); + if (timer_unref) + rpc_clnt_unref (clnt); + out: return 0; } @@ -851,6 +883,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_pollin_t *pollin = NULL; struct timespec ts = {0, }; void *clnt_mydata = NULL; + gf_boolean_t unref_clnt = _gf_false; DECLARE_OLD_THIS; conn = mydata; @@ -875,10 +908,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, ts.tv_sec = 10; ts.tv_nsec = 0; + rpc_clnt_ref (clnt); conn->reconnect = gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn); + if (conn->reconnect == NULL) { + gf_log (conn->name, GF_LOG_WARNING, + "Cannot create rpc_clnt_reconnect timer"); + unref_clnt = _gf_true; + } } } pthread_mutex_unlock (&conn->lock); @@ -886,6 +925,9 @@ rpc_clnt_notify 
(rpc_transport_t *trans, void *mydata, if (clnt->notifyfn) ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); + if (unref_clnt) + rpc_clnt_unref (clnt); + break; } @@ -1135,6 +1177,10 @@ rpc_clnt_start (struct rpc_clnt *rpc) rpc->disabled = 0; } pthread_mutex_unlock (&conn->lock); + /* Corresponding unref will be either on successful timer cancel or last + * rpc_clnt_reconnect fire event. + */ + rpc_clnt_ref (rpc); rpc_clnt_reconnect (conn); return 0; @@ -1512,6 +1558,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int proglen = 0; char new_iobref = 0; uint64_t callid = 0; + gf_boolean_t need_unref = _gf_false; if (!rpc || !prog || !frame) { goto out; @@ -1596,6 +1643,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if ((ret >= 0) && frame) { /* Save the frame in queue */ __save_frame (rpc, frame, rpcreq); + + /* A ref on rpc-clnt object is taken while registering + * call_bail to timer in __save_frame. If it fails to + * register, it needs an unref and should happen outside + * conn->lock which otherwise leads to deadlocks */ + if (conn->timer == NULL) + need_unref = _gf_true; + conn->msgcnt++; gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " @@ -1607,6 +1662,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, } pthread_mutex_unlock (&conn->lock); + if (need_unref) + rpc_clnt_unref (rpc); + if (ret == -1) { goto out; } @@ -1739,6 +1797,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc) rpc_clnt_connection_t *conn = NULL; rpc_transport_t *trans = NULL; int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; + gf_boolean_t reconnect_unref = _gf_false; if (!rpc) { goto out; @@ -1751,12 +1812,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc) rpc->disabled = 1; if (conn->timer) { - gf_timer_call_cancel (rpc->ctx, conn->timer); + ret = gf_timer_call_cancel (rpc->ctx, conn->timer); + /* If the event is not fired and it actually cancelled + * the timer, do the unref else registered call back + 
* function will take care of it. + */ + if (!ret) + timer_unref = _gf_true; conn->timer = NULL; } if (conn->reconnect) { - gf_timer_call_cancel (rpc->ctx, conn->reconnect); + ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; conn->reconnect = NULL; } conn->connected = 0; @@ -1774,6 +1843,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc) if (unref) rpc_clnt_unref (rpc); + if (timer_unref) + rpc_clnt_unref (rpc); + + if (reconnect_unref) + rpc_clnt_unref (rpc); + out: return; } @@ -1784,6 +1859,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) rpc_clnt_connection_t *conn = NULL; rpc_transport_t *trans = NULL; int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; + gf_boolean_t reconnect_unref = _gf_false; if (!rpc) goto out; @@ -1794,12 +1872,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) { rpc->disabled = 1; if (conn->timer) { - gf_timer_call_cancel (rpc->ctx, conn->timer); + ret = gf_timer_call_cancel (rpc->ctx, conn->timer); + /* If the event is not fired and it actually cancelled + * the timer, do the unref else registered call back + * function will take care of unref. 
+ */ + if (!ret) + timer_unref = _gf_true; conn->timer = NULL; } if (conn->reconnect) { - gf_timer_call_cancel (rpc->ctx, conn->reconnect); + ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; conn->reconnect = NULL; } conn->connected = 0; @@ -1815,6 +1901,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) if (unref) rpc_clnt_unref (rpc); + if (timer_unref) + rpc_clnt_unref (rpc); + + if (reconnect_unref) + rpc_clnt_unref (rpc); + out: return; } diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 01caeb814c0..f84b4cbf806 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -231,6 +231,7 @@ struct rpc_clnt * rpc_clnt_unref (struct rpc_clnt *rpc); int rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn); +int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn); void rpc_clnt_set_connected (rpc_clnt_connection_t *conn); diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c index 79652a969bd..77637c7beec 100644 --- a/xlators/features/changelog/src/changelog-ev-handle.c +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -157,6 +157,13 @@ changelog_rpc_notify (struct rpc_clnt *rpc, break; case RPC_CLNT_DISCONNECT: rpc_clnt_disable (crpc->rpc); + + /* rpc_clnt_disable doesn't unref the rpc. It just marks + * the rpc as disabled and cancels reconnection timer. + * Hence unref the rpc object to free it. 
+ */ + rpc_clnt_unref (crpc->rpc); + selection = &priv->ev_selection; LOCK (&crpc->lock); @@ -170,6 +177,8 @@ changelog_rpc_notify (struct rpc_clnt *rpc, break; case RPC_CLNT_MSG: case RPC_CLNT_DESTROY: + /* Free up mydata */ + changelog_rpc_clnt_unref (crpc); break; } @@ -253,7 +262,9 @@ get_client (changelog_clnt_t *c_clnt, struct list_head **next) if (*next == &c_clnt->active) goto unblock; crpc = list_entry (*next, changelog_rpc_clnt_t, list); + /* ref rpc as DISCONNECT might unref the rpc asynchronously */ changelog_rpc_clnt_ref (crpc); + rpc_clnt_ref (crpc->rpc); *next = (*next)->next; } unblock: @@ -267,6 +278,7 @@ put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc) { LOCK (&c_clnt->active_lock); { + rpc_clnt_unref (crpc->rpc); changelog_rpc_clnt_unref (crpc); } UNLOCK (&c_clnt->active_lock); diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c index 76addf18545..4bc24203118 100644 --- a/xlators/features/changelog/src/changelog-rpc.c +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -199,7 +199,10 @@ changelog_rpc_clnt_init (xlator_t *this, goto error_return; INIT_LIST_HEAD (&crpc->list); - crpc->ref = 0; + /* Take a ref, the last unref will be on RPC_CLNT_DESTROY + * which comes as a result of last rpc_clnt_unref. 
+ */ + crpc->ref = 1; changelog_set_disconnect_flag (crpc, _gf_false); crpc->filter = rpc_req->filter; diff --git a/xlators/mgmt/glusterd/src/glusterd-rebalance.c b/xlators/mgmt/glusterd/src/glusterd-rebalance.c index e0eee02ed52..35fa4627d04 100644 --- a/xlators/mgmt/glusterd/src/glusterd-rebalance.c +++ b/xlators/mgmt/glusterd/src/glusterd-rebalance.c @@ -144,6 +144,7 @@ __glusterd_defrag_notify (struct rpc_clnt *rpc, void *mydata, glusterd_store_perform_node_state_store (volinfo); + rpc_clnt_reconnect_cleanup (&defrag->rpc->conn); glusterd_defrag_rpc_put (defrag); if (defrag->cbk_fn) defrag->cbk_fn (volinfo, diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 0cca218488b..a03b041a4e8 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -10612,6 +10612,7 @@ glusterd_rpc_clnt_unref (glusterd_conf_t *conf, rpc_clnt_t *rpc) GF_ASSERT (conf); GF_ASSERT (rpc); synclock_unlock (&conf->big_lock); + (void) rpc_clnt_reconnect_cleanup (&rpc->conn); ret = rpc_clnt_unref (rpc); synclock_lock (&conf->big_lock); |