summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c129
-rw-r--r--rpc/rpc-transport/socket/src/socket.h4
2 files changed, 103 insertions, 30 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 4396454a33a..baeee735bb8 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -370,12 +370,13 @@ __socket_reset (rpc_transport_t *this)
/* TODO: use mem-pool on incoming data */
- if (priv->incoming.iobuf) {
- iobuf_unref (priv->incoming.iobuf);
+ if (priv->incoming.iobref) {
+ iobref_unref (priv->incoming.iobref);
+ priv->incoming.iobref = NULL;
}
- if (priv->incoming.vectoriob) {
- iobuf_unref (priv->incoming.vectoriob);
+ if (priv->incoming.iobuf) {
+ iobuf_unref (priv->incoming.iobuf);
}
memset (&priv->incoming, 0, sizeof (priv->incoming));
@@ -699,7 +700,7 @@ __socket_read_vectored_request (rpc_transport_t *this)
/* fall through */
case SP_STATE_READ_VERFBYTES:
- if (priv->incoming.vectoriob == NULL) {
+ if (priv->incoming.payload_vector.iov_base == NULL) {
iobuf = iobuf_get (this->ctx->iobuf_pool);
if (!iobuf) {
gf_log (this->name, GF_LOG_ERROR,
@@ -710,7 +711,23 @@ __socket_read_vectored_request (rpc_transport_t *this)
break;
}
- priv->incoming.vectoriob = iobuf;
+ if (priv->incoming.iobref == NULL) {
+ priv->incoming.iobref = iobref_new ();
+ if (priv->incoming.iobref == NULL) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "out of memory");
+ ret = -1;
+ iobuf_unref (iobuf);
+ break;
+ }
+ }
+
+ iobref_add (priv->incoming.iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ priv->incoming.payload_vector.iov_base
+ = iobuf_ptr (iobuf);
+
priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
}
@@ -735,9 +752,10 @@ __socket_read_vectored_request (rpc_transport_t *this)
&& RPC_LASTFRAG (priv->incoming.fraghdr))) {
priv->incoming.frag.call_body.request.vector_state
= SP_STATE_VECTORED_REQUEST_INIT;
- priv->incoming.vectoriob_size
+ priv->incoming.payload_vector.iov_len
= (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)iobuf_ptr (priv->incoming.vectoriob);
+ - (unsigned long)
+ priv->incoming.payload_vector.iov_base;
}
break;
}
@@ -846,18 +864,31 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
/* fall through */
case SP_STATE_READ_PROC_HEADER:
- if (priv->incoming.vectoriob == NULL) {
+ if (priv->incoming.payload_vector.iov_base == NULL) {
iobuf = iobuf_get (this->ctx->iobuf_pool);
if (iobuf == NULL) {
ret = -1;
goto out;
}
- priv->incoming.vectoriob = iobuf;
+ if (priv->incoming.iobref == NULL) {
+ priv->incoming.iobref = iobref_new ();
+ if (priv->incoming.iobref == NULL) {
+ ret = -1;
+ iobuf_unref (iobuf);
+ goto out;
+ }
+ }
+
+ iobref_add (priv->incoming.iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ priv->incoming.payload_vector.iov_base
+ = iobuf_ptr (iobuf);
}
priv->incoming.frag.fragcurrent
- = iobuf_ptr (priv->incoming.vectoriob);
+ = priv->incoming.payload_vector.iov_base;
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg (this);
@@ -1080,9 +1111,11 @@ __socket_read_reply (rpc_transport_t *this)
if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)
&& (request_info->procnum == GF_FOP_READ)) {
- if (request_info->rsp.rspbuf != NULL) {
- priv->incoming.vectoriob
- = iobuf_ref (request_info->rsp.rspbuf);
+ if (request_info->rsp.rsp_payload_count != 0) {
+ priv->incoming.iobref
+ = iobref_ref (request_info->rsp.rsp_iobref);
+ priv->incoming.payload_vector
+ = *request_info->rsp.rsp_payload;
}
ret = __socket_read_vectored_reply (this);
@@ -1154,15 +1187,19 @@ __socket_read_frag (rpc_transport_t *this)
inline
void __socket_reset_priv (socket_private_t *priv)
{
+ if (priv->incoming.iobref) {
+ iobref_unref (priv->incoming.iobref);
+ priv->incoming.iobref = NULL;
+ }
+
if (priv->incoming.iobuf) {
iobuf_unref (priv->incoming.iobuf);
- priv->incoming.iobuf = NULL;
}
- if (priv->incoming.vectoriob) {
- iobuf_unref (priv->incoming.vectoriob);
- priv->incoming.vectoriob = NULL;
- }
+ memset (&priv->incoming.payload_vector, 0,
+ sizeof (priv->incoming.payload_vector));
+
+ priv->incoming.iobuf = NULL;
}
@@ -1170,9 +1207,11 @@ int
__socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
- int ret = -1;
- socket_private_t *priv = NULL;
- struct iobuf *iobuf = NULL;
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+ struct iovec vector[2];
priv = this->private;
while (priv->incoming.record_state != SP_STATE_COMPLETE) {
@@ -1191,7 +1230,7 @@ __socket_proto_state_machine (rpc_transport_t *this,
priv->incoming.iobuf = iobuf;
priv->incoming.iobuf_size = 0;
- priv->incoming.vectoriob_size = 0;
+ priv->incoming.payload_vector.iov_len = 0;
priv->incoming.pending_vector = priv->incoming.vector;
priv->incoming.pending_vector->iov_base =
@@ -1260,15 +1299,49 @@ __socket_proto_state_machine (rpc_transport_t *this,
* upper layers.
*/
if (pollin != NULL) {
+ int count = 0;
priv->incoming.iobuf_size
= priv->incoming.total_bytes_read
- - priv->incoming.vectoriob_size;
+ - priv->incoming.payload_vector.iov_len;
+
+ memset (vector, 0, sizeof (vector));
+
+ if (priv->incoming.iobref == NULL) {
+ priv->incoming.iobref = iobref_new ();
+ if (priv->incoming.iobref == NULL) {
+ gf_log (this->name,
+ GF_LOG_ERROR,
+ "out of memory");
+ ret = -1;
+ goto out;
+ }
+ }
+
+ vector[count].iov_base
+ = iobuf_ptr (priv->incoming.iobuf);
+ vector[count].iov_len
+ = priv->incoming.iobuf_size;
+
+ iobref = priv->incoming.iobref;
+
+ iobref_add (iobref,
+ priv->incoming.iobuf);
+ iobuf_unref (priv->incoming.iobuf);
+ priv->incoming.iobuf = NULL;
+
+ count++;
+
+ if (priv->incoming.payload_vector.iov_base
+ != NULL) {
+ vector[count]
+ = priv->incoming.payload_vector;
+ count++;
+ }
*pollin = rpc_transport_pollin_alloc (this,
- priv->incoming.iobuf,
- priv->incoming.iobuf_size,
- priv->incoming.vectoriob,
- priv->incoming.vectoriob_size,
+ vector,
+ count,
+ iobref,
priv->incoming.request_info);
if (*pollin == NULL) {
ret = -1;
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index aa31ee2a7ef..5078b161e29 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -170,8 +170,8 @@ typedef struct {
size_t iobuf_size;
struct iovec vector[2];
int count;
- struct iobuf *vectoriob;
- size_t vectoriob_size;
+ struct iovec payload_vector;
+ struct iobref *iobref;
rpc_request_info_t *request_info;
struct iovec *pending_vector;
int pending_count;