diff options
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 104 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 1 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 12 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc.c | 5 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-rebalance.c | 1 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.c | 1 | 
6 files changed, 117 insertions, 7 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 27e394093cf..a9e43eb42f1 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -121,6 +121,7 @@ call_bail (void *data)          struct timespec        timeout = {0,};          struct iovec           iov = {0,};          char                   peerid[UNIX_PATH_MAX] = {0}; +        gf_boolean_t           need_unref = _gf_false;          GF_VALIDATE_OR_GOTO ("client", data, out); @@ -153,6 +154,8 @@ call_bail (void *data)                          timeout.tv_sec = 10;                          timeout.tv_nsec = 0; +                        /* Ref rpc as it's added to timer event queue */ +                        rpc_clnt_ref (clnt);                          gf_timer_call_cancel (clnt->ctx, conn->timer);                          conn->timer = gf_timer_call_after (clnt->ctx,                                                             timeout, @@ -163,6 +166,7 @@ call_bail (void *data)                                  gf_log (conn->name, GF_LOG_WARNING,                                          "Cannot create bailout timer for %s",                                          peerid); +                                need_unref = _gf_true;                          }                  } @@ -205,6 +209,9 @@ call_bail (void *data)                  mem_put (trav);          }  out: +        rpc_clnt_unref (clnt); +        if (need_unref) +                rpc_clnt_unref (clnt);          return;  } @@ -230,6 +237,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,          if (conn->timer == NULL) {                  timeout.tv_sec  = 10;                  timeout.tv_nsec = 0; +                rpc_clnt_ref (rpc_clnt);                  conn->timer = gf_timer_call_after (rpc_clnt->ctx,                                                     timeout,                                                     call_bail, @@ -389,6 +397,7 @@ rpc_clnt_reconnect (void *conn_ptr)          struct timespec          ts    = {0, 0};          int32_t                  ret   = 0;          struct rpc_clnt         *clnt  = NULL; +        gf_boolean_t             need_unref = _gf_false;          conn  = conn_ptr;          clnt = conn->rpc_clnt; @@ -413,10 +422,16 @@ rpc_clnt_reconnect (void *conn_ptr)                                  "attempting reconnect");                          ret = rpc_transport_connect (trans,                                                       conn->config.remote_port); +                        rpc_clnt_ref (clnt);                          conn->reconnect =                                  gf_timer_call_after (clnt->ctx, ts,                                                       rpc_clnt_reconnect,                                                       conn); +                        if (!conn->reconnect) { +                                need_unref = _gf_true; +                                gf_log (conn->name, GF_LOG_ERROR, +                                        "Error adding to timer event queue"); +                        }                  } else {                          gf_log (conn->name, GF_LOG_TRACE,                                  "breaking reconnect chain"); @@ -424,6 +439,9 @@ rpc_clnt_reconnect (void *conn_ptr)          }          pthread_mutex_unlock (&conn->lock); +        rpc_clnt_unref (clnt); +        if (need_unref) +                rpc_clnt_unref (clnt);          return;  } @@ -463,6 +481,8 @@ int  rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)  {          struct rpc_clnt         *clnt  = NULL; +        int                      ret   = 0; +        gf_boolean_t             reconnect_unref = _gf_false;          if (!conn) {                  goto out; @@ -474,13 +494,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)          {                  if (conn->reconnect) { -                        gf_timer_call_cancel (clnt->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }          }          pthread_mutex_unlock (&conn->lock); +        if (reconnect_unref) +                rpc_clnt_unref (clnt); +  out:          return 0;  } @@ -496,6 +521,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)          struct saved_frames    *saved_frames = NULL;          struct rpc_clnt         *clnt  = NULL;          int                     unref = 0; +        int                     ret   = 0; +        gf_boolean_t            timer_unref = _gf_false;          if (!conn) {                  goto out; @@ -511,7 +538,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)                  /* bailout logic cleanup */                  if (conn->timer) { -                        gf_timer_call_cancel (clnt->ctx, conn->timer); +                        ret = gf_timer_call_cancel (clnt->ctx, conn->timer); +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  } @@ -528,6 +557,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)          if (unref)                  rpc_clnt_unref (clnt); +        if (timer_unref) +                rpc_clnt_unref (clnt); +  out:          return 0;  } @@ -851,6 +883,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,          rpc_transport_pollin_t *pollin      = NULL;          struct timespec         ts          = {0, };          void                   *clnt_mydata = NULL; +        gf_boolean_t            unref_clnt  = _gf_false;          DECLARE_OLD_THIS;          conn = mydata; @@ -875,10 +908,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,                                  ts.tv_sec = 10;                                  ts.tv_nsec = 0; +                                rpc_clnt_ref (clnt);                                  conn->reconnect =                                          gf_timer_call_after (clnt->ctx, ts,                                                               rpc_clnt_reconnect,                                                               conn); +                                if (conn->reconnect == NULL) { +                                        gf_log (conn->name, GF_LOG_WARNING, +                                                "Cannot create rpc_clnt_reconnect timer"); +                                        unref_clnt = _gf_true; +                                }                          }                  }                  pthread_mutex_unlock (&conn->lock); @@ -886,6 +925,9 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,                  if (clnt->notifyfn)                          ret = clnt->notifyfn (clnt, clnt->mydata,                                                RPC_CLNT_DISCONNECT, NULL); +                if (unref_clnt) +                        rpc_clnt_ref (clnt); +                  break;          } @@ -1135,6 +1177,10 @@ rpc_clnt_start (struct rpc_clnt *rpc)                  rpc->disabled = 0;          }          pthread_mutex_unlock (&conn->lock); +        /* Corresponding unref will be either on successful timer cancel or last +         * rpc_clnt_reconnect fire event. +         */ +        rpc_clnt_ref (rpc);          rpc_clnt_reconnect (conn);          return 0; @@ -1512,6 +1558,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          int                    proglen     = 0;          char                   new_iobref  = 0;          uint64_t               callid      = 0; +        gf_boolean_t           need_unref  = _gf_false;          if (!rpc || !prog || !frame) {                  goto out; @@ -1596,6 +1643,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  if ((ret >= 0) && frame) {                          /* Save the frame in queue */                          __save_frame (rpc, frame, rpcreq); + +                        /* A ref on rpc-clnt object is taken while registering +                         * call_bail to timer in __save_frame. If it fails to +                         * register, it needs an unref and should happen outside +                         * conn->lock which otherwise leads to deadlocks */ +                        if (conn->timer == NULL) +                                need_unref = _gf_true; +                          conn->msgcnt++;                          gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " @@ -1607,6 +1662,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          }          pthread_mutex_unlock (&conn->lock); +        if (need_unref) +                rpc_clnt_unref (rpc); +          if (ret == -1) {                  goto out;          } @@ -1739,6 +1797,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          rpc_clnt_connection_t *conn = NULL;          rpc_transport_t       *trans = NULL;          int                    unref = 0; +        int                    ret   = 0; +        gf_boolean_t           timer_unref = _gf_false; +        gf_boolean_t           reconnect_unref = _gf_false;          if (!rpc) {                  goto out; @@ -1751,12 +1812,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc)                  rpc->disabled = 1;                  if (conn->timer) { -                        gf_timer_call_cancel (rpc->ctx, conn->timer); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->timer); +                        /* If the event is not fired and it actually cancelled +                         * the timer, do the unref else registered call back +                         * function will take care of it. +                         */ +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  }                  if (conn->reconnect) { -                        gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }                  conn->connected = 0; @@ -1774,6 +1843,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          if (unref)                  rpc_clnt_unref (rpc); +        if (timer_unref) +                rpc_clnt_unref (rpc); + +        if (reconnect_unref) +                rpc_clnt_unref (rpc); +  out:          return;  } @@ -1784,6 +1859,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          rpc_clnt_connection_t *conn  = NULL;          rpc_transport_t       *trans = NULL;          int                    unref = 0; +        int                    ret   = 0; +        gf_boolean_t           timer_unref = _gf_false; +        gf_boolean_t           reconnect_unref = _gf_false;          if (!rpc)                  goto out; @@ -1794,12 +1872,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          {                  rpc->disabled = 1;                  if (conn->timer) { -                        gf_timer_call_cancel (rpc->ctx, conn->timer); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->timer); +                        /* If the event is not fired and it actually cancelled +                         * the timer, do the unref else registered call back +                         * function will take care of unref. +                         */ +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  }                  if (conn->reconnect) { -                        gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }                  conn->connected = 0; @@ -1815,6 +1901,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          if (unref)                  rpc_clnt_unref (rpc); +        if (timer_unref) +                rpc_clnt_unref (rpc); + +        if (reconnect_unref) +                rpc_clnt_unref (rpc); +  out:          return;  } diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 01caeb814c0..f84b4cbf806 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -231,6 +231,7 @@ struct rpc_clnt *  rpc_clnt_unref (struct rpc_clnt *rpc);  int rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn); +int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn);  void rpc_clnt_set_connected (rpc_clnt_connection_t *conn); diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c index 79652a969bd..77637c7beec 100644 --- a/xlators/features/changelog/src/changelog-ev-handle.c +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -157,6 +157,13 @@ changelog_rpc_notify (struct rpc_clnt *rpc,                  break;          case RPC_CLNT_DISCONNECT:                  rpc_clnt_disable (crpc->rpc); + +                /* rpc_clnt_disable doesn't unref the rpc. It just marks +                 * the rpc as disabled and cancels reconnection timer. +                 * Hence unref the rpc object to free it. +                 */ +                rpc_clnt_unref (crpc->rpc); +                  selection = &priv->ev_selection;                  LOCK (&crpc->lock); @@ -170,6 +177,8 @@ changelog_rpc_notify (struct rpc_clnt *rpc,                  break;          case RPC_CLNT_MSG:          case RPC_CLNT_DESTROY: +                /* Free up mydata */ +                changelog_rpc_clnt_unref (crpc);                  break;          } @@ -253,7 +262,9 @@ get_client (changelog_clnt_t *c_clnt, struct list_head **next)                  if (*next == &c_clnt->active)                          goto unblock;                  crpc = list_entry (*next, changelog_rpc_clnt_t, list); +                /* ref rpc as DISCONNECT might unref the rpc asynchronously */                  changelog_rpc_clnt_ref (crpc); +                rpc_clnt_ref (crpc->rpc);                  *next = (*next)->next;          }   unblock: @@ -267,6 +278,7 @@ put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)  {          LOCK (&c_clnt->active_lock);          { +                rpc_clnt_unref (crpc->rpc);                  changelog_rpc_clnt_unref (crpc);          }          UNLOCK (&c_clnt->active_lock); diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c index 76addf18545..4bc24203118 100644 --- a/xlators/features/changelog/src/changelog-rpc.c +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -199,7 +199,10 @@ changelog_rpc_clnt_init (xlator_t *this,                  goto error_return;          INIT_LIST_HEAD (&crpc->list); -        crpc->ref = 0; +        /* Take a ref, the last unref will be on RPC_CLNT_DESTROY +         * which comes as a result of last rpc_clnt_unref. +         */ +        crpc->ref = 1;          changelog_set_disconnect_flag (crpc, _gf_false);          crpc->filter = rpc_req->filter; diff --git a/xlators/mgmt/glusterd/src/glusterd-rebalance.c b/xlators/mgmt/glusterd/src/glusterd-rebalance.c index e0eee02ed52..35fa4627d04 100644 --- a/xlators/mgmt/glusterd/src/glusterd-rebalance.c +++ b/xlators/mgmt/glusterd/src/glusterd-rebalance.c @@ -144,6 +144,7 @@ __glusterd_defrag_notify (struct rpc_clnt *rpc, void *mydata,                  glusterd_store_perform_node_state_store (volinfo); +                rpc_clnt_reconnect_cleanup (&defrag->rpc->conn);                  glusterd_defrag_rpc_put (defrag);                  if (defrag->cbk_fn)                          defrag->cbk_fn (volinfo, diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 65ca0e83b09..e11585df927 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -10612,6 +10612,7 @@ glusterd_rpc_clnt_unref (glusterd_conf_t *conf, rpc_clnt_t *rpc)          GF_ASSERT (conf);          GF_ASSERT (rpc);          synclock_unlock (&conf->big_lock); +        (void) rpc_clnt_reconnect_cleanup (&rpc->conn);          ret = rpc_clnt_unref (rpc);          synclock_lock (&conf->big_lock);  | 
