summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c219
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h36
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c96
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h44
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c33
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h20
-rw-r--r--rpc/rpc-lib/src/xdr-common.h8
7 files changed, 256 insertions, 200 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 9b0bfe33d5b..fce3e8200fe 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -23,10 +23,16 @@
#include "config.h"
#endif
+#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096
+
#include "rpc-clnt.h"
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
+#include "mem-pool.h"
+
+void
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
uint64_t
rpc_clnt_new_callid (struct rpc_clnt *clnt)
@@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,
struct saved_frame *
-__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid)
+__saved_frames_put (struct saved_frames *frames, void *frame,
+ struct rpc_req *rpcreq)
{
struct saved_frame *saved_frame = NULL;
- saved_frame = GF_CALLOC (1, sizeof (*saved_frame),
- gf_common_mt_rpcclnt_savedframe_t);
+ saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);
if (!saved_frame) {
gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
/* THIS should be saved and set back */
+ memset (saved_frame, 0, sizeof (*saved_frame));
INIT_LIST_HEAD (&saved_frame->list);
saved_frame->capital_this = THIS;
saved_frame->frame = frame;
- saved_frame->procnum = procnum;
- saved_frame->callid = callid;
- saved_frame->prog = prog;
- saved_frame->cbkfn = cbk;
-
+ saved_frame->rpcreq = rpcreq;
gettimeofday (&saved_frame->saved_at, NULL);
list_add_tail (&saved_frame->list, &frames->sf.list);
@@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame,
}
pthread_mutex_unlock (&conn->lock);
- GF_FREE (saved_frame);
+ if (saved_frame->rpcreq != NULL) {
+ rpc_clnt_reply_deinit (saved_frame->rpcreq,
+ conn->rpc_clnt->reqpool);
+ }
+
+ mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
out:
return;
}
@@ -129,7 +136,6 @@ call_bail (void *data)
struct tm frame_sent_tm;
char frame_sent[32] = {0,};
struct timeval timeout = {0,};
- struct rpc_req req;
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -180,15 +186,19 @@ call_bail (void *data)
gf_log (conn->trans->name, GF_LOG_ERROR,
"bailing out frame type(%s) op(%s(%d)) sent = %s. "
"timeout = %d",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, frame_sent,
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
+ "--",
+ trav->rpcreq->procnum, frame_sent,
conn->frame_timeout);
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+ rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (conn->rpc_clnt->saved_frames_pool, trav);
}
out:
return;
@@ -197,8 +207,8 @@ out:
/* to be called with conn->lock held */
struct saved_frame *
-__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid)
+__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
+ struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
struct timeval timeout = {0, };
@@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
conn = &rpc_clnt->conn;
- saved_frame = __saved_frames_put (conn->saved_frames, frame,
- procnum, prog, cbk, callid);
+ saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq);
+
if (saved_frame == NULL) {
goto out;
}
@@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,
}
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
*saved_frame = *tmp;
ret = 0;
break;
@@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
struct saved_frame *tmp = NULL;
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
list_del_init (&tmp->list);
frames->count--;
saved_frame = tmp;
@@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames)
struct tm *frame_sent_tm = NULL;
char timestr[256] = {0,};
- struct rpc_req req;
struct iovec iov = {0,};
- memset (&req, 0, sizeof (req));
-
- req.rpc_status = -1;
-
list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
frame_sent_tm = localtime (&trav->saved_at.tv_sec);
strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",
frame_sent_tm);
- snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr),
+ snprintf (timestr + strlen (timestr),
+ sizeof(timestr) - strlen (timestr),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log ("rpc-clnt", GF_LOG_ERROR,
- "forced unwinding frame type(%s) op(%s(%d)) called at %s",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, timestr);
+ "forced unwinding frame type(%s) op(%s(%d)) "
+ "called at %s",
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--",
+ trav->rpcreq->procnum, timestr);
saved_frames->count--;
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+ rpc_clnt_reply_deinit (trav->rpcreq,
+ trav->rpcreq->conn->rpc_clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav);
}
}
@@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
goto out;
}
- info->prognum = saved_frame.prog->prognum;
- info->procnum = saved_frame.procnum;
- info->progver = saved_frame.prog->progver;
+ info->prognum = saved_frame.rpcreq->prog->prognum;
+ info->procnum = saved_frame.rpcreq->procnum;
+ info->progver = saved_frame.rpcreq->prog->progver;
info->rsp = saved_frame.rsp;
ret = 0;
@@ -490,9 +502,10 @@ int
rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
rpc_clnt_connection_t *conn,
struct rpc_msg *replymsg, struct iovec progmsg,
- struct rpc_req *req, struct saved_frame *saved_frame)
+ struct rpc_req *req,
+ struct saved_frame *saved_frame)
{
- int ret = -1;
+ int ret = -1;
if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) {
goto out;
@@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
req->rpc_status = -1;
}
- req->xid = rpc_reply_xid (replymsg);
- req->prog = saved_frame->prog;
- req->procnum = saved_frame->procnum;
- req->conn = conn;
-
req->rsp[0] = progmsg;
+ req->rsp_iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
- req->rsp[1].iov_len = msg->data.vector.size2;
-
+ req->rsp[1] = msg->vector[1];
req->rspcnt = 2;
-
- req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1);
- req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);
} else {
req->rspcnt = 1;
-
- req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);
}
/* By this time, the data bytes for the auth scheme would have already
@@ -544,20 +546,17 @@ out:
void
-rpc_clnt_reply_deinit (struct rpc_req *req)
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)
{
if (!req) {
goto out;
}
- if (req->rsp_prochdr) {
- iobuf_unref (req->rsp_prochdr);
- }
-
- if (req->rsp_procpayload) {
- iobuf_unref (req->rsp_procpayload);
+ if (req->rsp_iobref) {
+ iobref_unref (req->rsp_iobref);
}
+ mem_put (pool, req);
out:
return;
}
@@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
size_t msglen = 0;
int ret = -1;
- if (msg->vectored) {
- msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
- msglen = msg->data.vector.size1;
- } else {
- msgbuf = iobuf_ptr (msg->data.simple.iobuf);
- msglen = msg->data.simple.size;
- }
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
req->verf.authdata);
@@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
goto out;
}
- gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s,"
- " ProgVers: %d, Proc: %d", saved_frame->callid,
- saved_frame->prog->progname, saved_frame->prog->progver,
- saved_frame->procnum);
+ gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s,"
+ " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid,
+ saved_frame->rpcreq->prog->progname,
+ saved_frame->rpcreq->prog->progver,
+ saved_frame->rpcreq->procnum);
/* TODO: */
/* TODO: AUTH */
/* The verifier that is sent in a reply is a string that can be used as
@@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
struct saved_frame *saved_frame = NULL;
rpc_request_info_t *request_info = NULL;
int ret = -1;
- struct rpc_req req = {0, };
- int cbk_ret = -1;
+ struct rpc_req *req = NULL;
conn = &clnt->conn;
@@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
goto out;
}
- ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame);
+ req = saved_frame->rpcreq;
+ if (req == NULL) {
+ gf_log ("rpc-clnt", GF_LOG_CRITICAL,
+ "saved_frame for reply with xid (%d), "
+ "prog-version (%d), prog-num (%d), procnum (%d)"
+ "does not contain rpc-req", request_info->xid,
+ request_info->progver, request_info->prognum,
+ request_info->procnum);
+ goto out;
+ }
+
+ ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);
if (ret != 0) {
- req.rpc_status = -1;
+ req->rpc_status = -1;
gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "
"failed");
}
- cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt,
- saved_frame->frame);
-
- if (ret == 0) {
- rpc_clnt_reply_deinit (&req);
+ req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame);
+
+ if (req) {
+ rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);
}
out:
if (saved_frame) {
- GF_FREE (saved_frame);
+ mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
}
- return cbk_ret;
+ return ret;
}
@@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
pthread_mutex_init (&rpc->lock, NULL);
rpc->ctx = ctx;
+ rpc->reqpool = mem_pool_new (struct rpc_req,
+ RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (rpc->reqpool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ rpc->saved_frames_pool = mem_pool_new (struct saved_frame,
+ RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (rpc->saved_frames_pool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ mem_pool_destroy (rpc->reqpool);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
ret = rpc_clnt_connection_init (rpc, ctx, options, name);
if (ret == -1) {
pthread_mutex_destroy (&rpc->lock);
@@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
rpc = NULL;
goto out;
}
+
out:
return rpc;
}
@@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *frame)
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref)
{
rpc_clnt_connection_t *conn = NULL;
struct iobuf *request_iob = NULL;
struct iovec rpchdr = {0,};
- struct rpc_req rpcreq = {0,};
+ struct rpc_req *rpcreq = NULL;
rpc_transport_req_t req;
int ret = -1;
int proglen = 0;
@@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
goto out;
}
+ rpcreq = mem_get (rpc->reqpool);
+ if (rpcreq == NULL) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ memset (rpcreq, 0, sizeof (*rpcreq));
memset (&req, 0, sizeof (req));
if (!iobref) {
@@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
conn = &rpc->conn;
+ rpcreq->prog = prog;
+ rpcreq->procnum = procnum;
+ rpcreq->conn = conn;
+ rpcreq->xid = callid;
+ rpcreq->cbkfn = cbkfn;
+
pthread_mutex_lock (&conn->lock);
{
if (conn->connected == 0) {
@@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
req.msg.progpayloadcount = progpayloadcount;
req.msg.iobref = iobref;
+ req.rsp.rsphdr = rsphdr;
+ req.rsp.rsphdr_count = rsphdr_count;
+ req.rsp.rsp_payload = rsp_payload;
+ req.rsp.rsp_payload_count = rsp_payload_count;
+ req.rsp.rsp_iobref = rsp_iobref;
+
ret = rpc_transport_submit_request (rpc->conn.trans,
&req);
if (ret == -1) {
@@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if ((ret >= 0) && frame) {
gettimeofday (&conn->last_sent, NULL);
/* Save the frame in queue */
- __save_frame (rpc, frame, procnum, prog, cbkfn, callid);
+ __save_frame (rpc, frame, rpcreq);
}
-
}
unlock:
pthread_mutex_unlock (&conn->lock);
@@ -1266,8 +1310,9 @@ out:
}
if (frame && (ret == -1)) {
- rpcreq.rpc_status = -1;
- cbkfn (&rpcreq, NULL, 0, frame);
+ rpcreq->rpc_status = -1;
+ cbkfn (rpcreq, NULL, 0, frame);
+ mem_put (rpc->reqpool, rpcreq);
}
return ret;
}
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index efc256cd261..b9d39b3320a 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -58,10 +58,7 @@ struct saved_frame {
void *capital_this;
void *frame;
struct timeval saved_at;
- int32_t procnum;
- struct rpc_clnt_program *prog;
- fop_cbk_fn_t cbkfn;
- uint64_t callid;
+ struct rpc_req *rpcreq;
rpc_transport_rsp_t rsp;
};
@@ -116,14 +113,16 @@ struct rpc_req {
uint32_t xid;
struct iovec req[2];
int reqcnt;
+ struct iobref *req_iobref;
struct iovec rsp[2];
int rspcnt;
- struct iobuf *rsp_prochdr;
- struct iobuf *rsp_procpayload;
+ struct iobref *rsp_iobref;
int rpc_status;
rpc_auth_data_t verf;
rpc_clnt_prog_t *prog;
int procnum;
+ fop_cbk_fn_t cbkfn;
+ void *conn_private;
};
struct rpc_clnt {
@@ -132,6 +131,12 @@ struct rpc_clnt {
rpc_clnt_connection_t conn;
void *mydata;
uint64_t xid;
+
+ /* Memory pool for rpc_req_t */
+ struct mem_pool *reqpool;
+
+ struct mem_pool *saved_frames_pool;
+
glusterfs_ctx_t *ctx;
};
@@ -149,11 +154,28 @@ struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config,
int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
void *mydata);
+/* Some preconditions related to vectors holding responses.
+ * @rsphdr: should contain pointer to buffer which can hold response header
+ * and length of the program header. In case of procedures whose
+ * respnose size is not bounded (eg., glusterfs lookup), the length
+ * should be equal to size of buffer.
+ * @rsp_payload: should contain pointer and length of the bu
+ *
+ * 1. Both @rsp_hdr and @rsp_payload are optional.
+ * 2. The user of rpc_clnt_submit, if wants response hdr and payload in its own
+ * buffers, then it has to populate @rsphdr and @rsp_payload.
+ * 3. when @rsp_payload is not NULL, @rsphdr should
+ * also be filled with pointer to buffer to hold header and length
+ * of the header.
+ */
+
int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *frame);
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref);
void rpc_clnt_destroy (struct rpc_clnt *rpc);
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index b77ea2aa553..50379c14950 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -628,20 +628,10 @@ rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin)
goto out;
}
- if (pollin->vectored) {
- if (pollin->data.vector.iobuf1) {
- iobuf_unref (pollin->data.vector.iobuf1);
- }
-
- if (pollin->data.vector.iobuf2) {
- iobuf_unref (pollin->data.vector.iobuf2);
- }
- } else {
- if (pollin->data.simple.iobuf) {
- iobuf_unref (pollin->data.simple.iobuf);
- }
+ if (pollin->iobref) {
+ iobref_unref (pollin->iobref);
}
-
+
if (pollin->private) {
/* */
GF_FREE (pollin->private);
@@ -654,9 +644,8 @@ out:
rpc_transport_pollin_t *
-rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
- size_t size, struct iobuf *vectored_buf,
- size_t vectored_size, void *private)
+rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
+ int count, struct iobref *iobref, void *private)
{
rpc_transport_pollin_t *msg = NULL;
msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t);
@@ -665,19 +654,15 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
goto out;
}
- if (vectored_buf) {
+ if (count == 2) {
msg->vectored = 1;
- msg->data.vector.iobuf1 = iobuf_ref (iobuf);
- msg->data.vector.size1 = size;
-
- msg->data.vector.iobuf2 = iobuf_ref (vectored_buf);
- msg->data.vector.size2 = vectored_size;
- } else {
- msg->data.simple.iobuf = iobuf_ref (iobuf);
- msg->data.simple.size = size;
}
+ memcpy (msg->vector, vector, count * sizeof (*vector));
+ msg->count = count;
+ msg->iobref = iobref_ref (iobref);
msg->private = private;
+
out:
return msg;
}
@@ -698,6 +683,7 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,
int progpayloadlen = 0;
char vectored = 0;
char *hdr = NULL, *progpayloadbuf = NULL;
+ struct iobuf *iobuf = NULL;
if (!rpchdr || !proghdr) {
goto err;
@@ -729,47 +715,72 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,
}
if (vectored) {
- msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.vector.iobuf1) {
+ msg->iobref = iobref_new ();
+ if (!msg->iobref) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "out of memory");
+ goto err;
+ }
+
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
gf_log ("rpc_transport", GF_LOG_ERROR,
"out of memory");
goto err;
}
- msg->data.vector.size1 = rpchdrlen + proghdrlen;
- hdr = iobuf_ptr (msg->data.vector.iobuf1);
+ iobref_add (msg->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ msg->vector[0].iov_len = rpchdrlen + proghdrlen;
+ msg->vector[0].iov_base = hdr = iobuf_ptr (iobuf);
if (!is_request && rsp) {
- msg->data.vector.iobuf2 = rsp->rspbuf;
- progpayloadbuf = rsp->rspvec->iov_base;
+ msg->vector[1] = rsp->rsp_payload[0];
+ progpayloadbuf = rsp->rsp_payload[0].iov_base;
} else {
- msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.vector.iobuf2) {
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
gf_log ("rpc_transport", GF_LOG_ERROR,
"out of memory");
goto err;
}
- progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2);
+ iobref_add (msg->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ msg->vector[1].iov_base
+ = progpayloadbuf = iobuf_ptr (iobuf);
}
- msg->data.vector.size2 = progpayloadlen;
+ msg->vector[1].iov_len = progpayloadlen;
} else {
if (!is_request && rsp) {
/* FIXME: Assuming rspvec contains only one vector */
- hdr = rsp->rspvec->iov_base;
- msg->data.simple.iobuf = rsp->rspbuf;
+ hdr = rsp->rsphdr[0].iov_base;
+ msg->vector[0] = rsp->rsphdr[0];
} else {
- msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.simple.iobuf) {
+ msg->iobref = iobref_new ();
+ if (!msg->iobref) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "out of memory");
+ goto err;
+ }
+
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
gf_log ("rpc_transport", GF_LOG_ERROR,
"out of memory");
goto err;
}
- hdr = iobuf_ptr (msg->data.simple.iobuf);
+ iobref_add (msg->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ hdr = iobuf_ptr (iobuf);
+ msg->vector[0].iov_base = hdr;
}
- msg->data.simple.size = rpchdrlen + proghdrlen;
+ msg->vector[0].iov_len = rpchdrlen + proghdrlen;
}
iov_unload (hdr, rpchdr, rpchdrcount);
@@ -1253,7 +1264,8 @@ rpc_transport_peerproc (void *trans_data)
}
pthread_mutex_unlock (&trans->handover.mutex);
- rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin);
+ rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED,
+ msg->pollin);
rpc_transport_handover_destroy (msg);
}
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index b3c7985e095..5698b287cce 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -30,6 +30,10 @@
#include <rpc/auth.h>
#include <rpc/rpc_msg.h>
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif
+
/* Given the 4-byte fragment header, returns non-zero if this fragment
* is the last fragment for the RPC record being assemebled.
* RPC Record marking standard defines a 32 bit value as the fragment
@@ -108,12 +112,11 @@ struct rpc_transport_msg {
typedef struct rpc_transport_msg rpc_transport_msg_t;
struct rpc_transport_rsp {
- /* as of now, the entire rsp payload is read into rspbuf and hence
- * rspcount is always set to one.
- */
- struct iovec *rspvec;
- int rspcount;
- struct iobuf *rspbuf;
+ struct iovec *rsphdr;
+ int rsphdr_count;
+ struct iovec *rsp_payload;
+ int rsp_payload_count;
+ struct iobref *rsp_iobref;
};
typedef struct rpc_transport_rsp rpc_transport_rsp_t;
@@ -129,6 +132,15 @@ struct rpc_transport_reply {
};
typedef struct rpc_transport_reply rpc_transport_reply_t;
+struct rpc_transport_data {
+ char is_request;
+ union {
+ rpc_transport_req_t req;
+ rpc_transport_reply_t reply;
+ } data;
+};
+typedef struct rpc_transport_data rpc_transport_data_t;
+
struct rpc_request_info {
uint32_t xid;
int prognum;
@@ -140,20 +152,11 @@ typedef struct rpc_request_info rpc_request_info_t;
struct rpc_transport_pollin {
- union {
- struct vectored {
- struct iobuf *iobuf1;
- size_t size1;
- struct iobuf *iobuf2;
- size_t size2;
- } vector;
- struct simple {
- struct iobuf *iobuf;
- size_t size;
- } simple;
- } data;
+ struct iovec vector[2];
+ int count;
char vectored;
void *private;
+ struct iobref *iobref;
};
typedef struct rpc_transport_pollin rpc_transport_pollin_t;
@@ -279,9 +282,8 @@ rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr *sa, size_t salen);
rpc_transport_pollin_t *
-rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
- size_t iobuf_size, struct iobuf *vectoriob,
- size_t vectoriob_size, void *private);
+rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
+ int count, struct iobref *iobref, void *private);
void
rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin);
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 9bb7e0e4c9e..5bb908cec0a 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1021,12 +1021,8 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
goto out;
}
- if (req->recordiob) {
- iobuf_unref (req->recordiob);
- }
-
- if (req->vectorediob) {
- iobuf_unref (req->vectorediob);
+ if (req->iobref) {
+ iobref_unref (req->iobref);
}
mem_put (conn->rxpool, req);
@@ -1050,15 +1046,11 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
req->progver = rpc_call_progver (callmsg);
req->procnum = rpc_call_progproc (callmsg);
req->conn = conn;
+ req->count = msg->count;
req->msg[0] = progmsg;
+ req->iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->msg[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
- req->msg[1].iov_len = msg->data.vector.size2;
-
- req->recordiob = iobuf_ref (msg->data.vector.iobuf1);
- req->vectorediob = iobuf_ref (msg->data.vector.iobuf2);
- } else {
- req->recordiob = iobuf_ref (msg->data.simple.iobuf);
+ req->msg[1] = msg->vector[1];
}
req->trans_private = msg->private;
@@ -1106,13 +1098,8 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
goto err;
}
- if (msg->vectored) {
- msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
- msglen = msg->data.vector.size1;
- } else {
- msgbuf = iobuf_ptr (msg->data.simple.iobuf);
- msglen = msg->data.simple.size;
- }
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg,
req->cred.authdata,req->verf.authdata);
@@ -1190,11 +1177,11 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
goto err_reply;
if (actor && (req->rpc_err == SUCCESS)) {
- if (req->vectorediob) {
+ if (req->count == 2) {
if (actor->vector_actor) {
rpcsvc_conn_ref (conn);
- ret = actor->vector_actor (req,
- req->vectorediob);
+ ret = actor->vector_actor (req, &req->msg[1], 1,
+ req->iobref);
} else {
rpcsvc_request_seterr (req, PROC_UNAVAIL);
gf_log (GF_RPCSVC, GF_LOG_ERROR,
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 10dc32698ad..5a3f3cd3449 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -237,21 +237,9 @@ struct rpcsvc_request {
* be de-xdred by the actor.
*/
struct iovec msg[2];
+ int count;
- /* The full message buffer allocated to store the RPC headers.
- * This buffer is ref'd when allocated why RPC svc and unref'd after
- * the buffer is handed to the actor. That means if the actor or any
- * higher layer wants to keep this buffer around, they too must ref it
- * right after entering the program actor.
- */
- struct iobuf *recordiob;
-
- /* iobuf to hold payload of calls like write. By storing large payloads
- * starting from page-aligned addresses, performance increases while
- * accessing the payload
- */
- struct iobuf *vectorediob;
-
+ struct iobref *iobref;
/* Status of the RPC call, whether it was accepted or denied. */
int rpc_status;
@@ -317,7 +305,6 @@ struct rpcsvc_request {
#define rpcsvc_request_private(req) ((req)->private)
#define rpcsvc_request_xid(req) ((req)->xid)
#define rpcsvc_request_set_private(req,prv) (req)->private = (void *)(prv)
-#define rpcsvc_request_record_iob(rq) ((rq)->recordiob)
#define rpcsvc_request_record_ref(req) (iobuf_ref ((req)->recordiob))
#define rpcsvc_request_record_unref(req) (iobuf_unref ((req)->recordiob))
@@ -338,7 +325,8 @@ struct rpcsvc_request {
*
*/
typedef int (*rpcsvc_actor) (rpcsvc_request_t *req);
-typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iobuf *iob);
+typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iovec *vec,
+ int count, struct iobref *iobref);
typedef int (*rpcsvc_vector_sizer) (rpcsvc_request_t *req, ssize_t *readsize,
int *newiob);
diff --git a/rpc/rpc-lib/src/xdr-common.h b/rpc/rpc-lib/src/xdr-common.h
index 7ba1372529c..b3ce29e5dbe 100644
--- a/rpc/rpc-lib/src/xdr-common.h
+++ b/rpc/rpc-lib/src/xdr-common.h
@@ -50,12 +50,12 @@ struct auth_glusterfs_parms {
u_int gid;
u_int ngrps;
u_int groups[16];
-};
+} __attribute__((packed));
typedef struct auth_glusterfs_parms auth_glusterfs_parms;
struct gf_dump_req {
u_quad_t gfs_id;
-};
+} __attribute__((packed));
typedef struct gf_dump_req gf_dump_req;
struct gf_prog_detail {
@@ -63,7 +63,7 @@ struct gf_prog_detail {
u_quad_t prognum;
u_quad_t progver;
struct gf_prog_detail *next;
-};
+} __attribute__((packed));
typedef struct gf_prog_detail gf_prog_detail;
struct gf_dump_rsp {
@@ -71,7 +71,7 @@ struct gf_dump_rsp {
int op_ret;
int op_errno;
struct gf_prog_detail *prog;
-};
+}__attribute__((packed));
typedef struct gf_dump_rsp gf_dump_rsp;
extern bool_t