diff options
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 98 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 33 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 3 |
4 files changed, 103 insertions, 35 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 014b2a25d11..406efdb2d4f 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -520,8 +520,10 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) if (conn->reconnect) { ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect); - if (!ret) + if (!ret) { reconnect_unref = _gf_true; + conn->cleanup_gen++; + } conn->reconnect = NULL; } @@ -575,6 +577,7 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) /*reset rpc msgs stats*/ conn->pingcnt = 0; conn->msgcnt = 0; + conn->cleanup_gen++; } pthread_mutex_unlock (&conn->lock); @@ -900,10 +903,29 @@ rpc_clnt_destroy (struct rpc_clnt *rpc); static int rpc_clnt_handle_disconnect (struct rpc_clnt *clnt, rpc_clnt_connection_t *conn) { - struct timespec ts = {0, }; - gf_boolean_t unref_clnt = _gf_false; + struct timespec ts = {0, }; + gf_boolean_t unref_clnt = _gf_false; + uint64_t pre_notify_gen = 0, post_notify_gen = 0; - rpc_clnt_connection_cleanup (conn); + pthread_mutex_lock (&conn->lock); + { + pre_notify_gen = conn->cleanup_gen; + } + pthread_mutex_unlock (&conn->lock); + + if (clnt->notifyfn) + clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); + + pthread_mutex_lock (&conn->lock); + { + post_notify_gen = conn->cleanup_gen; + } + pthread_mutex_unlock (&conn->lock); + + if (pre_notify_gen == post_notify_gen) { + /* program didn't invoke cleanup, so rpc has to do it */ + rpc_clnt_connection_cleanup (conn); + } pthread_mutex_lock (&conn->lock); { @@ -923,8 +945,6 @@ rpc_clnt_handle_disconnect (struct rpc_clnt *clnt, rpc_clnt_connection_t *conn) } pthread_mutex_unlock (&conn->lock); - if (clnt->notifyfn) - clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); if (unref_clnt) rpc_clnt_ref (clnt); @@ -957,11 +977,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, switch (event) { case RPC_TRANSPORT_DISCONNECT: { - pthread_mutex_lock (&clnt->notifylock); - { - rpc_clnt_handle_disconnect (clnt, conn); - } - pthread_mutex_unlock (&clnt->notifylock); + rpc_clnt_handle_disconnect (clnt, conn); break; } @@ -1016,21 +1032,19 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_CONNECT: { - pthread_mutex_lock (&clnt->notifylock); - { - /* Every time there is a disconnection, processes - * should try to connect to 'glusterd' (ie, default - * port) or whichever port given as 'option remote-port' - * in volume file. */ - /* Below code makes sure the (re-)configured port lasts - * for just one successful attempt */ - conn->config.remote_port = 0; - - if (clnt->notifyfn) - ret = clnt->notifyfn (clnt, clnt->mydata, - RPC_CLNT_CONNECT, NULL); - } - pthread_mutex_unlock (&clnt->notifylock); + + /* Every time there is a disconnection, processes + * should try to connect to 'glusterd' (ie, default + * port) or whichever port given as 'option remote-port' + * in volume file. */ + /* Below code makes sure the (re-)configured port lasts + * for just one successful attempt */ + conn->config.remote_port = 0; + + if (clnt->notifyfn) + ret = clnt->notifyfn (clnt, clnt->mydata, + RPC_CLNT_CONNECT, NULL); + break; } @@ -1154,7 +1168,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name, } pthread_mutex_init (&rpc->lock, NULL); - pthread_mutex_init (&rpc->notifylock, NULL); rpc->ctx = ctx; rpc->owner = owner; @@ -1164,7 +1177,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name, rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size); if (rpc->reqpool == NULL) { pthread_mutex_destroy (&rpc->lock); - pthread_mutex_destroy (&rpc->notifylock); GF_FREE (rpc); rpc = NULL; goto out; @@ -1174,7 +1186,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name, reqpool_size); if (rpc->saved_frames_pool == NULL) { pthread_mutex_destroy (&rpc->lock); - pthread_mutex_destroy (&rpc->notifylock); mem_pool_destroy (rpc->reqpool); GF_FREE (rpc); rpc = NULL; @@ -1184,7 +1195,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name, ret = rpc_clnt_connection_init (rpc, ctx, options, name); if (ret == -1) { pthread_mutex_destroy (&rpc->lock); - pthread_mutex_destroy (&rpc->notifylock); mem_pool_destroy (rpc->reqpool); mem_pool_destroy (rpc->saved_frames_pool); GF_FREE (rpc); @@ -1230,6 +1240,33 @@ rpc_clnt_start (struct rpc_clnt *rpc) int +rpc_clnt_cleanup_and_start (struct rpc_clnt *rpc) +{ + struct rpc_clnt_connection *conn = NULL; + + if (!rpc) + return -1; + + conn = &rpc->conn; + + rpc_clnt_connection_cleanup (conn); + + pthread_mutex_lock (&conn->lock); + { + rpc->disabled = 0; + } + pthread_mutex_unlock (&conn->lock); + /* Corresponding unref will be either on successful timer cancel or last + * rpc_clnt_reconnect fire event. + */ + rpc_clnt_ref (rpc); + rpc_clnt_reconnect (conn); + + return 0; +} + + +int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, void *mydata) { @@ -1780,7 +1817,6 @@ rpc_clnt_destroy (struct rpc_clnt *rpc) saved_frames_destroy (rpc->conn.saved_frames); pthread_mutex_destroy (&rpc->lock); pthread_mutex_destroy (&rpc->conn.lock); - pthread_mutex_destroy (&rpc->notifylock); /* mem-pool should be destroyed, otherwise, it will cause huge memory leaks */ diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 2ccaa56e4cb..5ad4fd42298 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -151,6 +151,7 @@ struct rpc_clnt_connection { int32_t ping_timeout; uint64_t pingcnt; uint64_t msgcnt; + uint64_t cleanup_gen; }; typedef struct rpc_clnt_connection rpc_clnt_connection_t; @@ -173,7 +174,6 @@ struct rpc_req { typedef struct rpc_clnt { pthread_mutex_t lock; - pthread_mutex_t notifylock; rpc_clnt_notify_t notifyfn; rpc_clnt_connection_t conn; void *mydata; @@ -200,6 +200,8 @@ struct rpc_clnt *rpc_clnt_new (dict_t *options, xlator_t *owner, int rpc_clnt_start (struct rpc_clnt *rpc); +int rpc_clnt_cleanup_and_start (struct rpc_clnt *rpc); + int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, void *mydata); diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 617e3cc76ed..bc661043674 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1026,6 +1026,7 @@ int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans, struct iovec iov = {0, }; struct iobuf *iobuf = NULL; ssize_t xdr_size = 0; + struct iobref *iobref = NULL; if (!req) goto out; @@ -1048,20 +1049,33 @@ int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans, iov.iov_len = ret; count = 1; + iobref = iobref_new (); + if (!iobref) { + ret = -1; + gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref"); + goto out; + } + + iobref_add (iobref, iobuf); + ret = rpcsvc_callback_submit (rpc, trans, prog, procnum, - &iov, count); + &iov, count, iobref); out: if (iobuf) iobuf_unref (iobuf); + if (iobref) + iobref_unref (iobref); + return ret; } int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, - struct iovec *proghdr, int proghdrcount) + struct iovec *proghdr, int proghdrcount, + struct iobref *iobref) { struct iobuf *request_iob = NULL; struct iovec rpchdr = {0,}; @@ -1069,6 +1083,7 @@ rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, int ret = -1; int proglen = 0; uint32_t xid = 0; + gf_boolean_t new_iobref = _gf_false; if (!rpc) { goto out; @@ -1090,11 +1105,22 @@ rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, "cannot build rpc-record"); goto out; } + if (!iobref) { + iobref = iobref_new (); + if (!iobref) { + gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref"); + goto out; + } + new_iobref = 1; + } + + iobref_add (iobref, request_iob); req.msg.rpchdr = &rpchdr; req.msg.rpchdrcount = 1; req.msg.proghdr = proghdr; req.msg.proghdrcount = proghdrcount; + req.msg.iobref = iobref; ret = rpc_transport_submit_request (trans, &req); if (ret == -1) { @@ -1108,6 +1134,9 @@ rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, out: iobuf_unref (request_iob); + if (new_iobref) + iobref_unref (iobref); + return ret; } diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 9a2257a6087..17e72482531 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -591,7 +591,8 @@ int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans, int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, - struct iovec *proghdr, int proghdrcount); + struct iovec *proghdr, int proghdrcount, + struct iobref *iobref); rpcsvc_actor_t * rpcsvc_program_actor (rpcsvc_request_t *req); |