summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c74
-rw-r--r--rpc/rpc-transport/socket/src/socket.h2
2 files changed, 55 insertions, 21 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 6c2d909e43a..5e65755d813 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -815,6 +815,7 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
uint32_t remaining_size = 0;
ssize_t readsize = 0;
size_t size = 0;
+ char *proghdr_buf = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -871,10 +872,10 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
case SP_STATE_READ_VERFBYTES:
sp_state_read_verfbytes:
+ proghdr_buf = priv->incoming.frag.fragcurrent;
priv->incoming.frag.call_body.request.vector_sizer_state =
vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.frag.fragcurrent);
+ &readsize, proghdr_buf);
__socket_proto_init_pending (priv, readsize);
priv->incoming.frag.call_body.request.vector_state
= SP_STATE_READING_PROGHDR;
@@ -886,8 +887,7 @@ sp_state_read_verfbytes:
sp_state_reading_proghdr:
priv->incoming.frag.call_body.request.vector_sizer_state =
vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.frag.fragcurrent);
+ &readsize, proghdr_buf);
if (readsize == 0) {
priv->incoming.frag.call_body.request.vector_state =
SP_STATE_READ_PROGHDR;
@@ -1038,12 +1038,14 @@ out:
inline int
__socket_read_accepted_successful_reply (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- struct iobuf *iobuf = NULL;
- uint32_t gluster_read_rsp_hdr_len = 0;
- gfs3_read_rsp read_rsp = {0, };
- size_t size = 0;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct iobuf *iobuf = NULL;
+ gfs3_read_rsp read_rsp = {0, };
+ ssize_t size = 0;
+ ssize_t default_read_size = 0;
+ char *proghdr_buf = NULL;
+ XDR xdr;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -1053,16 +1055,12 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
switch (priv->incoming.frag.call_body.reply.accepted_success_state) {
case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
- gluster_read_rsp_hdr_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
- &read_rsp);
+ default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
+ &read_rsp);
- if (gluster_read_rsp_hdr_len == 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "xdr_sizeof on gfs3_read_rsp failed");
- ret = -1;
- goto out;
- }
- __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len);
+ proghdr_buf = priv->incoming.frag.fragcurrent;
+
+ __socket_proto_init_pending (priv, default_read_size);
priv->incoming.frag.call_body.reply.accepted_success_state
= SP_STATE_READING_PROC_HEADER;
@@ -1072,9 +1070,40 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
case SP_STATE_READING_PROC_HEADER:
__socket_proto_read (priv, ret);
+ /* there can be 'xdata' in read response, figure it out */
+ xdrmem_create (&xdr, proghdr_buf, default_read_size,
+ XDR_DECODE);
+
+ /* This will fail if there is xdata sent from server, if not,
+ well and good, we don't need to worry about */
+ xdr_gfs3_read_rsp (&xdr, &read_rsp);
+
+ if (read_rsp.xdata.xdata_val)
+ free (read_rsp.xdata.xdata_val);
+
+ /* need to round off to proper roof (%4), as XDR packing pads
+ the end of opaque object with '0' */
+ size = roof (read_rsp.xdata.xdata_len, 4);
+
+ if (!size) {
+ priv->incoming.frag.call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ goto read_proc_opaque;
+ }
+
+ __socket_proto_init_pending (priv, size);
+
priv->incoming.frag.call_body.reply.accepted_success_state
- = SP_STATE_READ_PROC_HEADER;
+ = SP_STATE_READING_PROC_OPAQUE;
+
+ case SP_STATE_READING_PROC_OPAQUE:
+ __socket_proto_read (priv, ret);
+
+ priv->incoming.frag.call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ case SP_STATE_READ_PROC_OPAQUE:
+ read_proc_opaque:
if (priv->incoming.payload_vector.iov_base == NULL) {
size = (RPC_FRAGSIZE (priv->incoming.fraghdr) -
@@ -1107,6 +1136,9 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
priv->incoming.frag.fragcurrent
= priv->incoming.payload_vector.iov_base;
+ priv->incoming.frag.call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_HEADER;
+
/* fall through */
case SP_STATE_READ_PROC_HEADER:
@@ -1529,7 +1561,6 @@ __socket_proto_state_machine (rpc_transport_t *this,
case SP_STATE_READ_FRAGHDR:
priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAG;
priv->incoming.total_bytes_read
+= RPC_FRAGSIZE(priv->incoming.fraghdr);
iobuf = iobuf_get2 (this->ctx->iobuf_pool,
@@ -1543,6 +1574,7 @@ __socket_proto_state_machine (rpc_transport_t *this,
priv->incoming.iobuf = iobuf;
priv->incoming.iobuf_size = 0;
priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
+ priv->incoming.record_state = SP_STATE_READING_FRAG;
/* fall through */
case SP_STATE_READING_FRAG:
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 0c897bd2ec0..6d6802a541f 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -115,6 +115,8 @@ typedef enum {
typedef enum {
SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT,
SP_STATE_READING_PROC_HEADER,
+ SP_STATE_READING_PROC_OPAQUE,
+ SP_STATE_READ_PROC_OPAQUE,
SP_STATE_READ_PROC_HEADER,
} sp_rpcfrag_vectored_reply_accepted_success_state_t;