summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- rpc/rpc-lib/src/rpc-clnt.c 104
-rw-r--r-- rpc/rpc-lib/src/rpc-clnt.h 1
-rw-r--r-- xlators/features/changelog/src/changelog-ev-handle.c 12
-rw-r--r-- xlators/features/changelog/src/changelog-rpc.c 5
-rw-r--r-- xlators/mgmt/glusterd/src/glusterd-rebalance.c 1
-rw-r--r-- xlators/mgmt/glusterd/src/glusterd-utils.c 1
6 files changed, 117 insertions, 7 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 27e394093cf..a9e43eb42f1 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -121,6 +121,7 @@ call_bail (void *data)
struct timespec timeout = {0,};
struct iovec iov = {0,};
char peerid[UNIX_PATH_MAX] = {0};
+ gf_boolean_t need_unref = _gf_false;
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -153,6 +154,8 @@ call_bail (void *data)
timeout.tv_sec = 10;
timeout.tv_nsec = 0;
+ /* Ref rpc as it's added to timer event queue */
+ rpc_clnt_ref (clnt);
gf_timer_call_cancel (clnt->ctx, conn->timer);
conn->timer = gf_timer_call_after (clnt->ctx,
timeout,
@@ -163,6 +166,7 @@ call_bail (void *data)
gf_log (conn->name, GF_LOG_WARNING,
"Cannot create bailout timer for %s",
peerid);
+ need_unref = _gf_true;
}
}
@@ -205,6 +209,9 @@ call_bail (void *data)
mem_put (trav);
}
out:
+ rpc_clnt_unref (clnt);
+ if (need_unref)
+ rpc_clnt_unref (clnt);
return;
}
@@ -230,6 +237,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
if (conn->timer == NULL) {
timeout.tv_sec = 10;
timeout.tv_nsec = 0;
+ rpc_clnt_ref (rpc_clnt);
conn->timer = gf_timer_call_after (rpc_clnt->ctx,
timeout,
call_bail,
@@ -389,6 +397,7 @@ rpc_clnt_reconnect (void *conn_ptr)
struct timespec ts = {0, 0};
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
+ gf_boolean_t need_unref = _gf_false;
conn = conn_ptr;
clnt = conn->rpc_clnt;
@@ -413,10 +422,16 @@ rpc_clnt_reconnect (void *conn_ptr)
"attempting reconnect");
ret = rpc_transport_connect (trans,
conn->config.remote_port);
+ rpc_clnt_ref (clnt);
conn->reconnect =
gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn);
+ if (!conn->reconnect) {
+ need_unref = _gf_true;
+ gf_log (conn->name, GF_LOG_ERROR,
+ "Error adding to timer event queue");
+ }
} else {
gf_log (conn->name, GF_LOG_TRACE,
"breaking reconnect chain");
@@ -424,6 +439,9 @@ rpc_clnt_reconnect (void *conn_ptr)
}
pthread_mutex_unlock (&conn->lock);
+ rpc_clnt_unref (clnt);
+ if (need_unref)
+ rpc_clnt_unref (clnt);
return;
}
@@ -463,6 +481,8 @@ int
rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
{
struct rpc_clnt *clnt = NULL;
+ int ret = 0;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!conn) {
goto out;
@@ -474,13 +494,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
{
if (conn->reconnect) {
- gf_timer_call_cancel (clnt->ctx, conn->reconnect);
+ ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
}
pthread_mutex_unlock (&conn->lock);
+ if (reconnect_unref)
+ rpc_clnt_unref (clnt);
+
out:
return 0;
}
@@ -496,6 +521,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
struct saved_frames *saved_frames = NULL;
struct rpc_clnt *clnt = NULL;
int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
if (!conn) {
goto out;
@@ -511,7 +538,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
/* bailout logic cleanup */
if (conn->timer) {
- gf_timer_call_cancel (clnt->ctx, conn->timer);
+ ret = gf_timer_call_cancel (clnt->ctx, conn->timer);
+ if (!ret)
+ timer_unref = _gf_true;
conn->timer = NULL;
}
@@ -528,6 +557,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
if (unref)
rpc_clnt_unref (clnt);
+ if (timer_unref)
+ rpc_clnt_unref (clnt);
+
out:
return 0;
}
@@ -851,6 +883,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_pollin_t *pollin = NULL;
struct timespec ts = {0, };
void *clnt_mydata = NULL;
+ gf_boolean_t unref_clnt = _gf_false;
DECLARE_OLD_THIS;
conn = mydata;
@@ -875,10 +908,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
ts.tv_sec = 10;
ts.tv_nsec = 0;
+ rpc_clnt_ref (clnt);
conn->reconnect =
gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn);
+ if (conn->reconnect == NULL) {
+ gf_log (conn->name, GF_LOG_WARNING,
+ "Cannot create rpc_clnt_reconnect timer");
+ unref_clnt = _gf_true;
+ }
}
}
pthread_mutex_unlock (&conn->lock);
@@ -886,6 +925,9 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
if (clnt->notifyfn)
ret = clnt->notifyfn (clnt, clnt->mydata,
RPC_CLNT_DISCONNECT, NULL);
+ if (unref_clnt)
+ rpc_clnt_ref (clnt);
+
break;
}
@@ -1135,6 +1177,10 @@ rpc_clnt_start (struct rpc_clnt *rpc)
rpc->disabled = 0;
}
pthread_mutex_unlock (&conn->lock);
+ /* Corresponding unref will be either on successful timer cancel or last
+ * rpc_clnt_reconnect fire event.
+ */
+ rpc_clnt_ref (rpc);
rpc_clnt_reconnect (conn);
return 0;
@@ -1512,6 +1558,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int proglen = 0;
char new_iobref = 0;
uint64_t callid = 0;
+ gf_boolean_t need_unref = _gf_false;
if (!rpc || !prog || !frame) {
goto out;
@@ -1596,6 +1643,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if ((ret >= 0) && frame) {
/* Save the frame in queue */
__save_frame (rpc, frame, rpcreq);
+
+ /* A ref on rpc-clnt object is taken while registering
+ * call_bail to timer in __save_frame. If it fails to
+ * register, it needs an unref and should happen outside
+ * conn->lock which otherwise leads to deadlocks */
+ if (conn->timer == NULL)
+ need_unref = _gf_true;
+
conn->msgcnt++;
gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request "
@@ -1607,6 +1662,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
}
pthread_mutex_unlock (&conn->lock);
+ if (need_unref)
+ rpc_clnt_unref (rpc);
+
if (ret == -1) {
goto out;
}
@@ -1739,6 +1797,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
rpc_clnt_connection_t *conn = NULL;
rpc_transport_t *trans = NULL;
int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!rpc) {
goto out;
@@ -1751,12 +1812,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
rpc->disabled = 1;
if (conn->timer) {
- gf_timer_call_cancel (rpc->ctx, conn->timer);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->timer);
+ /* If the event is not fired and it actually cancelled
+ * the timer, do the unref else registered call back
+ * function will take care of it.
+ */
+ if (!ret)
+ timer_unref = _gf_true;
conn->timer = NULL;
}
if (conn->reconnect) {
- gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
conn->connected = 0;
@@ -1774,6 +1843,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
if (unref)
rpc_clnt_unref (rpc);
+ if (timer_unref)
+ rpc_clnt_unref (rpc);
+
+ if (reconnect_unref)
+ rpc_clnt_unref (rpc);
+
out:
return;
}
@@ -1784,6 +1859,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
rpc_clnt_connection_t *conn = NULL;
rpc_transport_t *trans = NULL;
int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!rpc)
goto out;
@@ -1794,12 +1872,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
{
rpc->disabled = 1;
if (conn->timer) {
- gf_timer_call_cancel (rpc->ctx, conn->timer);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->timer);
+ /* If the event is not fired and it actually cancelled
+ * the timer, do the unref else registered call back
+ * function will take care of unref.
+ */
+ if (!ret)
+ timer_unref = _gf_true;
conn->timer = NULL;
}
if (conn->reconnect) {
- gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
conn->connected = 0;
@@ -1815,6 +1901,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
if (unref)
rpc_clnt_unref (rpc);
+ if (timer_unref)
+ rpc_clnt_unref (rpc);
+
+ if (reconnect_unref)
+ rpc_clnt_unref (rpc);
+
out:
return;
}
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 01caeb814c0..f84b4cbf806 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -231,6 +231,7 @@ struct rpc_clnt *
rpc_clnt_unref (struct rpc_clnt *rpc);
int rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn);
+int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn);
void rpc_clnt_set_connected (rpc_clnt_connection_t *conn);
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
index 79652a969bd..77637c7beec 100644
--- a/xlators/features/changelog/src/changelog-ev-handle.c
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -157,6 +157,13 @@ changelog_rpc_notify (struct rpc_clnt *rpc,
break;
case RPC_CLNT_DISCONNECT:
rpc_clnt_disable (crpc->rpc);
+
+ /* rpc_clnt_disable doesn't unref the rpc. It just marks
+ * the rpc as disabled and cancels reconnection timer.
+ * Hence unref the rpc object to free it.
+ */
+ rpc_clnt_unref (crpc->rpc);
+
selection = &priv->ev_selection;
LOCK (&crpc->lock);
@@ -170,6 +177,8 @@ changelog_rpc_notify (struct rpc_clnt *rpc,
break;
case RPC_CLNT_MSG:
case RPC_CLNT_DESTROY:
+ /* Free up mydata */
+ changelog_rpc_clnt_unref (crpc);
break;
}
@@ -253,7 +262,9 @@ get_client (changelog_clnt_t *c_clnt, struct list_head **next)
if (*next == &c_clnt->active)
goto unblock;
crpc = list_entry (*next, changelog_rpc_clnt_t, list);
+ /* ref rpc as DISCONNECT might unref the rpc asynchronously */
changelog_rpc_clnt_ref (crpc);
+ rpc_clnt_ref (crpc->rpc);
*next = (*next)->next;
}
unblock:
@@ -267,6 +278,7 @@ put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)
{
LOCK (&c_clnt->active_lock);
{
+ rpc_clnt_unref (crpc->rpc);
changelog_rpc_clnt_unref (crpc);
}
UNLOCK (&c_clnt->active_lock);
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c
index 76addf18545..4bc24203118 100644
--- a/xlators/features/changelog/src/changelog-rpc.c
+++ b/xlators/features/changelog/src/changelog-rpc.c
@@ -199,7 +199,10 @@ changelog_rpc_clnt_init (xlator_t *this,
goto error_return;
INIT_LIST_HEAD (&crpc->list);
- crpc->ref = 0;
+ /* Take a ref, the last unref will be on RPC_CLNT_DESTROY
+ * which comes as a result of last rpc_clnt_unref.
+ */
+ crpc->ref = 1;
changelog_set_disconnect_flag (crpc, _gf_false);
crpc->filter = rpc_req->filter;
diff --git a/xlators/mgmt/glusterd/src/glusterd-rebalance.c b/xlators/mgmt/glusterd/src/glusterd-rebalance.c
index e0eee02ed52..35fa4627d04 100644
--- a/xlators/mgmt/glusterd/src/glusterd-rebalance.c
+++ b/xlators/mgmt/glusterd/src/glusterd-rebalance.c
@@ -144,6 +144,7 @@ __glusterd_defrag_notify (struct rpc_clnt *rpc, void *mydata,
glusterd_store_perform_node_state_store (volinfo);
+ rpc_clnt_reconnect_cleanup (&defrag->rpc->conn);
glusterd_defrag_rpc_put (defrag);
if (defrag->cbk_fn)
defrag->cbk_fn (volinfo,
diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c
index 0cca218488b..a03b041a4e8 100644
--- a/xlators/mgmt/glusterd/src/glusterd-utils.c
+++ b/xlators/mgmt/glusterd/src/glusterd-utils.c
@@ -10612,6 +10612,7 @@ glusterd_rpc_clnt_unref (glusterd_conf_t *conf, rpc_clnt_t *rpc)
GF_ASSERT (conf);
GF_ASSERT (rpc);
synclock_unlock (&conf->big_lock);
+ (void) rpc_clnt_reconnect_cleanup (&rpc->conn);
ret = rpc_clnt_unref (rpc);
synclock_lock (&conf->big_lock);