From 40d3ad15856c88d93d16264aa1f6bb55806aafde Mon Sep 17 00:00:00 2001 From: Raghavendra G Date: Wed, 28 Jul 2010 06:23:31 +0000 Subject: changes to rpc - use mem-pool for requests and saved_frames. - preserve the rpc_req structure till rpc invokes program's reply. This will enable us to store transport specific data that has to last till reply has come (eg., memory regions of chunk lists in case of rdma). - change signature of rpc_clnt_submit to accept rsphdr_vector and rsppayload_vector. The buffers pointed by these vectors will be from iobufs and these iobufs are added to an iobref which should also be passed as an arguement to rpc_clnt_submit. Signed-off-by: Raghavendra G Signed-off-by: Anand V. Avati BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875 --- rpc/rpc-lib/src/rpc-transport.c | 96 +++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 42 deletions(-) (limited to 'rpc/rpc-lib/src/rpc-transport.c') diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index b77ea2aa..50379c14 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -628,20 +628,10 @@ rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin) goto out; } - if (pollin->vectored) { - if (pollin->data.vector.iobuf1) { - iobuf_unref (pollin->data.vector.iobuf1); - } - - if (pollin->data.vector.iobuf2) { - iobuf_unref (pollin->data.vector.iobuf2); - } - } else { - if (pollin->data.simple.iobuf) { - iobuf_unref (pollin->data.simple.iobuf); - } + if (pollin->iobref) { + iobref_unref (pollin->iobref); } - + if (pollin->private) { /* */ GF_FREE (pollin->private); @@ -654,9 +644,8 @@ out: rpc_transport_pollin_t * -rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, - size_t size, struct iobuf *vectored_buf, - size_t vectored_size, void *private) +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, + int count, struct iobref *iobref, void *private) { rpc_transport_pollin_t *msg = NULL; msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t); @@ -665,19 +654,15 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, goto out; } - if (vectored_buf) { + if (count == 2) { msg->vectored = 1; - msg->data.vector.iobuf1 = iobuf_ref (iobuf); - msg->data.vector.size1 = size; - - msg->data.vector.iobuf2 = iobuf_ref (vectored_buf); - msg->data.vector.size2 = vectored_size; - } else { - msg->data.simple.iobuf = iobuf_ref (iobuf); - msg->data.simple.size = size; } + memcpy (msg->vector, vector, count * sizeof (*vector)); + msg->count = count; + msg->iobref = iobref_ref (iobref); msg->private = private; + out: return msg; } @@ -698,6 +683,7 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this, int progpayloadlen = 0; char vectored = 0; char *hdr = NULL, *progpayloadbuf = NULL; + struct iobuf *iobuf = NULL; if (!rpchdr || !proghdr) { goto err; @@ -729,47 +715,72 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this, } if (vectored) { - msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool); - if (!msg->data.vector.iobuf1) { + msg->iobref = iobref_new (); + if (!msg->iobref) { + gf_log ("rpc-transport", GF_LOG_ERROR, + "out of memory"); + goto err; + } + + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); goto err; } - msg->data.vector.size1 = rpchdrlen + proghdrlen; - hdr = iobuf_ptr (msg->data.vector.iobuf1); + iobref_add (msg->iobref, iobuf); + iobuf_unref (iobuf); + + msg->vector[0].iov_len = rpchdrlen + proghdrlen; + msg->vector[0].iov_base = hdr = iobuf_ptr (iobuf); if (!is_request && rsp) { - msg->data.vector.iobuf2 = rsp->rspbuf; - progpayloadbuf = rsp->rspvec->iov_base; + msg->vector[1] = rsp->rsp_payload[0]; + progpayloadbuf = rsp->rsp_payload[0].iov_base; } else { - msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool); - if (!msg->data.vector.iobuf2) { + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); goto err; } - progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2); + iobref_add (msg->iobref, iobuf); + iobuf_unref (iobuf); + + msg->vector[1].iov_base + = progpayloadbuf = iobuf_ptr (iobuf); } - msg->data.vector.size2 = progpayloadlen; + msg->vector[1].iov_len = progpayloadlen; } else { if (!is_request && rsp) { /* FIXME: Assuming rspvec contains only one vector */ - hdr = rsp->rspvec->iov_base; - msg->data.simple.iobuf = rsp->rspbuf; + hdr = rsp->rsphdr[0].iov_base; + msg->vector[0] = rsp->rsphdr[0]; } else { - msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool); - if (!msg->data.simple.iobuf) { + msg->iobref = iobref_new (); + if (!msg->iobref) { + gf_log ("rpc-transport", GF_LOG_ERROR, + "out of memory"); + goto err; + } + + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); goto err; } - hdr = iobuf_ptr (msg->data.simple.iobuf); + iobref_add (msg->iobref, iobuf); + iobuf_unref (iobuf); + + hdr = iobuf_ptr (iobuf); + msg->vector[0].iov_base = hdr; } - msg->data.simple.size = rpchdrlen + proghdrlen; + msg->vector[0].iov_len = rpchdrlen + proghdrlen; } iov_unload (hdr, rpchdr, rpchdrcount); @@ -1253,7 +1264,8 @@ rpc_transport_peerproc (void *trans_data) } pthread_mutex_unlock (&trans->handover.mutex); - rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin); + rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, + msg->pollin); rpc_transport_handover_destroy (msg); } } -- cgit