diff options
author | Raghavendra G <raghavendra@gluster.com> | 2010-08-04 04:45:25 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-08-06 03:37:20 -0700 |
commit | b04d963e91f8b3c72343e1043d6ed8c68699c4fe (patch) | |
tree | 42ae4cadaa016efd7f11ea0745d457ee79d3aaf3 | |
parent | 21e4580c24b3e4a1270ad482e1d905afffb00fba (diff) |
rpc: changes to glusterfs programs that can take an optional payload argument.
- The existing interface required the transport to separate the procedure
header and procedure payload into two different buffers. Making this
separation can prove cumbersome for transports like rdma wherein the header
and payload can be received in a single buffer (For eg., header and payload
of write fop sent as inline msg using rdma-send). This patch delegates the
responsiblity of separating out header and payload to programs.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 5 | ||||
-rw-r--r-- | xlators/protocol/client/src/client3_1-fops.c | 19 | ||||
-rw-r--r-- | xlators/protocol/server/src/server.h | 2 | ||||
-rw-r--r-- | xlators/protocol/server/src/server3_1-fops.c | 52 |
4 files changed, 49 insertions, 29 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index baeee735bb8..b017462a507 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1057,6 +1057,10 @@ __socket_read_vectored_reply (rpc_transport_t *this) && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { priv->incoming.frag.call_body.reply.status_state = SP_STATE_ACCEPTED_REPLY_INIT; + priv->incoming.payload_vector.iov_len + = (unsigned long)priv->incoming.frag.fragcurrent + - (unsigned long) + priv->incoming.payload_vector.iov_base; } break; } @@ -1230,6 +1234,7 @@ __socket_proto_state_machine (rpc_transport_t *this, priv->incoming.iobuf = iobuf; priv->incoming.iobuf_size = 0; + priv->incoming.total_bytes_read = 0; priv->incoming.payload_vector.iov_len = 0; priv->incoming.pending_vector = priv->incoming.vector; diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c index 52d5c093a60..32ebb0b1e7f 100644 --- a/xlators/protocol/client/src/client3_1-fops.c +++ b/xlators/protocol/client/src/client3_1-fops.c @@ -1980,10 +1980,12 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count, { call_frame_t *frame = NULL; struct iobref *iobref = NULL; - struct iovec vector = {0,}; + struct iovec vector[MAX_IOVEC]; struct iatt stat = {0,}; gfs3_read_rsp rsp = {0,}; - int ret = 0; + int ret = 0, rspcount = 0, i = 0; + + memset (vector, 0, sizeof (vector)); frame = myframe; @@ -2004,15 +2006,20 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count, if (rsp.op_ret != -1) { iobref = req->rsp_iobref; gf_stat_to_iatt (&rsp.stat, &stat); - vector.iov_len = rsp.op_ret; - if (rsp.op_ret > 0) { - vector.iov_base = req->rsp[1].iov_base; + if (ret < req->rsp[0].iov_len) { + vector[0].iov_base = req->rsp[0].iov_base + ret; + vector[0].iov_len = req->rsp[0].iov_len - ret; + rspcount = 1; + } + + for (i = 1; i < req->rspcnt; i++) { + vector[rspcount++] = req->rsp[i]; } } out: STACK_UNWIND_STRICT (readv, frame, rsp.op_ret, - gf_error_to_errno (rsp.op_errno), &vector, 1, + gf_error_to_errno (rsp.op_errno), vector, rspcount, &stat, iobref); return 0; diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h index 20853879f8c..a35eeb166dc 100644 --- a/xlators/protocol/server/src/server.h +++ b/xlators/protocol/server/src/server.h @@ -162,7 +162,7 @@ struct _server_state { fd_t *fd; int flags; int wbflags; - struct iovec payload_vector; + struct iovec payload_vector[MAX_IOVEC]; int payload_count; struct iobuf *iobuf; struct iobref *iobref; diff --git a/xlators/protocol/server/src/server3_1-fops.c b/xlators/protocol/server/src/server3_1-fops.c index fb69cf8da28..5175251601e 100644 --- a/xlators/protocol/server/src/server3_1-fops.c +++ b/xlators/protocol/server/src/server3_1-fops.c @@ -2438,20 +2438,16 @@ int server_writev_resume (call_frame_t *frame, xlator_t *bound_xl) { server_state_t *state = NULL; - struct iovec iov = {0, }; state = CALL_STATE (frame); if (state->resolve.op_ret != 0) goto err; - if (state->payload_count) { - iov = state->payload_vector; - } - STACK_WIND (frame, server_writev_cbk, bound_xl, bound_xl->fops->writev, - state->fd, &iov, 1, state->offset, state->iobref); + state->fd, state->payload_vector, state->payload_count, + state->offset, state->iobref); return 0; err: @@ -2978,24 +2974,17 @@ out: int server_writev (rpcsvc_request_t *req) { - /* TODO : */ - assert (0); - return 0; -} - - -int -server_writev_vec (rpcsvc_request_t *req, struct iovec *payload, - int payload_count, struct iobref *iobref) -{ server_state_t *state = NULL; call_frame_t *frame = NULL; gfs3_write_req args = {0,}; + ssize_t len = 0; + int i = 0; if (!req) return 0; - if (!xdr_to_writev_req (req->msg[0], &args)) { + len = xdr_to_writev_req (req->msg[0], &args); + if (len == 0) { //failed to decode msg; req->rpc_err = GARBAGE_ARGS; goto out; @@ -3019,12 +3008,23 @@ server_writev_vec (rpcsvc_request_t *req, struct iovec *payload, state->resolve.type = RESOLVE_MUST; state->resolve.fd_no = args.fd; state->offset = args.offset; + state->iobref = iobref_ref (req->iobref); - if (payload_count != 0) { - state->iobref = iobref_ref (iobref); - state->size = req->msg[1].iov_len; - state->payload_count = payload_count; - state->payload_vector = *payload; + if (len < req->msg[0].iov_len) { + state->payload_vector[0].iov_base + = (req->msg[0].iov_base + len); + state->payload_vector[0].iov_len + = req->msg[0].iov_len - len; + state->payload_count = 1; + } + + for (i = 1; i < req->count; i++) { + state->payload_vector[state->payload_count++] + = req->msg[i]; + } + + for (i = 0; i < state->payload_count; i++) { + state->size += state->payload_vector[i].iov_len; } resolve_and_resume (frame, server_writev_resume); @@ -3034,6 +3034,14 @@ out: int +server_writev_vec (rpcsvc_request_t *req, struct iovec *payload, + int payload_count, struct iobref *iobref) +{ + return server_writev (req); +} + + +int server_release (rpcsvc_request_t *req) { server_connection_t *conn = NULL; |