summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-ev-handle.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-ev-handle.c')
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.c23
1 files changed, 18 insertions, 5 deletions
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
index 3ed6ff821d9..aa94459de5a 100644
--- a/xlators/features/changelog/src/changelog-ev-handle.c
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -134,6 +134,8 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
changelog_clnt_t *c_clnt = NULL;
changelog_priv_t *priv = NULL;
changelog_ev_selector_t *selection = NULL;
+ uint64_t clntcnt = 0;
+ uint64_t xprtcnt = 0;
crpc = mydata;
this = crpc->this;
@@ -144,6 +146,7 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
switch (event) {
case RPC_CLNT_CONNECT:
selection = &priv->ev_selection;
+ GF_ATOMIC_INC(priv->clntcnt);
LOCK(&c_clnt->wait_lock);
{
@@ -176,12 +179,23 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
changelog_set_disconnect_flag(crpc, _gf_true);
}
UNLOCK(&crpc->lock);
+ LOCK(&c_clnt->active_lock);
+ {
+ list_del_init(&crpc->list);
+ }
+ UNLOCK(&c_clnt->active_lock);
break;
case RPC_CLNT_MSG:
case RPC_CLNT_DESTROY:
/* Free up mydata */
changelog_rpc_clnt_unref(crpc);
+ clntcnt = GF_ATOMIC_DEC(priv->clntcnt);
+ xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
+ if (this->cleanup_starting) {
+ if (!clntcnt && !xprtcnt)
+ changelog_process_cleanup_event(this);
+ }
break;
case RPC_CLNT_PING:
break;
@@ -211,8 +225,8 @@ changelog_ev_connector(void *data)
changelog_rpc_notify);
if (!crpc->rpc) {
gf_smsg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_RPC_CONNECT_ERROR,
- "failed to connect back", "path=%s", crpc->sock, NULL);
+ CHANGELOG_MSG_RPC_CONNECT_ERROR, "path=%s", crpc->sock,
+ NULL);
crpc->cleanup(crpc);
goto mutex_unlock;
}
@@ -364,9 +378,8 @@ changelog_ev_dispatch(void *data)
ret = rbuf_wait_for_completion(c_clnt->rbuf, opaque, _dispatcher,
c_clnt);
if (ret)
- gf_msg(this->name, GF_LOG_WARNING, 0,
- CHANGELOG_MSG_PUT_BUFFER_FAILED,
- "failed to put buffer after consumption");
+ gf_smsg(this->name, GF_LOG_WARNING, 0,
+ CHANGELOG_MSG_PUT_BUFFER_FAILED, NULL);
}
return NULL;