diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 104 |
1 files changed, 98 insertions, 6 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 27e394093cf..a9e43eb42f1 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -121,6 +121,7 @@ call_bail (void *data) struct timespec timeout = {0,}; struct iovec iov = {0,}; char peerid[UNIX_PATH_MAX] = {0}; + gf_boolean_t need_unref = _gf_false; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -153,6 +154,8 @@ call_bail (void *data) timeout.tv_sec = 10; timeout.tv_nsec = 0; + /* Ref rpc as it's added to timer event queue */ + rpc_clnt_ref (clnt); gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, timeout, @@ -163,6 +166,7 @@ call_bail (void *data) gf_log (conn->name, GF_LOG_WARNING, "Cannot create bailout timer for %s", peerid); + need_unref = _gf_true; } } @@ -205,6 +209,9 @@ call_bail (void *data) mem_put (trav); } out: + rpc_clnt_unref (clnt); + if (need_unref) + rpc_clnt_unref (clnt); return; } @@ -230,6 +237,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, if (conn->timer == NULL) { timeout.tv_sec = 10; timeout.tv_nsec = 0; + rpc_clnt_ref (rpc_clnt); conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -389,6 +397,7 @@ rpc_clnt_reconnect (void *conn_ptr) struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; + gf_boolean_t need_unref = _gf_false; conn = conn_ptr; clnt = conn->rpc_clnt; @@ -413,10 +422,16 @@ rpc_clnt_reconnect (void *conn_ptr) "attempting reconnect"); ret = rpc_transport_connect (trans, conn->config.remote_port); + rpc_clnt_ref (clnt); conn->reconnect = gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn); + if (!conn->reconnect) { + need_unref = _gf_true; + gf_log (conn->name, GF_LOG_ERROR, + "Error adding to timer event queue"); + } } else { gf_log (conn->name, GF_LOG_TRACE, "breaking reconnect chain"); @@ -424,6 +439,9 @@ rpc_clnt_reconnect (void *conn_ptr) } pthread_mutex_unlock (&conn->lock); + rpc_clnt_unref (clnt); + if (need_unref) + rpc_clnt_unref (clnt); return; } @@ -463,6 +481,8 @@ int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) { struct rpc_clnt *clnt = NULL; + int ret = 0; + gf_boolean_t reconnect_unref = _gf_false; if (!conn) { goto out; @@ -474,13 +494,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) { if (conn->reconnect) { - gf_timer_call_cancel (clnt->ctx, conn->reconnect); + ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; conn->reconnect = NULL; } } pthread_mutex_unlock (&conn->lock); + if (reconnect_unref) + rpc_clnt_unref (clnt); + out: return 0; } @@ -496,6 +521,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) struct saved_frames *saved_frames = NULL; struct rpc_clnt *clnt = NULL; int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; if (!conn) { goto out; @@ -511,7 +538,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) /* bailout logic cleanup */ if (conn->timer) { - gf_timer_call_cancel (clnt->ctx, conn->timer); + ret = gf_timer_call_cancel (clnt->ctx, conn->timer); + if (!ret) + timer_unref = _gf_true; conn->timer = NULL; } @@ -528,6 +557,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) if (unref) rpc_clnt_unref (clnt); + if (timer_unref) + rpc_clnt_unref (clnt); + out: return 0; } @@ -851,6 +883,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_pollin_t *pollin = NULL; struct timespec ts = {0, }; void *clnt_mydata = NULL; + gf_boolean_t unref_clnt = _gf_false; DECLARE_OLD_THIS; conn = mydata; @@ -875,10 +908,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, ts.tv_sec = 10; ts.tv_nsec = 0; + rpc_clnt_ref (clnt); conn->reconnect = gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn); + if (conn->reconnect == NULL) { + gf_log (conn->name, GF_LOG_WARNING, + "Cannot create rpc_clnt_reconnect timer"); + unref_clnt = _gf_true; + } } } pthread_mutex_unlock (&conn->lock); @@ -886,6 +925,9 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, if (clnt->notifyfn) ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); + if (unref_clnt) + rpc_clnt_ref (clnt); + break; } @@ -1135,6 +1177,10 @@ rpc_clnt_start (struct rpc_clnt *rpc) rpc->disabled = 0; } pthread_mutex_unlock (&conn->lock); + /* Corresponding unref will be either on successful timer cancel or last + * rpc_clnt_reconnect fire event. + */ + rpc_clnt_ref (rpc); rpc_clnt_reconnect (conn); return 0; @@ -1512,6 +1558,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int proglen = 0; char new_iobref = 0; uint64_t callid = 0; + gf_boolean_t need_unref = _gf_false; if (!rpc || !prog || !frame) { goto out; @@ -1596,6 +1643,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if ((ret >= 0) && frame) { /* Save the frame in queue */ __save_frame (rpc, frame, rpcreq); + + /* A ref on rpc-clnt object is taken while registering + * call_bail to timer in __save_frame. If it fails to + * register, it needs an unref and should happen outside + * conn->lock which otherwise leads to deadlocks */ + if (conn->timer == NULL) + need_unref = _gf_true; + conn->msgcnt++; gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " @@ -1607,6 +1662,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, } pthread_mutex_unlock (&conn->lock); + if (need_unref) + rpc_clnt_unref (rpc); + if (ret == -1) { goto out; } @@ -1739,6 +1797,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc) rpc_clnt_connection_t *conn = NULL; rpc_transport_t *trans = NULL; int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; + gf_boolean_t reconnect_unref = _gf_false; if (!rpc) { goto out; @@ -1751,12 +1812,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc) rpc->disabled = 1; if (conn->timer) { - gf_timer_call_cancel (rpc->ctx, conn->timer); + ret = gf_timer_call_cancel (rpc->ctx, conn->timer); + /* If the event is not fired and it actually cancelled + * the timer, do the unref else registered call back + * function will take care of it. + */ + if (!ret) + timer_unref = _gf_true; conn->timer = NULL; } if (conn->reconnect) { - gf_timer_call_cancel (rpc->ctx, conn->reconnect); + ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; conn->reconnect = NULL; } conn->connected = 0; @@ -1774,6 +1843,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc) if (unref) rpc_clnt_unref (rpc); + if (timer_unref) + rpc_clnt_unref (rpc); + + if (reconnect_unref) + rpc_clnt_unref (rpc); + out: return; } @@ -1784,6 +1859,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) rpc_clnt_connection_t *conn = NULL; rpc_transport_t *trans = NULL; int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; + gf_boolean_t reconnect_unref = _gf_false; if (!rpc) goto out; @@ -1794,12 +1872,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) { rpc->disabled = 1; if (conn->timer) { - gf_timer_call_cancel (rpc->ctx, conn->timer); + ret = gf_timer_call_cancel (rpc->ctx, conn->timer); + /* If the event is not fired and it actually cancelled + * the timer, do the unref else registered call back + * function will take care of unref. + */ + if (!ret) + timer_unref = _gf_true; conn->timer = NULL; } if (conn->reconnect) { - gf_timer_call_cancel (rpc->ctx, conn->reconnect); + ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; conn->reconnect = NULL; } conn->connected = 0; @@ -1815,6 +1901,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc) if (unref) rpc_clnt_unref (rpc); + if (timer_unref) + rpc_clnt_unref (rpc); + + if (reconnect_unref) + rpc_clnt_unref (rpc); + out: return; } |