diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 219 |
1 files changed, 132 insertions, 87 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 9b0bfe33d5b..fce3e8200fe 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -23,10 +23,16 @@ #include "config.h" #endif +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096 + #include "rpc-clnt.h" #include "xdr-rpcclnt.h" #include "rpc-transport.h" #include "protocol-common.h" +#include "mem-pool.h" + +void +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); uint64_t rpc_clnt_new_callid (struct rpc_clnt *clnt) @@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +__saved_frames_put (struct saved_frames *frames, void *frame, + struct rpc_req *rpcreq) { struct saved_frame *saved_frame = NULL; - saved_frame = GF_CALLOC (1, sizeof (*saved_frame), - gf_common_mt_rpcclnt_savedframe_t); + saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool); if (!saved_frame) { gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } /* THIS should be saved and set back */ + memset (saved_frame, 0, sizeof (*saved_frame)); INIT_LIST_HEAD (&saved_frame->list); saved_frame->capital_this = THIS; saved_frame->frame = frame; - saved_frame->procnum = procnum; - saved_frame->callid = callid; - saved_frame->prog = prog; - saved_frame->cbkfn = cbk; - + saved_frame->rpcreq = rpcreq; gettimeofday (&saved_frame->saved_at, NULL); list_add_tail (&saved_frame->list, &frames->sf.list); @@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame, } pthread_mutex_unlock (&conn->lock); - GF_FREE (saved_frame); + if (saved_frame->rpcreq != NULL) { + rpc_clnt_reply_deinit (saved_frame->rpcreq, + conn->rpc_clnt->reqpool); + } + + mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); out: return; } @@ -129,7 +136,6 @@ call_bail (void *data) struct tm frame_sent_tm; char frame_sent[32] = {0,}; struct timeval timeout = {0,}; - struct rpc_req req; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -180,15 +186,19 @@ call_bail (void *data) gf_log (conn->trans->name, GF_LOG_ERROR, "bailing out frame type(%s) op(%s(%d)) sent = %s. " "timeout = %d", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, frame_sent, + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : + "--", + trav->rpcreq->procnum, frame_sent, conn->frame_timeout); - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (conn->rpc_clnt->saved_frames_pool, trav); } out: return; @@ -197,8 +207,8 @@ out: /* to be called with conn->lock held */ struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, + struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; struct timeval timeout = {0, }; @@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, conn = &rpc_clnt->conn; - saved_frame = __saved_frames_put (conn->saved_frames, frame, - procnum, prog, cbk, callid); + saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); + if (saved_frame == NULL) { goto out; } @@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid, } list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { *saved_frame = *tmp; ret = 0; break; @@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) struct saved_frame *tmp = NULL; list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { list_del_init (&tmp->list); frames->count--; saved_frame = tmp; @@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames) struct tm *frame_sent_tm = NULL; char timestr[256] = {0,}; - struct rpc_req req; struct iovec iov = {0,}; - memset (&req, 0, sizeof (req)); - - req.rpc_status = -1; - list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { frame_sent_tm = localtime (&trav->saved_at.tv_sec); strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", frame_sent_tm); - snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), + snprintf (timestr + strlen (timestr), + sizeof(timestr) - strlen (timestr), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log ("rpc-clnt", GF_LOG_ERROR, - "forced unwinding frame type(%s) op(%s(%d)) called at %s", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, timestr); + "forced unwinding frame type(%s) op(%s(%d)) " + "called at %s", + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--", + trav->rpcreq->procnum, timestr); saved_frames->count--; - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, + trav->rpcreq->conn->rpc_clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav); } } @@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) goto out; } - info->prognum = saved_frame.prog->prognum; - info->procnum = saved_frame.procnum; - info->progver = saved_frame.prog->progver; + info->prognum = saved_frame.rpcreq->prog->prognum; + info->procnum = saved_frame.rpcreq->procnum; + info->progver = saved_frame.rpcreq->prog->progver; info->rsp = saved_frame.rsp; ret = 0; @@ -490,9 +502,10 @@ int rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn, struct rpc_msg *replymsg, struct iovec progmsg, - struct rpc_req *req, struct saved_frame *saved_frame) + struct rpc_req *req, + struct saved_frame *saved_frame) { - int ret = -1; + int ret = -1; if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) { goto out; @@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, req->rpc_status = -1; } - req->xid = rpc_reply_xid (replymsg); - req->prog = saved_frame->prog; - req->procnum = saved_frame->procnum; - req->conn = conn; - req->rsp[0] = progmsg; + req->rsp_iobref = iobref_ref (msg->iobref); if (msg->vectored) { - req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); - req->rsp[1].iov_len = msg->data.vector.size2; - + req->rsp[1] = msg->vector[1]; req->rspcnt = 2; - - req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); - req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2); } else { req->rspcnt = 1; - - req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf); } /* By this time, the data bytes for the auth scheme would have already @@ -544,20 +546,17 @@ out: void -rpc_clnt_reply_deinit (struct rpc_req *req) +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool) { if (!req) { goto out; } - if (req->rsp_prochdr) { - iobuf_unref (req->rsp_prochdr); - } - - if (req->rsp_procpayload) { - iobuf_unref (req->rsp_procpayload); + if (req->rsp_iobref) { + iobref_unref (req->rsp_iobref); } + mem_put (pool, req); out: return; } @@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, size_t msglen = 0; int ret = -1; - if (msg->vectored) { - msgbuf = iobuf_ptr (msg->data.vector.iobuf1); - msglen = msg->data.vector.size1; - } else { - msgbuf = iobuf_ptr (msg->data.simple.iobuf); - msglen = msg->data.simple.size; - } + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, req->verf.authdata); @@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, goto out; } - gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," - " ProgVers: %d, Proc: %d", saved_frame->callid, - saved_frame->prog->progname, saved_frame->prog->progver, - saved_frame->procnum); + gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s," + " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid, + saved_frame->rpcreq->prog->progname, + saved_frame->rpcreq->prog->progver, + saved_frame->rpcreq->procnum); /* TODO: */ /* TODO: AUTH */ /* The verifier that is sent in a reply is a string that can be used as @@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) struct saved_frame *saved_frame = NULL; rpc_request_info_t *request_info = NULL; int ret = -1; - struct rpc_req req = {0, }; - int cbk_ret = -1; + struct rpc_req *req = NULL; conn = &clnt->conn; @@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) goto out; } - ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); + req = saved_frame->rpcreq; + if (req == NULL) { + gf_log ("rpc-clnt", GF_LOG_CRITICAL, + "saved_frame for reply with xid (%d), " + "prog-version (%d), prog-num (%d), procnum (%d)" + "does not contain rpc-req", request_info->xid, + request_info->progver, request_info->prognum, + request_info->procnum); + goto out; + } + + ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); if (ret != 0) { - req.rpc_status = -1; + req->rpc_status = -1; gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply " "failed"); } - cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt, - saved_frame->frame); - - if (ret == 0) { - rpc_clnt_reply_deinit (&req); + req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); + + if (req) { + rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool); } out: if (saved_frame) { - GF_FREE (saved_frame); + mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); } - return cbk_ret; + return ret; } @@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, pthread_mutex_init (&rpc->lock, NULL); rpc->ctx = ctx; + rpc->reqpool = mem_pool_new (struct rpc_req, + RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (rpc->reqpool == NULL) { + pthread_mutex_destroy (&rpc->lock); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + + rpc->saved_frames_pool = mem_pool_new (struct saved_frame, + RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (rpc->saved_frames_pool == NULL) { + pthread_mutex_destroy (&rpc->lock); + mem_pool_destroy (rpc->reqpool); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + ret = rpc_clnt_connection_init (rpc, ctx, options, name); if (ret == -1) { pthread_mutex_destroy (&rpc->lock); @@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, rpc = NULL; goto out; } + out: return rpc; } @@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *frame) + struct iobref *iobref, void *frame, struct iovec *rsphdr, + int rsphdr_count, struct iovec *rsp_payload, + int rsp_payload_count, struct iobref *rsp_iobref) { rpc_clnt_connection_t *conn = NULL; struct iobuf *request_iob = NULL; struct iovec rpchdr = {0,}; - struct rpc_req rpcreq = {0,}; + struct rpc_req *rpcreq = NULL; rpc_transport_req_t req; int ret = -1; int proglen = 0; @@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, goto out; } + rpcreq = mem_get (rpc->reqpool); + if (rpcreq == NULL) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); + goto out; + } + + memset (rpcreq, 0, sizeof (*rpcreq)); memset (&req, 0, sizeof (req)); if (!iobref) { @@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, conn = &rpc->conn; + rpcreq->prog = prog; + rpcreq->procnum = procnum; + rpcreq->conn = conn; + rpcreq->xid = callid; + rpcreq->cbkfn = cbkfn; + pthread_mutex_lock (&conn->lock); { if (conn->connected == 0) { @@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, req.msg.progpayloadcount = progpayloadcount; req.msg.iobref = iobref; + req.rsp.rsphdr = rsphdr; + req.rsp.rsphdr_count = rsphdr_count; + req.rsp.rsp_payload = rsp_payload; + req.rsp.rsp_payload_count = rsp_payload_count; + req.rsp.rsp_iobref = rsp_iobref; + ret = rpc_transport_submit_request (rpc->conn.trans, &req); if (ret == -1) { @@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if ((ret >= 0) && frame) { gettimeofday (&conn->last_sent, NULL); /* Save the frame in queue */ - __save_frame (rpc, frame, procnum, prog, cbkfn, callid); + __save_frame (rpc, frame, rpcreq); } - } unlock: pthread_mutex_unlock (&conn->lock); @@ -1266,8 +1310,9 @@ out: } if (frame && (ret == -1)) { - rpcreq.rpc_status = -1; - cbkfn (&rpcreq, NULL, 0, frame); + rpcreq->rpc_status = -1; + cbkfn (rpcreq, NULL, 0, frame); + mem_put (rpc->reqpool, rpcreq); } return ret; } |