diff options
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 219 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 36 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 96 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 44 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 33 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 20 | ||||
-rw-r--r-- | rpc/rpc-lib/src/xdr-common.h | 8 |
7 files changed, 256 insertions, 200 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 9b0bfe33d5b..fce3e8200fe 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -23,10 +23,16 @@ #include "config.h" #endif +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096 + #include "rpc-clnt.h" #include "xdr-rpcclnt.h" #include "rpc-transport.h" #include "protocol-common.h" +#include "mem-pool.h" + +void +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); uint64_t rpc_clnt_new_callid (struct rpc_clnt *clnt) @@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +__saved_frames_put (struct saved_frames *frames, void *frame, + struct rpc_req *rpcreq) { struct saved_frame *saved_frame = NULL; - saved_frame = GF_CALLOC (1, sizeof (*saved_frame), - gf_common_mt_rpcclnt_savedframe_t); + saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool); if (!saved_frame) { gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } /* THIS should be saved and set back */ + memset (saved_frame, 0, sizeof (*saved_frame)); INIT_LIST_HEAD (&saved_frame->list); saved_frame->capital_this = THIS; saved_frame->frame = frame; - saved_frame->procnum = procnum; - saved_frame->callid = callid; - saved_frame->prog = prog; - saved_frame->cbkfn = cbk; - + saved_frame->rpcreq = rpcreq; gettimeofday (&saved_frame->saved_at, NULL); list_add_tail (&saved_frame->list, &frames->sf.list); @@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame, } pthread_mutex_unlock (&conn->lock); - GF_FREE (saved_frame); + if (saved_frame->rpcreq != NULL) { + rpc_clnt_reply_deinit (saved_frame->rpcreq, + conn->rpc_clnt->reqpool); + } + + mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); out: return; } @@ -129,7 +136,6 @@ call_bail (void *data) struct tm frame_sent_tm; char frame_sent[32] = {0,}; struct timeval timeout = {0,}; - struct rpc_req req; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -180,15 +186,19 @@ call_bail (void *data) gf_log (conn->trans->name, GF_LOG_ERROR, "bailing out frame type(%s) op(%s(%d)) sent = %s. " "timeout = %d", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, frame_sent, + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : + "--", + trav->rpcreq->procnum, frame_sent, conn->frame_timeout); - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (conn->rpc_clnt->saved_frames_pool, trav); } out: return; @@ -197,8 +207,8 @@ out: /* to be called with conn->lock held */ struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, + struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; struct timeval timeout = {0, }; @@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, conn = &rpc_clnt->conn; - saved_frame = __saved_frames_put (conn->saved_frames, frame, - procnum, prog, cbk, callid); + saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); + if (saved_frame == NULL) { goto out; } @@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid, } list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { *saved_frame = *tmp; ret = 0; break; @@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) struct saved_frame *tmp = NULL; list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { list_del_init (&tmp->list); frames->count--; saved_frame = tmp; @@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames) struct tm *frame_sent_tm = NULL; char timestr[256] = {0,}; - struct rpc_req req; struct iovec iov = {0,}; - memset (&req, 0, sizeof (req)); - - req.rpc_status = -1; - list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { frame_sent_tm = localtime (&trav->saved_at.tv_sec); strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", frame_sent_tm); - snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), + snprintf (timestr + strlen (timestr), + sizeof(timestr) - strlen (timestr), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log ("rpc-clnt", GF_LOG_ERROR, - "forced unwinding frame type(%s) op(%s(%d)) called at %s", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, timestr); + "forced unwinding frame type(%s) op(%s(%d)) " + "called at %s", + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--", + trav->rpcreq->procnum, timestr); saved_frames->count--; - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, + trav->rpcreq->conn->rpc_clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav); } } @@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) goto out; } - info->prognum = saved_frame.prog->prognum; - info->procnum = saved_frame.procnum; - info->progver = saved_frame.prog->progver; + info->prognum = saved_frame.rpcreq->prog->prognum; + info->procnum = saved_frame.rpcreq->procnum; + info->progver = saved_frame.rpcreq->prog->progver; info->rsp = saved_frame.rsp; ret = 0; @@ -490,9 +502,10 @@ int rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn, struct rpc_msg *replymsg, struct iovec progmsg, - struct rpc_req *req, struct saved_frame *saved_frame) + struct rpc_req *req, + struct saved_frame *saved_frame) { - int ret = -1; + int ret = -1; if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) { goto out; @@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, req->rpc_status = -1; } - req->xid = rpc_reply_xid (replymsg); - req->prog = saved_frame->prog; - req->procnum = saved_frame->procnum; - req->conn = conn; - req->rsp[0] = progmsg; + req->rsp_iobref = iobref_ref (msg->iobref); if (msg->vectored) { - req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); - req->rsp[1].iov_len = msg->data.vector.size2; - + req->rsp[1] = msg->vector[1]; req->rspcnt = 2; - - req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); - req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2); } else { req->rspcnt = 1; - - req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf); } /* By this time, the data bytes for the auth scheme would have already @@ -544,20 +546,17 @@ out: void -rpc_clnt_reply_deinit (struct rpc_req *req) +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool) { if (!req) { goto out; } - if (req->rsp_prochdr) { - iobuf_unref (req->rsp_prochdr); - } - - if (req->rsp_procpayload) { - iobuf_unref (req->rsp_procpayload); + if (req->rsp_iobref) { + iobref_unref (req->rsp_iobref); } + mem_put (pool, req); out: return; } @@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, size_t msglen = 0; int ret = -1; - if (msg->vectored) { - msgbuf = iobuf_ptr (msg->data.vector.iobuf1); - msglen = msg->data.vector.size1; - } else { - msgbuf = iobuf_ptr (msg->data.simple.iobuf); - msglen = msg->data.simple.size; - } + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, req->verf.authdata); @@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, goto out; } - gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," - " ProgVers: %d, Proc: %d", saved_frame->callid, - saved_frame->prog->progname, saved_frame->prog->progver, - saved_frame->procnum); + gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s," + " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid, + saved_frame->rpcreq->prog->progname, + saved_frame->rpcreq->prog->progver, + saved_frame->rpcreq->procnum); /* TODO: */ /* TODO: AUTH */ /* The verifier that is sent in a reply is a string that can be used as @@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) struct saved_frame *saved_frame = NULL; rpc_request_info_t *request_info = NULL; int ret = -1; - struct rpc_req req = {0, }; - int cbk_ret = -1; + struct rpc_req *req = NULL; conn = &clnt->conn; @@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) goto out; } - ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); + req = saved_frame->rpcreq; + if (req == NULL) { + gf_log ("rpc-clnt", GF_LOG_CRITICAL, + "saved_frame for reply with xid (%d), " + "prog-version (%d), prog-num (%d), procnum (%d)" + "does not contain rpc-req", request_info->xid, + request_info->progver, request_info->prognum, + request_info->procnum); + goto out; + } + + ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); if (ret != 0) { - req.rpc_status = -1; + req->rpc_status = -1; gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply " "failed"); } - cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt, - saved_frame->frame); - - if (ret == 0) { - rpc_clnt_reply_deinit (&req); + req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); + + if (req) { + rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool); } out: if (saved_frame) { - GF_FREE (saved_frame); + mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); } - return cbk_ret; + return ret; } @@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, pthread_mutex_init (&rpc->lock, NULL); rpc->ctx = ctx; + rpc->reqpool = mem_pool_new (struct rpc_req, + RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (rpc->reqpool == NULL) { + pthread_mutex_destroy (&rpc->lock); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + + rpc->saved_frames_pool = mem_pool_new (struct saved_frame, + RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (rpc->saved_frames_pool == NULL) { + pthread_mutex_destroy (&rpc->lock); + mem_pool_destroy (rpc->reqpool); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + ret = rpc_clnt_connection_init (rpc, ctx, options, name); if (ret == -1) { pthread_mutex_destroy (&rpc->lock); @@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, rpc = NULL; goto out; } + out: return rpc; } @@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *frame) + struct iobref *iobref, void *frame, struct iovec *rsphdr, + int rsphdr_count, struct iovec *rsp_payload, + int rsp_payload_count, struct iobref *rsp_iobref) { rpc_clnt_connection_t *conn = NULL; struct iobuf *request_iob = NULL; struct iovec rpchdr = {0,}; - struct rpc_req rpcreq = {0,}; + struct rpc_req *rpcreq = NULL; rpc_transport_req_t req; int ret = -1; int proglen = 0; @@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, goto out; } + rpcreq = mem_get (rpc->reqpool); + if (rpcreq == NULL) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); + goto out; + } + + memset (rpcreq, 0, sizeof (*rpcreq)); memset (&req, 0, sizeof (req)); if (!iobref) { @@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, conn = &rpc->conn; + rpcreq->prog = prog; + rpcreq->procnum = procnum; + rpcreq->conn = conn; + rpcreq->xid = callid; + rpcreq->cbkfn = cbkfn; + pthread_mutex_lock (&conn->lock); { if (conn->connected == 0) { @@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, req.msg.progpayloadcount = progpayloadcount; req.msg.iobref = iobref; + req.rsp.rsphdr = rsphdr; + req.rsp.rsphdr_count = rsphdr_count; + req.rsp.rsp_payload = rsp_payload; + req.rsp.rsp_payload_count = rsp_payload_count; + req.rsp.rsp_iobref = rsp_iobref; + ret = rpc_transport_submit_request (rpc->conn.trans, &req); if (ret == -1) { @@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if ((ret >= 0) && frame) { gettimeofday (&conn->last_sent, NULL); /* Save the frame in queue */ - __save_frame (rpc, frame, procnum, prog, cbkfn, callid); + __save_frame (rpc, frame, rpcreq); } - } unlock: pthread_mutex_unlock (&conn->lock); @@ -1266,8 +1310,9 @@ out: } if (frame && (ret == -1)) { - rpcreq.rpc_status = -1; - cbkfn (&rpcreq, NULL, 0, frame); + rpcreq->rpc_status = -1; + cbkfn (rpcreq, NULL, 0, frame); + mem_put (rpc->reqpool, rpcreq); } return ret; } diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index efc256cd261..b9d39b3320a 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -58,10 +58,7 @@ struct saved_frame { void *capital_this; void *frame; struct timeval saved_at; - int32_t procnum; - struct rpc_clnt_program *prog; - fop_cbk_fn_t cbkfn; - uint64_t callid; + struct rpc_req *rpcreq; rpc_transport_rsp_t rsp; }; @@ -116,14 +113,16 @@ struct rpc_req { uint32_t xid; struct iovec req[2]; int reqcnt; + struct iobref *req_iobref; struct iovec rsp[2]; int rspcnt; - struct iobuf *rsp_prochdr; - struct iobuf *rsp_procpayload; + struct iobref *rsp_iobref; int rpc_status; rpc_auth_data_t verf; rpc_clnt_prog_t *prog; int procnum; + fop_cbk_fn_t cbkfn; + void *conn_private; }; struct rpc_clnt { @@ -132,6 +131,12 @@ struct rpc_clnt { rpc_clnt_connection_t conn; void *mydata; uint64_t xid; + + /* Memory pool for rpc_req_t */ + struct mem_pool *reqpool; + + struct mem_pool *saved_frames_pool; + glusterfs_ctx_t *ctx; }; @@ -149,11 +154,28 @@ struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config, int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, void *mydata); +/* Some preconditions related to vectors holding responses. + * @rsphdr: should contain pointer to buffer which can hold response header + * and length of the program header. In case of procedures whose + * respnose size is not bounded (eg., glusterfs lookup), the length + * should be equal to size of buffer. + * @rsp_payload: should contain pointer and length of the bu + * + * 1. Both @rsp_hdr and @rsp_payload are optional. + * 2. The user of rpc_clnt_submit, if wants response hdr and payload in its own + * buffers, then it has to populate @rsphdr and @rsp_payload. + * 3. when @rsp_payload is not NULL, @rsphdr should + * also be filled with pointer to buffer to hold header and length + * of the header. + */ + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *frame); + struct iobref *iobref, void *frame, struct iovec *rsphdr, + int rsphdr_count, struct iovec *rsp_payload, + int rsp_payload_count, struct iobref *rsp_iobref); void rpc_clnt_destroy (struct rpc_clnt *rpc); diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index b77ea2aa553..50379c14950 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -628,20 +628,10 @@ rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin) goto out; } - if (pollin->vectored) { - if (pollin->data.vector.iobuf1) { - iobuf_unref (pollin->data.vector.iobuf1); - } - - if (pollin->data.vector.iobuf2) { - iobuf_unref (pollin->data.vector.iobuf2); - } - } else { - if (pollin->data.simple.iobuf) { - iobuf_unref (pollin->data.simple.iobuf); - } + if (pollin->iobref) { + iobref_unref (pollin->iobref); } - + if (pollin->private) { /* */ GF_FREE (pollin->private); @@ -654,9 +644,8 @@ out: rpc_transport_pollin_t * -rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, - size_t size, struct iobuf *vectored_buf, - size_t vectored_size, void *private) +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, + int count, struct iobref *iobref, void *private) { rpc_transport_pollin_t *msg = NULL; msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t); @@ -665,19 +654,15 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, goto out; } - if (vectored_buf) { + if (count == 2) { msg->vectored = 1; - msg->data.vector.iobuf1 = iobuf_ref (iobuf); - msg->data.vector.size1 = size; - - msg->data.vector.iobuf2 = iobuf_ref (vectored_buf); - msg->data.vector.size2 = vectored_size; - } else { - msg->data.simple.iobuf = iobuf_ref (iobuf); - msg->data.simple.size = size; } + memcpy (msg->vector, vector, count * sizeof (*vector)); + msg->count = count; + msg->iobref = iobref_ref (iobref); msg->private = private; + out: return msg; } @@ -698,6 +683,7 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this, int progpayloadlen = 0; char vectored = 0; char *hdr = NULL, *progpayloadbuf = NULL; + struct iobuf *iobuf = NULL; if (!rpchdr || !proghdr) { goto err; @@ -729,47 +715,72 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this, } if (vectored) { - msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool); - if (!msg->data.vector.iobuf1) { + msg->iobref = iobref_new (); + if (!msg->iobref) { + gf_log ("rpc-transport", GF_LOG_ERROR, + "out of memory"); + goto err; + } + + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); goto err; } - msg->data.vector.size1 = rpchdrlen + proghdrlen; - hdr = iobuf_ptr (msg->data.vector.iobuf1); + iobref_add (msg->iobref, iobuf); + iobuf_unref (iobuf); + + msg->vector[0].iov_len = rpchdrlen + proghdrlen; + msg->vector[0].iov_base = hdr = iobuf_ptr (iobuf); if (!is_request && rsp) { - msg->data.vector.iobuf2 = rsp->rspbuf; - progpayloadbuf = rsp->rspvec->iov_base; + msg->vector[1] = rsp->rsp_payload[0]; + progpayloadbuf = rsp->rsp_payload[0].iov_base; } else { - msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool); - if (!msg->data.vector.iobuf2) { + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); goto err; } - progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2); + iobref_add (msg->iobref, iobuf); + iobuf_unref (iobuf); + + msg->vector[1].iov_base + = progpayloadbuf = iobuf_ptr (iobuf); } - msg->data.vector.size2 = progpayloadlen; + msg->vector[1].iov_len = progpayloadlen; } else { if (!is_request && rsp) { /* FIXME: Assuming rspvec contains only one vector */ - hdr = rsp->rspvec->iov_base; - msg->data.simple.iobuf = rsp->rspbuf; + hdr = rsp->rsphdr[0].iov_base; + msg->vector[0] = rsp->rsphdr[0]; } else { - msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool); - if (!msg->data.simple.iobuf) { + msg->iobref = iobref_new (); + if (!msg->iobref) { + gf_log ("rpc-transport", GF_LOG_ERROR, + "out of memory"); + goto err; + } + + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); goto err; } - hdr = iobuf_ptr (msg->data.simple.iobuf); + iobref_add (msg->iobref, iobuf); + iobuf_unref (iobuf); + + hdr = iobuf_ptr (iobuf); + msg->vector[0].iov_base = hdr; } - msg->data.simple.size = rpchdrlen + proghdrlen; + msg->vector[0].iov_len = rpchdrlen + proghdrlen; } iov_unload (hdr, rpchdr, rpchdrcount); @@ -1253,7 +1264,8 @@ rpc_transport_peerproc (void *trans_data) } pthread_mutex_unlock (&trans->handover.mutex); - rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin); + rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, + msg->pollin); rpc_transport_handover_destroy (msg); } } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index b3c7985e095..5698b287cce 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -30,6 +30,10 @@ #include <rpc/auth.h> #include <rpc/rpc_msg.h> +#ifndef MAX_IOVEC +#define MAX_IOVEC 16 +#endif + /* Given the 4-byte fragment header, returns non-zero if this fragment * is the last fragment for the RPC record being assemebled. * RPC Record marking standard defines a 32 bit value as the fragment @@ -108,12 +112,11 @@ struct rpc_transport_msg { typedef struct rpc_transport_msg rpc_transport_msg_t; struct rpc_transport_rsp { - /* as of now, the entire rsp payload is read into rspbuf and hence - * rspcount is always set to one. - */ - struct iovec *rspvec; - int rspcount; - struct iobuf *rspbuf; + struct iovec *rsphdr; + int rsphdr_count; + struct iovec *rsp_payload; + int rsp_payload_count; + struct iobref *rsp_iobref; }; typedef struct rpc_transport_rsp rpc_transport_rsp_t; @@ -129,6 +132,15 @@ struct rpc_transport_reply { }; typedef struct rpc_transport_reply rpc_transport_reply_t; +struct rpc_transport_data { + char is_request; + union { + rpc_transport_req_t req; + rpc_transport_reply_t reply; + } data; +}; +typedef struct rpc_transport_data rpc_transport_data_t; + struct rpc_request_info { uint32_t xid; int prognum; @@ -140,20 +152,11 @@ typedef struct rpc_request_info rpc_request_info_t; struct rpc_transport_pollin { - union { - struct vectored { - struct iobuf *iobuf1; - size_t size1; - struct iobuf *iobuf2; - size_t size2; - } vector; - struct simple { - struct iobuf *iobuf; - size_t size; - } simple; - } data; + struct iovec vector[2]; + int count; char vectored; void *private; + struct iobref *iobref; }; typedef struct rpc_transport_pollin rpc_transport_pollin_t; @@ -279,9 +282,8 @@ rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr *sa, size_t salen); rpc_transport_pollin_t * -rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, - size_t iobuf_size, struct iobuf *vectoriob, - size_t vectoriob_size, void *private); +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, + int count, struct iobref *iobref, void *private); void rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin); diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 9bb7e0e4c9e..5bb908cec0a 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1021,12 +1021,8 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) goto out; } - if (req->recordiob) { - iobuf_unref (req->recordiob); - } - - if (req->vectorediob) { - iobuf_unref (req->vectorediob); + if (req->iobref) { + iobref_unref (req->iobref); } mem_put (conn->rxpool, req); @@ -1050,15 +1046,11 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, req->progver = rpc_call_progver (callmsg); req->procnum = rpc_call_progproc (callmsg); req->conn = conn; + req->count = msg->count; req->msg[0] = progmsg; + req->iobref = iobref_ref (msg->iobref); if (msg->vectored) { - req->msg[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); - req->msg[1].iov_len = msg->data.vector.size2; - - req->recordiob = iobuf_ref (msg->data.vector.iobuf1); - req->vectorediob = iobuf_ref (msg->data.vector.iobuf2); - } else { - req->recordiob = iobuf_ref (msg->data.simple.iobuf); + req->msg[1] = msg->vector[1]; } req->trans_private = msg->private; @@ -1106,13 +1098,8 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) goto err; } - if (msg->vectored) { - msgbuf = iobuf_ptr (msg->data.vector.iobuf1); - msglen = msg->data.vector.size1; - } else { - msgbuf = iobuf_ptr (msg->data.simple.iobuf); - msglen = msg->data.simple.size; - } + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, req->cred.authdata,req->verf.authdata); @@ -1190,11 +1177,11 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) goto err_reply; if (actor && (req->rpc_err == SUCCESS)) { - if (req->vectorediob) { + if (req->count == 2) { if (actor->vector_actor) { rpcsvc_conn_ref (conn); - ret = actor->vector_actor (req, - req->vectorediob); + ret = actor->vector_actor (req, &req->msg[1], 1, + req->iobref); } else { rpcsvc_request_seterr (req, PROC_UNAVAIL); gf_log (GF_RPCSVC, GF_LOG_ERROR, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 10dc32698ad..5a3f3cd3449 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -237,21 +237,9 @@ struct rpcsvc_request { * be de-xdred by the actor. */ struct iovec msg[2]; + int count; - /* The full message buffer allocated to store the RPC headers. - * This buffer is ref'd when allocated why RPC svc and unref'd after - * the buffer is handed to the actor. That means if the actor or any - * higher layer wants to keep this buffer around, they too must ref it - * right after entering the program actor. - */ - struct iobuf *recordiob; - - /* iobuf to hold payload of calls like write. By storing large payloads - * starting from page-aligned addresses, performance increases while - * accessing the payload - */ - struct iobuf *vectorediob; - + struct iobref *iobref; /* Status of the RPC call, whether it was accepted or denied. */ int rpc_status; @@ -317,7 +305,6 @@ struct rpcsvc_request { #define rpcsvc_request_private(req) ((req)->private) #define rpcsvc_request_xid(req) ((req)->xid) #define rpcsvc_request_set_private(req,prv) (req)->private = (void *)(prv) -#define rpcsvc_request_record_iob(rq) ((rq)->recordiob) #define rpcsvc_request_record_ref(req) (iobuf_ref ((req)->recordiob)) #define rpcsvc_request_record_unref(req) (iobuf_unref ((req)->recordiob)) @@ -338,7 +325,8 @@ struct rpcsvc_request { * */ typedef int (*rpcsvc_actor) (rpcsvc_request_t *req); -typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iobuf *iob); +typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iovec *vec, + int count, struct iobref *iobref); typedef int (*rpcsvc_vector_sizer) (rpcsvc_request_t *req, ssize_t *readsize, int *newiob); diff --git a/rpc/rpc-lib/src/xdr-common.h b/rpc/rpc-lib/src/xdr-common.h index 7ba1372529c..b3ce29e5dbe 100644 --- a/rpc/rpc-lib/src/xdr-common.h +++ b/rpc/rpc-lib/src/xdr-common.h @@ -50,12 +50,12 @@ struct auth_glusterfs_parms { u_int gid; u_int ngrps; u_int groups[16]; -}; +} __attribute__((packed)); typedef struct auth_glusterfs_parms auth_glusterfs_parms; struct gf_dump_req { u_quad_t gfs_id; -}; +} __attribute__((packed)); typedef struct gf_dump_req gf_dump_req; struct gf_prog_detail { @@ -63,7 +63,7 @@ struct gf_prog_detail { u_quad_t prognum; u_quad_t progver; struct gf_prog_detail *next; -}; +} __attribute__((packed)); typedef struct gf_prog_detail gf_prog_detail; struct gf_dump_rsp { @@ -71,7 +71,7 @@ struct gf_dump_rsp { int op_ret; int op_errno; struct gf_prog_detail *prog; -}; +}__attribute__((packed)); typedef struct gf_dump_rsp gf_dump_rsp; extern bool_t |