diff options
author | Raghavendra G <raghavendra@gluster.com> | 2010-07-28 06:23:31 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-07-28 05:08:26 -0700 |
commit | 40d3ad15856c88d93d16264aa1f6bb55806aafde (patch) | |
tree | 1290d311c9001e3954176f005b89a2e438321bd9 /rpc/rpc-lib/src/rpc-clnt.c | |
parent | b8692a3c3cc8e0dab404664e0aeb6ebaea6ab6e5 (diff) |
changes to rpc
- use mem-pool for requests and saved_frames.
- preserve the rpc_req structure till rpc invokes program's reply.
This will enable us to store transport specific data that has to
last till reply has come (eg., memory regions of chunk lists in
case of rdma).
- change signature of rpc_clnt_submit to accept rsphdr_vector and
rsppayload_vector. The buffers pointed by these vectors will be
from iobufs and these iobufs are added to an iobref which should
also be passed as an arguement to rpc_clnt_submit.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 219 |
1 files changed, 132 insertions, 87 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 9b0bfe33d5b..fce3e8200fe 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -23,10 +23,16 @@ #include "config.h" #endif +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096 + #include "rpc-clnt.h" #include "xdr-rpcclnt.h" #include "rpc-transport.h" #include "protocol-common.h" +#include "mem-pool.h" + +void +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); uint64_t rpc_clnt_new_callid (struct rpc_clnt *clnt) @@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +__saved_frames_put (struct saved_frames *frames, void *frame, + struct rpc_req *rpcreq) { struct saved_frame *saved_frame = NULL; - saved_frame = GF_CALLOC (1, sizeof (*saved_frame), - gf_common_mt_rpcclnt_savedframe_t); + saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool); if (!saved_frame) { gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } /* THIS should be saved and set back */ + memset (saved_frame, 0, sizeof (*saved_frame)); INIT_LIST_HEAD (&saved_frame->list); saved_frame->capital_this = THIS; saved_frame->frame = frame; - saved_frame->procnum = procnum; - saved_frame->callid = callid; - saved_frame->prog = prog; - saved_frame->cbkfn = cbk; - + saved_frame->rpcreq = rpcreq; gettimeofday (&saved_frame->saved_at, NULL); list_add_tail (&saved_frame->list, &frames->sf.list); @@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame, } pthread_mutex_unlock (&conn->lock); - GF_FREE (saved_frame); + if (saved_frame->rpcreq != NULL) { + rpc_clnt_reply_deinit (saved_frame->rpcreq, + conn->rpc_clnt->reqpool); + } + + mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); out: return; } @@ -129,7 +136,6 @@ call_bail (void *data) struct tm frame_sent_tm; char frame_sent[32] = {0,}; struct timeval timeout = {0,}; - struct rpc_req req; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -180,15 +186,19 @@ call_bail (void *data) gf_log (conn->trans->name, GF_LOG_ERROR, "bailing out frame type(%s) op(%s(%d)) sent = %s. " "timeout = %d", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, frame_sent, + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : + "--", + trav->rpcreq->procnum, frame_sent, conn->frame_timeout); - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (conn->rpc_clnt->saved_frames_pool, trav); } out: return; @@ -197,8 +207,8 @@ out: /* to be called with conn->lock held */ struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, + struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; struct timeval timeout = {0, }; @@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, conn = &rpc_clnt->conn; - saved_frame = __saved_frames_put (conn->saved_frames, frame, - procnum, prog, cbk, callid); + saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); + if (saved_frame == NULL) { goto out; } @@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid, } list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { *saved_frame = *tmp; ret = 0; break; @@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) struct saved_frame *tmp = NULL; list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { list_del_init (&tmp->list); frames->count--; saved_frame = tmp; @@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames) struct tm *frame_sent_tm = NULL; char timestr[256] = {0,}; - struct rpc_req req; struct iovec iov = {0,}; - memset (&req, 0, sizeof (req)); - - req.rpc_status = -1; - list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { frame_sent_tm = localtime (&trav->saved_at.tv_sec); strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", frame_sent_tm); - snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), + snprintf (timestr + strlen (timestr), + sizeof(timestr) - strlen (timestr), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log ("rpc-clnt", GF_LOG_ERROR, - "forced unwinding frame type(%s) op(%s(%d)) called at %s", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, timestr); + "forced unwinding frame type(%s) op(%s(%d)) " + "called at %s", + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--", + trav->rpcreq->procnum, timestr); saved_frames->count--; - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, + trav->rpcreq->conn->rpc_clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav); } } @@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) goto out; } - info->prognum = saved_frame.prog->prognum; - info->procnum = saved_frame.procnum; - info->progver = saved_frame.prog->progver; + info->prognum = saved_frame.rpcreq->prog->prognum; + info->procnum = saved_frame.rpcreq->procnum; + info->progver = saved_frame.rpcreq->prog->progver; info->rsp = saved_frame.rsp; ret = 0; @@ -490,9 +502,10 @@ int rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn, struct rpc_msg *replymsg, struct iovec progmsg, - struct rpc_req *req, struct saved_frame *saved_frame) + struct rpc_req *req, + struct saved_frame *saved_frame) { - int ret = -1; + int ret = -1; if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) { goto out; @@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, req->rpc_status = -1; } - req->xid = rpc_reply_xid (replymsg); - req->prog = saved_frame->prog; - req->procnum = saved_frame->procnum; - req->conn = conn; - req->rsp[0] = progmsg; + req->rsp_iobref = iobref_ref (msg->iobref); if (msg->vectored) { - req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); - req->rsp[1].iov_len = msg->data.vector.size2; - + req->rsp[1] = msg->vector[1]; req->rspcnt = 2; - - req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); - req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2); } else { req->rspcnt = 1; - - req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf); } /* By this time, the data bytes for the auth scheme would have already @@ -544,20 +546,17 @@ out: void -rpc_clnt_reply_deinit (struct rpc_req *req) +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool) { if (!req) { goto out; } - if (req->rsp_prochdr) { - iobuf_unref (req->rsp_prochdr); - } - - if (req->rsp_procpayload) { - iobuf_unref (req->rsp_procpayload); + if (req->rsp_iobref) { + iobref_unref (req->rsp_iobref); } + mem_put (pool, req); out: return; } @@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, size_t msglen = 0; int ret = -1; - if (msg->vectored) { - msgbuf = iobuf_ptr (msg->data.vector.iobuf1); - msglen = msg->data.vector.size1; - } else { - msgbuf = iobuf_ptr (msg->data.simple.iobuf); - msglen = msg->data.simple.size; - } + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, req->verf.authdata); @@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, goto out; } - gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," - " ProgVers: %d, Proc: %d", saved_frame->callid, - saved_frame->prog->progname, saved_frame->prog->progver, - saved_frame->procnum); + gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s," + " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid, + saved_frame->rpcreq->prog->progname, + saved_frame->rpcreq->prog->progver, + saved_frame->rpcreq->procnum); /* TODO: */ /* TODO: AUTH */ /* The verifier that is sent in a reply is a string that can be used as @@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) struct saved_frame *saved_frame = NULL; rpc_request_info_t *request_info = NULL; int ret = -1; - struct rpc_req req = {0, }; - int cbk_ret = -1; + struct rpc_req *req = NULL; conn = &clnt->conn; @@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) goto out; } - ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); + req = saved_frame->rpcreq; + if (req == NULL) { + gf_log ("rpc-clnt", GF_LOG_CRITICAL, + "saved_frame for reply with xid (%d), " + "prog-version (%d), prog-num (%d), procnum (%d)" + "does not contain rpc-req", request_info->xid, + request_info->progver, request_info->prognum, + request_info->procnum); + goto out; + } + + ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); if (ret != 0) { - req.rpc_status = -1; + req->rpc_status = -1; gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply " "failed"); } - cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt, - saved_frame->frame); - - if (ret == 0) { - rpc_clnt_reply_deinit (&req); + req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); + + if (req) { + rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool); } out: if (saved_frame) { - GF_FREE (saved_frame); + mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); } - return cbk_ret; + return ret; } @@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, pthread_mutex_init (&rpc->lock, NULL); rpc->ctx = ctx; + rpc->reqpool = mem_pool_new (struct rpc_req, + RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (rpc->reqpool == NULL) { + pthread_mutex_destroy (&rpc->lock); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + + rpc->saved_frames_pool = mem_pool_new (struct saved_frame, + RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (rpc->saved_frames_pool == NULL) { + pthread_mutex_destroy (&rpc->lock); + mem_pool_destroy (rpc->reqpool); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + ret = rpc_clnt_connection_init (rpc, ctx, options, name); if (ret == -1) { pthread_mutex_destroy (&rpc->lock); @@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, rpc = NULL; goto out; } + out: return rpc; } @@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *frame) + struct iobref *iobref, void *frame, struct iovec *rsphdr, + int rsphdr_count, struct iovec *rsp_payload, + int rsp_payload_count, struct iobref *rsp_iobref) { rpc_clnt_connection_t *conn = NULL; struct iobuf *request_iob = NULL; struct iovec rpchdr = {0,}; - struct rpc_req rpcreq = {0,}; + struct rpc_req *rpcreq = NULL; rpc_transport_req_t req; int ret = -1; int proglen = 0; @@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, goto out; } + rpcreq = mem_get (rpc->reqpool); + if (rpcreq == NULL) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); + goto out; + } + + memset (rpcreq, 0, sizeof (*rpcreq)); memset (&req, 0, sizeof (req)); if (!iobref) { @@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, conn = &rpc->conn; + rpcreq->prog = prog; + rpcreq->procnum = procnum; + rpcreq->conn = conn; + rpcreq->xid = callid; + rpcreq->cbkfn = cbkfn; + pthread_mutex_lock (&conn->lock); { if (conn->connected == 0) { @@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, req.msg.progpayloadcount = progpayloadcount; req.msg.iobref = iobref; + req.rsp.rsphdr = rsphdr; + req.rsp.rsphdr_count = rsphdr_count; + req.rsp.rsp_payload = rsp_payload; + req.rsp.rsp_payload_count = rsp_payload_count; + req.rsp.rsp_iobref = rsp_iobref; + ret = rpc_transport_submit_request (rpc->conn.trans, &req); if (ret == -1) { @@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if ((ret >= 0) && frame) { gettimeofday (&conn->last_sent, NULL); /* Save the frame in queue */ - __save_frame (rpc, frame, procnum, prog, cbkfn, callid); + __save_frame (rpc, frame, rpcreq); } - } unlock: pthread_mutex_unlock (&conn->lock); @@ -1266,8 +1310,9 @@ out: } if (frame && (ret == -1)) { - rpcreq.rpc_status = -1; - cbkfn (&rpcreq, NULL, 0, frame); + rpcreq->rpc_status = -1; + cbkfn (rpcreq, NULL, 0, frame); + mem_put (rpc->reqpool, rpcreq); } return ret; } |