diff options
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 74 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 2 | ||||
-rw-r--r-- | xlators/protocol/client/src/client3_1-fops.c | 15 | ||||
-rw-r--r-- | xlators/protocol/server/src/server3_1-fops.c | 60 |
4 files changed, 123 insertions, 28 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6c2d909e43a..5e65755d813 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -815,6 +815,7 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto uint32_t remaining_size = 0; ssize_t readsize = 0; size_t size = 0; + char *proghdr_buf = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -871,10 +872,10 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto case SP_STATE_READ_VERFBYTES: sp_state_read_verfbytes: + proghdr_buf = priv->incoming.frag.fragcurrent; priv->incoming.frag.call_body.request.vector_sizer_state = vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, - &readsize, - priv->incoming.frag.fragcurrent); + &readsize, proghdr_buf); __socket_proto_init_pending (priv, readsize); priv->incoming.frag.call_body.request.vector_state = SP_STATE_READING_PROGHDR; @@ -886,8 +887,7 @@ sp_state_read_verfbytes: sp_state_reading_proghdr: priv->incoming.frag.call_body.request.vector_sizer_state = vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, - &readsize, - priv->incoming.frag.fragcurrent); + &readsize, proghdr_buf); if (readsize == 0) { priv->incoming.frag.call_body.request.vector_state = SP_STATE_READ_PROGHDR; @@ -1038,12 +1038,14 @@ out: inline int __socket_read_accepted_successful_reply (rpc_transport_t *this) { - socket_private_t *priv = NULL; - int ret = 0; - struct iobuf *iobuf = NULL; - uint32_t gluster_read_rsp_hdr_len = 0; - gfs3_read_rsp read_rsp = {0, }; - size_t size = 0; + socket_private_t *priv = NULL; + int ret = 0; + struct iobuf *iobuf = NULL; + gfs3_read_rsp read_rsp = {0, }; + ssize_t size = 0; + ssize_t default_read_size = 0; + char *proghdr_buf = NULL; + XDR xdr; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -1053,16 +1055,12 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) switch (priv->incoming.frag.call_body.reply.accepted_success_state) { case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: - gluster_read_rsp_hdr_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, - &read_rsp); + default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, + &read_rsp); - if (gluster_read_rsp_hdr_len == 0) { - gf_log (this->name, GF_LOG_ERROR, - "xdr_sizeof on gfs3_read_rsp failed"); - ret = -1; - goto out; - } - __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len); + proghdr_buf = priv->incoming.frag.fragcurrent; + + __socket_proto_init_pending (priv, default_read_size); priv->incoming.frag.call_body.reply.accepted_success_state = SP_STATE_READING_PROC_HEADER; @@ -1072,9 +1070,40 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) case SP_STATE_READING_PROC_HEADER: __socket_proto_read (priv, ret); + /* there can be 'xdata' in read response, figure it out */ + xdrmem_create (&xdr, proghdr_buf, default_read_size, + XDR_DECODE); + + /* This will fail if there is xdata sent from server, if not, + well and good, we don't need to worry about */ + xdr_gfs3_read_rsp (&xdr, &read_rsp); + + if (read_rsp.xdata.xdata_val) + free (read_rsp.xdata.xdata_val); + + /* need to round off to proper roof (%4), as XDR packing pads + the end of opaque object with '0' */ + size = roof (read_rsp.xdata.xdata_len, 4); + + if (!size) { + priv->incoming.frag.call_body.reply.accepted_success_state + = SP_STATE_READ_PROC_OPAQUE; + goto read_proc_opaque; + } + + __socket_proto_init_pending (priv, size); + priv->incoming.frag.call_body.reply.accepted_success_state - = SP_STATE_READ_PROC_HEADER; + = SP_STATE_READING_PROC_OPAQUE; + + case SP_STATE_READING_PROC_OPAQUE: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.reply.accepted_success_state + = SP_STATE_READ_PROC_OPAQUE; + case SP_STATE_READ_PROC_OPAQUE: + read_proc_opaque: if (priv->incoming.payload_vector.iov_base == NULL) { size = (RPC_FRAGSIZE (priv->incoming.fraghdr) - @@ -1107,6 +1136,9 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) priv->incoming.frag.fragcurrent = priv->incoming.payload_vector.iov_base; + priv->incoming.frag.call_body.reply.accepted_success_state + = SP_STATE_READ_PROC_HEADER; + /* fall through */ case SP_STATE_READ_PROC_HEADER: @@ -1529,7 +1561,6 @@ __socket_proto_state_machine (rpc_transport_t *this, case SP_STATE_READ_FRAGHDR: priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); - priv->incoming.record_state = SP_STATE_READING_FRAG; priv->incoming.total_bytes_read += RPC_FRAGSIZE(priv->incoming.fraghdr); iobuf = iobuf_get2 (this->ctx->iobuf_pool, @@ -1543,6 +1574,7 @@ __socket_proto_state_machine (rpc_transport_t *this, priv->incoming.iobuf = iobuf; priv->incoming.iobuf_size = 0; priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); + priv->incoming.record_state = SP_STATE_READING_FRAG; /* fall through */ case SP_STATE_READING_FRAG: diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 0c897bd2ec0..6d6802a541f 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -115,6 +115,8 @@ typedef enum { typedef enum { SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT, SP_STATE_READING_PROC_HEADER, + SP_STATE_READING_PROC_OPAQUE, + SP_STATE_READ_PROC_OPAQUE, SP_STATE_READ_PROC_HEADER, } sp_rpcfrag_vectored_reply_accepted_success_state_t; diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c index e17a650d3c4..07d55c8956b 100644 --- a/xlators/protocol/client/src/client3_1-fops.c +++ b/xlators/protocol/client/src/client3_1-fops.c @@ -2693,6 +2693,10 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count, (rsp.xdata.xdata_len), ret, rsp.op_errno, out); +#ifdef GF_TESTING_IO_XDATA + dict_dump (xdata); +#endif + out: if (rsp.op_ret == -1) { gf_log (this->name, GF_LOG_WARNING, @@ -4010,6 +4014,17 @@ client3_1_writev (call_frame_t *frame, xlator_t *this, void *data) memcpy (req.gfid, args->fd->inode->gfid, 16); +#ifdef GF_TESTING_IO_XDATA + if (!args->xdata) + args->xdata = dict_new (); + + ret = dict_set_str (args->xdata, "testing-the-xdata-key", + "testing-the-xdata-value"); +#endif + + GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val), + req.xdata.xdata_len, op_errno, unwind); + ret = client_submit_vec_request (this, &req, frame, conf->fops, GFS3_OP_WRITE, client3_1_writev_cbk, args->vector, args->count, diff --git a/xlators/protocol/server/src/server3_1-fops.c b/xlators/protocol/server/src/server3_1-fops.c index 2e0bbb4c8cd..06283461d06 100644 --- a/xlators/protocol/server/src/server3_1-fops.c +++ b/xlators/protocol/server/src/server3_1-fops.c @@ -1455,6 +1455,16 @@ server_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, op_ret, strerror (op_errno)); } +#ifdef GF_TESTING_IO_XDATA + { + int ret = 0; + if (!xdata) + xdata = dict_new (); + + ret = dict_set_str (xdata, "testing-the-xdata-key", + "testing-xdata-value"); + } +#endif GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val), rsp.xdata.xdata_len, op_errno, out); @@ -3392,6 +3402,10 @@ server_writev (rpcsvc_request_t *req) (args.xdata.xdata_len), ret, op_errno, out); +#ifdef GF_TESTING_IO_XDATA + dict_dump (state->xdata); +#endif + ret = 0; resolve_and_resume (frame, server_writev_resume); out: @@ -3413,26 +3427,58 @@ server_writev_vec (rpcsvc_request_t *req, struct iovec *payload, } #define SERVER3_1_VECWRITE_START 0 -#define SERVER3_1_VECWRITE_READINGHDR 1 +#define SERVER3_1_VECWRITE_READING_HDR 1 +#define SERVER3_1_VECWRITE_READING_OPAQUE 2 int server_writev_vecsizer (int state, ssize_t *readsize, char *addr) { - int nextstate = 0; - gfs3_write_req write_req = {{0,},}; + ssize_t size = 0; + int nextstate = 0; + gfs3_write_req write_req = {{0,},}; + XDR xdr; switch (state) { case SERVER3_1_VECWRITE_START: - *readsize = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, &write_req); - nextstate = SERVER3_1_VECWRITE_READINGHDR; + size = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, + &write_req); + *readsize = size; + nextstate = SERVER3_1_VECWRITE_READING_HDR; + break; + case SERVER3_1_VECWRITE_READING_HDR: + size = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, + &write_req); + + xdrmem_create (&xdr, addr, size, XDR_DECODE); + + /* This will fail if there is xdata sent from client, if not, + well and good */ + xdr_gfs3_write_req (&xdr, &write_req); + + /* need to round off to proper roof (%4), as XDR packing pads + the end of opaque object with '0' */ + size = roof (write_req.xdata.xdata_len, 4); + + *readsize = size; + + if (!size) + nextstate = SERVER3_1_VECWRITE_START; + else + nextstate = SERVER3_1_VECWRITE_READING_OPAQUE; + + if (write_req.xdata.xdata_val) + free (write_req.xdata.xdata_val); + break; - case SERVER3_1_VECWRITE_READINGHDR: + + case SERVER3_1_VECWRITE_READING_OPAQUE: *readsize = 0; nextstate = SERVER3_1_VECWRITE_START; break; default: - gf_log ("server3_1", GF_LOG_ERROR, "wrong state: %d", state); + gf_log ("server", GF_LOG_ERROR, "wrong state: %d", state); } + return nextstate; } |