summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c98
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h4
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c33
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h3
4 files changed, 103 insertions, 35 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 014b2a25d11..406efdb2d4f 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -520,8 +520,10 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
if (conn->reconnect) {
ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect);
- if (!ret)
+ if (!ret) {
reconnect_unref = _gf_true;
+ conn->cleanup_gen++;
+ }
conn->reconnect = NULL;
}
@@ -575,6 +577,7 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
/*reset rpc msgs stats*/
conn->pingcnt = 0;
conn->msgcnt = 0;
+ conn->cleanup_gen++;
}
pthread_mutex_unlock (&conn->lock);
@@ -900,10 +903,29 @@ rpc_clnt_destroy (struct rpc_clnt *rpc);
static int
rpc_clnt_handle_disconnect (struct rpc_clnt *clnt, rpc_clnt_connection_t *conn)
{
- struct timespec ts = {0, };
- gf_boolean_t unref_clnt = _gf_false;
+ struct timespec ts = {0, };
+ gf_boolean_t unref_clnt = _gf_false;
+ uint64_t pre_notify_gen = 0, post_notify_gen = 0;
- rpc_clnt_connection_cleanup (conn);
+ pthread_mutex_lock (&conn->lock);
+ {
+ pre_notify_gen = conn->cleanup_gen;
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+ if (clnt->notifyfn)
+ clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ post_notify_gen = conn->cleanup_gen;
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+ if (pre_notify_gen == post_notify_gen) {
+ /* program didn't invoke cleanup, so rpc has to do it */
+ rpc_clnt_connection_cleanup (conn);
+ }
pthread_mutex_lock (&conn->lock);
{
@@ -923,8 +945,6 @@ rpc_clnt_handle_disconnect (struct rpc_clnt *clnt, rpc_clnt_connection_t *conn)
}
pthread_mutex_unlock (&conn->lock);
- if (clnt->notifyfn)
- clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);
if (unref_clnt)
rpc_clnt_ref (clnt);
@@ -957,11 +977,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
switch (event) {
case RPC_TRANSPORT_DISCONNECT:
{
- pthread_mutex_lock (&clnt->notifylock);
- {
- rpc_clnt_handle_disconnect (clnt, conn);
- }
- pthread_mutex_unlock (&clnt->notifylock);
+ rpc_clnt_handle_disconnect (clnt, conn);
break;
}
@@ -1016,21 +1032,19 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_CONNECT:
{
- pthread_mutex_lock (&clnt->notifylock);
- {
- /* Every time there is a disconnection, processes
- * should try to connect to 'glusterd' (ie, default
- * port) or whichever port given as 'option remote-port'
- * in volume file. */
- /* Below code makes sure the (re-)configured port lasts
- * for just one successful attempt */
- conn->config.remote_port = 0;
-
- if (clnt->notifyfn)
- ret = clnt->notifyfn (clnt, clnt->mydata,
- RPC_CLNT_CONNECT, NULL);
- }
- pthread_mutex_unlock (&clnt->notifylock);
+
+ /* Every time there is a disconnection, processes
+ * should try to connect to 'glusterd' (ie, default
+ * port) or whichever port given as 'option remote-port'
+ * in volume file. */
+ /* Below code makes sure the (re-)configured port lasts
+ * for just one successful attempt */
+ conn->config.remote_port = 0;
+
+ if (clnt->notifyfn)
+ ret = clnt->notifyfn (clnt, clnt->mydata,
+ RPC_CLNT_CONNECT, NULL);
+
break;
}
@@ -1154,7 +1168,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name,
}
pthread_mutex_init (&rpc->lock, NULL);
- pthread_mutex_init (&rpc->notifylock, NULL);
rpc->ctx = ctx;
rpc->owner = owner;
@@ -1164,7 +1177,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name,
rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size);
if (rpc->reqpool == NULL) {
pthread_mutex_destroy (&rpc->lock);
- pthread_mutex_destroy (&rpc->notifylock);
GF_FREE (rpc);
rpc = NULL;
goto out;
@@ -1174,7 +1186,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name,
reqpool_size);
if (rpc->saved_frames_pool == NULL) {
pthread_mutex_destroy (&rpc->lock);
- pthread_mutex_destroy (&rpc->notifylock);
mem_pool_destroy (rpc->reqpool);
GF_FREE (rpc);
rpc = NULL;
@@ -1184,7 +1195,6 @@ rpc_clnt_new (dict_t *options, xlator_t *owner, char *name,
ret = rpc_clnt_connection_init (rpc, ctx, options, name);
if (ret == -1) {
pthread_mutex_destroy (&rpc->lock);
- pthread_mutex_destroy (&rpc->notifylock);
mem_pool_destroy (rpc->reqpool);
mem_pool_destroy (rpc->saved_frames_pool);
GF_FREE (rpc);
@@ -1230,6 +1240,33 @@ rpc_clnt_start (struct rpc_clnt *rpc)
int
+rpc_clnt_cleanup_and_start (struct rpc_clnt *rpc)
+{
+ struct rpc_clnt_connection *conn = NULL;
+
+ if (!rpc)
+ return -1;
+
+ conn = &rpc->conn;
+
+ rpc_clnt_connection_cleanup (conn);
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ rpc->disabled = 0;
+ }
+ pthread_mutex_unlock (&conn->lock);
+ /* Corresponding unref will be either on successful timer cancel or last
+ * rpc_clnt_reconnect fire event.
+ */
+ rpc_clnt_ref (rpc);
+ rpc_clnt_reconnect (conn);
+
+ return 0;
+}
+
+
+int
rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
void *mydata)
{
@@ -1780,7 +1817,6 @@ rpc_clnt_destroy (struct rpc_clnt *rpc)
saved_frames_destroy (rpc->conn.saved_frames);
pthread_mutex_destroy (&rpc->lock);
pthread_mutex_destroy (&rpc->conn.lock);
- pthread_mutex_destroy (&rpc->notifylock);
/* mem-pool should be destroyed, otherwise,
it will cause huge memory leaks */
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 2ccaa56e4cb..5ad4fd42298 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -151,6 +151,7 @@ struct rpc_clnt_connection {
int32_t ping_timeout;
uint64_t pingcnt;
uint64_t msgcnt;
+ uint64_t cleanup_gen;
};
typedef struct rpc_clnt_connection rpc_clnt_connection_t;
@@ -173,7 +174,6 @@ struct rpc_req {
typedef struct rpc_clnt {
pthread_mutex_t lock;
- pthread_mutex_t notifylock;
rpc_clnt_notify_t notifyfn;
rpc_clnt_connection_t conn;
void *mydata;
@@ -200,6 +200,8 @@ struct rpc_clnt *rpc_clnt_new (dict_t *options, xlator_t *owner,
int rpc_clnt_start (struct rpc_clnt *rpc);
+int rpc_clnt_cleanup_and_start (struct rpc_clnt *rpc);
+
int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
void *mydata);
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 617e3cc76ed..bc661043674 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1026,6 +1026,7 @@ int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
struct iovec iov = {0, };
struct iobuf *iobuf = NULL;
ssize_t xdr_size = 0;
+ struct iobref *iobref = NULL;
if (!req)
goto out;
@@ -1048,20 +1049,33 @@ int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
iov.iov_len = ret;
count = 1;
+ iobref = iobref_new ();
+ if (!iobref) {
+ ret = -1;
+ gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref");
+ goto out;
+ }
+
+ iobref_add (iobref, iobuf);
+
ret = rpcsvc_callback_submit (rpc, trans, prog, procnum,
- &iov, count);
+ &iov, count, iobref);
out:
if (iobuf)
iobuf_unref (iobuf);
+ if (iobref)
+ iobref_unref (iobref);
+
return ret;
}
int
rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
rpcsvc_cbk_program_t *prog, int procnum,
- struct iovec *proghdr, int proghdrcount)
+ struct iovec *proghdr, int proghdrcount,
+ struct iobref *iobref)
{
struct iobuf *request_iob = NULL;
struct iovec rpchdr = {0,};
@@ -1069,6 +1083,7 @@ rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
int ret = -1;
int proglen = 0;
uint32_t xid = 0;
+ gf_boolean_t new_iobref = _gf_false;
if (!rpc) {
goto out;
@@ -1090,11 +1105,22 @@ rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
"cannot build rpc-record");
goto out;
}
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref) {
+ gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref");
+ goto out;
+ }
+ new_iobref = 1;
+ }
+
+ iobref_add (iobref, request_iob);
req.msg.rpchdr = &rpchdr;
req.msg.rpchdrcount = 1;
req.msg.proghdr = proghdr;
req.msg.proghdrcount = proghdrcount;
+ req.msg.iobref = iobref;
ret = rpc_transport_submit_request (trans, &req);
if (ret == -1) {
@@ -1108,6 +1134,9 @@ rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
out:
iobuf_unref (request_iob);
+ if (new_iobref)
+ iobref_unref (iobref);
+
return ret;
}
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 9a2257a6087..17e72482531 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -591,7 +591,8 @@ int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
rpcsvc_cbk_program_t *prog, int procnum,
- struct iovec *proghdr, int proghdrcount);
+ struct iovec *proghdr, int proghdrcount,
+ struct iobref *iobref);
rpcsvc_actor_t *
rpcsvc_program_actor (rpcsvc_request_t *req);