diff options
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 129 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 4 |
2 files changed, 103 insertions, 30 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 4396454a33a..baeee735bb8 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -370,12 +370,13 @@ __socket_reset (rpc_transport_t *this) /* TODO: use mem-pool on incoming data */ - if (priv->incoming.iobuf) { - iobuf_unref (priv->incoming.iobuf); + if (priv->incoming.iobref) { + iobref_unref (priv->incoming.iobref); + priv->incoming.iobref = NULL; } - if (priv->incoming.vectoriob) { - iobuf_unref (priv->incoming.vectoriob); + if (priv->incoming.iobuf) { + iobuf_unref (priv->incoming.iobuf); } memset (&priv->incoming, 0, sizeof (priv->incoming)); @@ -699,7 +700,7 @@ __socket_read_vectored_request (rpc_transport_t *this) /* fall through */ case SP_STATE_READ_VERFBYTES: - if (priv->incoming.vectoriob == NULL) { + if (priv->incoming.payload_vector.iov_base == NULL) { iobuf = iobuf_get (this->ctx->iobuf_pool); if (!iobuf) { gf_log (this->name, GF_LOG_ERROR, @@ -710,7 +711,23 @@ __socket_read_vectored_request (rpc_transport_t *this) break; } - priv->incoming.vectoriob = iobuf; + if (priv->incoming.iobref == NULL) { + priv->incoming.iobref = iobref_new (); + if (priv->incoming.iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + ret = -1; + iobuf_unref (iobuf); + break; + } + } + + iobref_add (priv->incoming.iobref, iobuf); + iobuf_unref (iobuf); + + priv->incoming.payload_vector.iov_base + = iobuf_ptr (iobuf); + priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); } @@ -735,9 +752,10 @@ __socket_read_vectored_request (rpc_transport_t *this) && RPC_LASTFRAG (priv->incoming.fraghdr))) { priv->incoming.frag.call_body.request.vector_state = SP_STATE_VECTORED_REQUEST_INIT; - priv->incoming.vectoriob_size + priv->incoming.payload_vector.iov_len = (unsigned long)priv->incoming.frag.fragcurrent - - (unsigned long)iobuf_ptr (priv->incoming.vectoriob); + - (unsigned long) + priv->incoming.payload_vector.iov_base; } break; } @@ -846,18 +864,31 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) /* fall through */ case SP_STATE_READ_PROC_HEADER: - if (priv->incoming.vectoriob == NULL) { + if (priv->incoming.payload_vector.iov_base == NULL) { iobuf = iobuf_get (this->ctx->iobuf_pool); if (iobuf == NULL) { ret = -1; goto out; } - priv->incoming.vectoriob = iobuf; + if (priv->incoming.iobref == NULL) { + priv->incoming.iobref = iobref_new (); + if (priv->incoming.iobref == NULL) { + ret = -1; + iobuf_unref (iobuf); + goto out; + } + } + + iobref_add (priv->incoming.iobref, iobuf); + iobuf_unref (iobuf); + + priv->incoming.payload_vector.iov_base + = iobuf_ptr (iobuf); } priv->incoming.frag.fragcurrent - = iobuf_ptr (priv->incoming.vectoriob); + = priv->incoming.payload_vector.iov_base; /* now read the entire remaining msg into new iobuf */ ret = __socket_read_simple_msg (this); @@ -1080,9 +1111,11 @@ __socket_read_reply (rpc_transport_t *this) if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM) && (request_info->procnum == GF_FOP_READ)) { - if (request_info->rsp.rspbuf != NULL) { - priv->incoming.vectoriob - = iobuf_ref (request_info->rsp.rspbuf); + if (request_info->rsp.rsp_payload_count != 0) { + priv->incoming.iobref + = iobref_ref (request_info->rsp.rsp_iobref); + priv->incoming.payload_vector + = *request_info->rsp.rsp_payload; } ret = __socket_read_vectored_reply (this); @@ -1154,15 +1187,19 @@ __socket_read_frag (rpc_transport_t *this) inline void __socket_reset_priv (socket_private_t *priv) { + if (priv->incoming.iobref) { + iobref_unref (priv->incoming.iobref); + priv->incoming.iobref = NULL; + } + if (priv->incoming.iobuf) { iobuf_unref (priv->incoming.iobuf); - priv->incoming.iobuf = NULL; } - if (priv->incoming.vectoriob) { - iobuf_unref (priv->incoming.vectoriob); - priv->incoming.vectoriob = NULL; - } + memset (&priv->incoming.payload_vector, 0, + sizeof (priv->incoming.payload_vector)); + + priv->incoming.iobuf = NULL; } @@ -1170,9 +1207,11 @@ int __socket_proto_state_machine (rpc_transport_t *this, rpc_transport_pollin_t **pollin) { - int ret = -1; - socket_private_t *priv = NULL; - struct iobuf *iobuf = NULL; + int ret = -1; + socket_private_t *priv = NULL; + struct iobuf *iobuf = NULL; + struct iobref *iobref = NULL; + struct iovec vector[2]; priv = this->private; while (priv->incoming.record_state != SP_STATE_COMPLETE) { @@ -1191,7 +1230,7 @@ __socket_proto_state_machine (rpc_transport_t *this, priv->incoming.iobuf = iobuf; priv->incoming.iobuf_size = 0; - priv->incoming.vectoriob_size = 0; + priv->incoming.payload_vector.iov_len = 0; priv->incoming.pending_vector = priv->incoming.vector; priv->incoming.pending_vector->iov_base = @@ -1260,15 +1299,49 @@ __socket_proto_state_machine (rpc_transport_t *this, * upper layers. */ if (pollin != NULL) { + int count = 0; priv->incoming.iobuf_size = priv->incoming.total_bytes_read - - priv->incoming.vectoriob_size; + - priv->incoming.payload_vector.iov_len; + + memset (vector, 0, sizeof (vector)); + + if (priv->incoming.iobref == NULL) { + priv->incoming.iobref = iobref_new (); + if (priv->incoming.iobref == NULL) { + gf_log (this->name, + GF_LOG_ERROR, + "out of memory"); + ret = -1; + goto out; + } + } + + vector[count].iov_base + = iobuf_ptr (priv->incoming.iobuf); + vector[count].iov_len + = priv->incoming.iobuf_size; + + iobref = priv->incoming.iobref; + + iobref_add (iobref, + priv->incoming.iobuf); + iobuf_unref (priv->incoming.iobuf); + priv->incoming.iobuf = NULL; + + count++; + + if (priv->incoming.payload_vector.iov_base + != NULL) { + vector[count] + = priv->incoming.payload_vector; + count++; + } *pollin = rpc_transport_pollin_alloc (this, - priv->incoming.iobuf, - priv->incoming.iobuf_size, - priv->incoming.vectoriob, - priv->incoming.vectoriob_size, + vector, + count, + iobref, priv->incoming.request_info); if (*pollin == NULL) { ret = -1; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index aa31ee2a7ef..5078b161e29 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -170,8 +170,8 @@ typedef struct { size_t iobuf_size; struct iovec vector[2]; int count; - struct iobuf *vectoriob; - size_t vectoriob_size; + struct iovec payload_vector; + struct iobref *iobref; rpc_request_info_t *request_info; struct iovec *pending_vector; int pending_count; |