summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorAmar Tumballi <amarts@redhat.com>2012-07-30 13:05:52 +0530
committerAnand Avati <avati@redhat.com>2012-08-23 02:17:43 -0700
commit99a0fcb7a46c996518a93c3975805f53108a4eba (patch)
tree7b1439664a1c90dfbe306b0c21b8cc7e9ebabdb4 /rpc
parentc57197c8fc1f3f6419a728a5f8b60457c468accb (diff)
socket: code cleanup
* for more review friendly way * reduce the level of indirections at each line. Change-Id: I82ace7683fb281d97a64da724f054ece28215054 Signed-off-by: Amar Tumballi <amarts@redhat.com> BUG: 764890 Reviewed-on: http://review.gluster.org/3839 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c579
-rw-r--r--rpc/rpc-transport/socket/src/socket.h66
2 files changed, 333 insertions, 312 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index f5c6bffa2..24079a9f8 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -63,80 +63,89 @@
typedef int SSL_unary_func (SSL *);
typedef int SSL_trinary_func (SSL *, void *, int);
-#define __socket_proto_reset_pending(priv) do { \
- memset (&priv->incoming.frag.vector, 0, \
- sizeof (priv->incoming.frag.vector)); \
- priv->incoming.frag.pending_vector = \
- &priv->incoming.frag.vector; \
- priv->incoming.frag.pending_vector->iov_base = \
- priv->incoming.frag.fragcurrent; \
- priv->incoming.pending_vector = \
- priv->incoming.frag.pending_vector; \
- } while (0);
+#define __socket_proto_reset_pending(priv) do { \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ memset (&frag->vector, 0, sizeof (frag->vector)); \
+ frag->pending_vector = &frag->vector; \
+ frag->pending_vector->iov_base = frag->fragcurrent; \
+ priv->incoming.pending_vector = frag->pending_vector; \
+ } while (0)
#define __socket_proto_update_pending(priv) \
do { \
- uint32_t remaining_fragsize = 0; \
- if (priv->incoming.frag.pending_vector->iov_len == 0) { \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
+ uint32_t remaining; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ if (frag->pending_vector->iov_len == 0) { \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
\
- priv->incoming.frag.pending_vector->iov_len = \
- remaining_fragsize > priv->incoming.frag.remaining_size \
- ? priv->incoming.frag.remaining_size : remaining_fragsize; \
+ frag->pending_vector->iov_len = \
+ (remaining > frag->remaining_size) \
+ ? frag->remaining_size : remaining; \
\
- priv->incoming.frag.remaining_size -= \
- priv->incoming.frag.pending_vector->iov_len; \
+ frag->remaining_size -= \
+ frag->pending_vector->iov_len; \
} \
- } while (0);
+ } while (0)
#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
{ \
- priv->incoming.frag.fragcurrent += bytes_read; \
- priv->incoming.frag.bytes_read += bytes_read; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ frag->fragcurrent += bytes_read; \
+ frag->bytes_read += bytes_read; \
\
- if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \
- if (priv->incoming.frag.remaining_size != 0 && ret == 0) { \
+ if ((ret > 0) || (frag->remaining_size != 0)) { \
+ if (frag->remaining_size != 0 && ret == 0) { \
__socket_proto_reset_pending (priv); \
} \
\
- gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \
+ gf_log (this->name, GF_LOG_TRACE, \
+ "partial read on non-blocking socket"); \
\
break; \
} \
}
-#define __socket_proto_init_pending(priv, size) \
+#define __socket_proto_init_pending(priv,size) \
do { \
- uint32_t remaining_fragsize = 0; \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
+ uint32_t remaining = 0; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
\
- __socket_proto_reset_pending (priv); \
+ __socket_proto_reset_pending (priv); \
\
- priv->incoming.frag.pending_vector->iov_len = \
- remaining_fragsize > size ? size : remaining_fragsize; \
+ frag->pending_vector->iov_len = \
+ (remaining > size) ? size : remaining; \
\
- priv->incoming.frag.remaining_size = \
- size - priv->incoming.frag.pending_vector->iov_len; \
+ frag->remaining_size = (size - frag->pending_vector->iov_len); \
\
- } while (0);
+ } while(0)
/* This will be used in a switch case and breaks from the switch case if all
* the pending data is not read.
*/
#define __socket_proto_read(priv, ret) \
- { \
+ { \
size_t bytes_read = 0; \
+ struct gf_sock_incoming *in; \
+ in = &priv->incoming; \
\
__socket_proto_update_pending (priv); \
\
ret = __socket_readv (this, \
- priv->incoming.pending_vector, 1, \
- &priv->incoming.pending_vector, \
- &priv->incoming.pending_count, \
+ in->pending_vector, 1, \
+ &in->pending_vector, \
+ &in->pending_count, \
&bytes_read); \
if (ret == -1) { \
gf_log (this->name, GF_LOG_WARNING, \
@@ -959,40 +968,42 @@ out:
inline int
__socket_read_simple_msg (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- uint32_t remaining_size = 0;
- size_t bytes_read = 0;
+ int ret = 0;
+ uint32_t remaining_size = 0;
+ size_t bytes_read = 0;
+ socket_private_t *priv = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.simple_state) {
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->simple_state) {
case SP_STATE_SIMPLE_MSG_INIT:
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
__socket_proto_init_pending (priv, remaining_size);
- priv->incoming.frag.simple_state =
- SP_STATE_READING_SIMPLE_MSG;
+ frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
/* fall through */
case SP_STATE_READING_SIMPLE_MSG:
ret = 0;
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if (remaining_size > 0) {
ret = __socket_readv (this,
- priv->incoming.pending_vector, 1,
- &priv->incoming.pending_vector,
- &priv->incoming.pending_count,
+ in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
&bytes_read);
}
@@ -1004,8 +1015,8 @@ __socket_read_simple_msg (rpc_transport_t *this)
break;
}
- priv->incoming.frag.bytes_read += bytes_read;
- priv->incoming.frag.fragcurrent += bytes_read;
+ frag->bytes_read += bytes_read;
+ frag->fragcurrent += bytes_read;
if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE,
@@ -1014,8 +1025,7 @@ __socket_read_simple_msg (rpc_transport_t *this)
}
if (ret == 0) {
- priv->incoming.frag.simple_state
- = SP_STATE_SIMPLE_MSG_INIT;
+ frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
}
}
@@ -1051,17 +1061,26 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
struct iobuf *iobuf = NULL;
uint32_t remaining_size = 0;
ssize_t readsize = 0;
- size_t size = 0;
+ size_t size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.request.vector_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->vector_state) {
case SP_STATE_VECTORED_REQUEST_INIT:
- priv->incoming.frag.call_body.request.vector_sizer_state = 0;
- addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf));
+ request->vector_sizer_state = 0;
+
+ addr = rpc_cred_addr (iobuf_ptr (in->iobuf));
/* also read verf flavour and verflen */
credlen = ntoh32 (*((uint32_t *)addr))
@@ -1069,40 +1088,35 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
__socket_proto_init_pending (priv, credlen);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_CREDBYTES;
+ request->vector_state = SP_STATE_READING_CREDBYTES;
/* fall through */
case SP_STATE_READING_CREDBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_CREDBYTES;
+ request->vector_state = SP_STATE_READ_CREDBYTES;
/* fall through */
case SP_STATE_READ_CREDBYTES:
- addr = rpc_verf_addr (priv->incoming.frag.fragcurrent);
+ addr = rpc_verf_addr (frag->fragcurrent);
verflen = ntoh32 (*((uint32_t *)addr));
if (verflen == 0) {
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READ_VERFBYTES;
+ request->vector_state = SP_STATE_READ_VERFBYTES;
goto sp_state_read_verfbytes;
}
__socket_proto_init_pending (priv, verflen);
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READING_VERFBYTES;
+ request->vector_state = SP_STATE_READING_VERFBYTES;
/* fall through */
case SP_STATE_READING_VERFBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_VERFBYTES;
+ request->vector_state = SP_STATE_READ_VERFBYTES;
/* fall through */
@@ -1110,85 +1124,78 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
sp_state_read_verfbytes:
/* set the base_addr 'persistently' across multiple calls
into the state machine */
- priv->incoming.proghdr_base_addr = priv->incoming.frag.fragcurrent;
+ in->proghdr_base_addr = frag->fragcurrent;
- priv->incoming.frag.call_body.request.vector_sizer_state =
- vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize, priv->incoming.proghdr_base_addr,
- priv->incoming.frag.fragcurrent);
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
__socket_proto_init_pending (priv, readsize);
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READING_PROGHDR;
+
+ request->vector_state = SP_STATE_READING_PROGHDR;
/* fall through */
case SP_STATE_READING_PROGHDR:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_PROGHDR;
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
/* fall through */
case SP_STATE_READ_PROGHDR:
sp_state_read_proghdr:
- priv->incoming.frag.call_body.request.vector_sizer_state =
- vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.proghdr_base_addr,
- priv->incoming.frag.fragcurrent);
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
if (readsize == 0) {
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_PROGHDR_XDATA;
+ request->vector_state = SP_STATE_READ_PROGHDR_XDATA;
goto sp_state_read_proghdr_xdata;
}
__socket_proto_init_pending (priv, readsize);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_PROGHDR_XDATA;
+ request->vector_state = SP_STATE_READING_PROGHDR_XDATA;
/* fall through */
case SP_STATE_READING_PROGHDR_XDATA:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_PROGHDR;
+ request->vector_state = SP_STATE_READ_PROGHDR;
/* check if the vector_sizer() has more to say */
goto sp_state_read_proghdr;
case SP_STATE_READ_PROGHDR_XDATA:
sp_state_read_proghdr_xdata:
- if (priv->incoming.payload_vector.iov_base == NULL) {
+ if (in->payload_vector.iov_base == NULL) {
- size = RPC_FRAGSIZE (priv->incoming.fraghdr) -
- priv->incoming.frag.bytes_read;
+ size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (!iobuf) {
ret = -1;
break;
}
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
iobuf_unref (iobuf);
break;
}
}
- iobref_add (priv->incoming.iobref, iobuf);
+ iobref_add (in->iobref, iobuf);
iobuf_unref (iobuf);
- priv->incoming.payload_vector.iov_base
- = iobuf_ptr (iobuf);
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
- priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
+ frag->fragcurrent = iobuf_ptr (iobuf);
}
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_PROG;
+ request->vector_state = SP_STATE_READING_PROG;
/* fall through */
@@ -1199,19 +1206,15 @@ sp_state_read_proghdr_xdata:
ret = __socket_read_simple_msg (this);
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
- if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && RPC_LASTFRAG (priv->incoming.fraghdr))) {
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_VECTORED_REQUEST_INIT;
- priv->incoming.payload_vector.iov_len
- = (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)
- priv->incoming.payload_vector.iov_base;
+ if ((ret == -1) ||
+ ((ret == 0) && (remaining_size == 0)
+ && RPC_LASTFRAG (in->fraghdr))) {
+ request->vector_state = SP_STATE_VECTORED_REQUEST_INIT;
+ in->payload_vector.iov_len
+ = ((unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base);
}
break;
}
@@ -1229,46 +1232,53 @@ __socket_read_request (rpc_transport_t *this)
int ret = -1;
char *buf = NULL;
rpcsvc_vector_sizer vector_sizer = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.request.header_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->header_state) {
case SP_STATE_REQUEST_HEADER_INIT:
__socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE);
- priv->incoming.frag.call_body.request.header_state
- = SP_STATE_READING_RPCHDR1;
+ request->header_state = SP_STATE_READING_RPCHDR1;
/* fall through */
case SP_STATE_READING_RPCHDR1:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.header_state =
- SP_STATE_READ_RPCHDR1;
+ request->header_state = SP_STATE_READ_RPCHDR1;
/* fall through */
case SP_STATE_READ_RPCHDR1:
- buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_prognum_addr (iobuf_ptr (in->iobuf));
prognum = ntoh32 (*((uint32_t *)buf));
- buf = rpc_progver_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_progver_addr (iobuf_ptr (in->iobuf));
progver = ntoh32 (*((uint32_t *)buf));
- buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
if (this->listener) {
- /* this check is needed as rpcsvc and rpc-clnt actor structures are
- * not same */
- vector_sizer = rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
- prognum, progver, procnum);
+ /* this check is needed as rpcsvc and rpc-clnt
+ * actor structures are not same */
+ vector_sizer =
+ rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
+ prognum, progver, procnum);
}
if (vector_sizer) {
@@ -1277,15 +1287,13 @@ __socket_read_request (rpc_transport_t *this)
ret = __socket_read_simple_request (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
|| ((ret == 0)
&& (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.request.header_state =
- SP_STATE_REQUEST_HEADER_INIT;
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ request->header_state = SP_STATE_REQUEST_HEADER_INIT;
}
break;
@@ -1307,23 +1315,29 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
ssize_t default_read_size = 0;
char *proghdr_buf = NULL;
XDR xdr;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.reply.accepted_success_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->call_body.reply.accepted_success_state) {
case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
&read_rsp);
- proghdr_buf = priv->incoming.frag.fragcurrent;
+ proghdr_buf = frag->fragcurrent;
__socket_proto_init_pending (priv, default_read_size);
- priv->incoming.frag.call_body.reply.accepted_success_state
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READING_PROC_HEADER;
/* fall through */
@@ -1346,28 +1360,27 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
size = roof (read_rsp.xdata.xdata_len, 4);
if (!size) {
- priv->incoming.frag.call_body.reply.accepted_success_state
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READ_PROC_OPAQUE;
goto read_proc_opaque;
}
__socket_proto_init_pending (priv, size);
- priv->incoming.frag.call_body.reply.accepted_success_state
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READING_PROC_OPAQUE;
case SP_STATE_READING_PROC_OPAQUE:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_success_state
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READ_PROC_OPAQUE;
case SP_STATE_READ_PROC_OPAQUE:
read_proc_opaque:
- if (priv->incoming.payload_vector.iov_base == NULL) {
+ if (in->payload_vector.iov_base == NULL) {
- size = (RPC_FRAGSIZE (priv->incoming.fraghdr) -
- priv->incoming.frag.bytes_read);
+ size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (iobuf == NULL) {
@@ -1375,28 +1388,26 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
goto out;
}
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
iobuf_unref (iobuf);
goto out;
}
}
- iobref_add (priv->incoming.iobref, iobuf);
+ iobref_add (in->iobref, iobuf);
iobuf_unref (iobuf);
- priv->incoming.payload_vector.iov_base
- = iobuf_ptr (iobuf);
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
- priv->incoming.payload_vector.iov_len = size;
+ in->payload_vector.iov_len = size;
}
- priv->incoming.frag.fragcurrent
- = priv->incoming.payload_vector.iov_base;
+ frag->fragcurrent = in->payload_vector.iov_base;
- priv->incoming.frag.call_body.reply.accepted_success_state
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READ_PROC_HEADER;
/* fall through */
@@ -1405,9 +1416,8 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg (this);
if ((ret == -1)
- || ((ret == 0)
- && RPC_LASTFRAG (priv->incoming.fraghdr))) {
- priv->incoming.frag.call_body.reply.accepted_success_state
+ || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
+ frag->call_body.reply.accepted_success_state
= SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
}
@@ -1429,19 +1439,24 @@ __socket_read_accepted_reply (rpc_transport_t *this)
char *buf = NULL;
uint32_t verflen = 0, len = 0;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.call_body.reply.accepted_state) {
+ switch (frag->call_body.reply.accepted_state) {
case SP_STATE_ACCEPTED_REPLY_INIT:
__socket_proto_init_pending (priv,
RPC_AUTH_FLAVOUR_N_LENGTH_SIZE);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READING_REPLY_VERFLEN;
/* fall through */
@@ -1449,13 +1464,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_VERFLEN:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READ_REPLY_VERFLEN;
/* fall through */
case SP_STATE_READ_REPLY_VERFLEN:
- buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_verflen_addr (frag->fragcurrent);
verflen = ntoh32 (*((uint32_t *) buf));
@@ -1464,7 +1479,7 @@ __socket_read_accepted_reply (rpc_transport_t *this)
__socket_proto_init_pending (priv, len);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READING_REPLY_VERFBYTES;
/* fall through */
@@ -1472,19 +1487,19 @@ __socket_read_accepted_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_VERFBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READ_REPLY_VERFBYTES;
- buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_accept_status_addr (frag->fragcurrent);
- priv->incoming.frag.call_body.reply.accept_status
+ frag->call_body.reply.accept_status
= ntoh32 (*(uint32_t *) buf);
/* fall through */
case SP_STATE_READ_REPLY_VERFBYTES:
- if (priv->incoming.frag.call_body.reply.accept_status
+ if (frag->call_body.reply.accept_status
== SUCCESS) {
ret = __socket_read_accepted_successful_reply (this);
} else {
@@ -1494,14 +1509,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)
ret = __socket_read_simple_msg (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr)
+ - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.reply.accepted_state
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.accepted_state
= SP_STATE_ACCEPTED_REPLY_INIT;
}
@@ -1530,18 +1544,22 @@ __socket_read_vectored_reply (rpc_transport_t *this)
int ret = 0;
char *buf = NULL;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.call_body.reply.status_state) {
+ switch (frag->call_body.reply.status_state) {
case SP_STATE_ACCEPTED_REPLY_INIT:
__socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE);
- priv->incoming.frag.call_body.reply.status_state
+ frag->call_body.reply.status_state
= SP_STATE_READING_REPLY_STATUS;
/* fall through */
@@ -1549,37 +1567,33 @@ __socket_read_vectored_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_STATUS:
__socket_proto_read (priv, ret);
- buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_status_addr (frag->fragcurrent);
- priv->incoming.frag.call_body.reply.accept_status
+ frag->call_body.reply.accept_status
= ntoh32 (*((uint32_t *) buf));
- priv->incoming.frag.call_body.reply.status_state
+ frag->call_body.reply.status_state
= SP_STATE_READ_REPLY_STATUS;
/* fall through */
case SP_STATE_READ_REPLY_STATUS:
- if (priv->incoming.frag.call_body.reply.accept_status
- == MSG_ACCEPTED) {
+ if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {
ret = __socket_read_accepted_reply (this);
} else {
ret = __socket_read_denied_reply (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.reply.status_state
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.status_state
= SP_STATE_ACCEPTED_REPLY_INIT;
- priv->incoming.payload_vector.iov_len
- = (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)
- priv->incoming.payload_vector.iov_base;
+ in->payload_vector.iov_len
+ = (unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base;
}
break;
}
@@ -1605,26 +1619,29 @@ __socket_read_reply (rpc_transport_t *this)
int32_t ret = -1;
rpc_request_info_t *request_info = NULL;
char map_xid = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
- buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_xid_addr (iobuf_ptr (in->iobuf));
- if (priv->incoming.request_info == NULL) {
- priv->incoming.request_info = GF_CALLOC (1,
- sizeof (*request_info),
- gf_common_mt_rpc_trans_reqinfo_t);
- if (priv->incoming.request_info == NULL) {
+ if (in->request_info == NULL) {
+ in->request_info = GF_CALLOC (1, sizeof (*request_info),
+ gf_common_mt_rpc_trans_reqinfo_t);
+ if (in->request_info == NULL) {
goto out;
}
map_xid = 1;
}
- request_info = priv->incoming.request_info;
+ request_info = in->request_info;
if (map_xid) {
request_info->xid = ntoh32 (*((uint32_t *) buf));
@@ -1636,7 +1653,7 @@ __socket_read_reply (rpc_transport_t *this)
{
ret = rpc_transport_notify (this,
RPC_TRANSPORT_MAP_XID_REQUEST,
- priv->incoming.request_info);
+ in->request_info);
}
pthread_mutex_lock (&priv->lock);
@@ -1650,10 +1667,8 @@ __socket_read_reply (rpc_transport_t *this)
if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
&& (request_info->procnum == GF_FOP_READ)) {
if (map_xid && request_info->rsp.rsp_payload_count != 0) {
- priv->incoming.iobref
- = iobref_ref (request_info->rsp.rsp_iobref);
- priv->incoming.payload_vector
- = *request_info->rsp.rsp_payload;
+ in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
+ in->payload_vector = *request_info->rsp.rsp_payload;
}
ret = __socket_read_vectored_reply (this);
@@ -1673,35 +1688,40 @@ __socket_read_frag (rpc_transport_t *this)
int32_t ret = 0;
char *buf = NULL;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.state) {
+ switch (frag->state) {
case SP_STATE_NADA:
__socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE);
- priv->incoming.frag.state = SP_STATE_READING_MSGTYPE;
+ frag->state = SP_STATE_READING_MSGTYPE;
/* fall through */
case SP_STATE_READING_MSGTYPE:
__socket_proto_read (priv, ret);
- priv->incoming.frag.state = SP_STATE_READ_MSGTYPE;
+ frag->state = SP_STATE_READ_MSGTYPE;
/* fall through */
case SP_STATE_READ_MSGTYPE:
- buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf));
- priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf));
+ buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf));
+ in->msg_type = ntoh32 (*((uint32_t *)buf));
- if (priv->incoming.msg_type == CALL) {
+ if (in->msg_type == CALL) {
ret = __socket_read_request (this);
- } else if (priv->incoming.msg_type == REPLY) {
+ } else if (in->msg_type == REPLY) {
ret = __socket_read_reply (this);
- } else if (priv->incoming.msg_type == GF_UNIVERSAL_ANSWER) {
+ } else if (in->msg_type == GF_UNIVERSAL_ANSWER) {
gf_log ("rpc", GF_LOG_ERROR,
"older version of protocol/process trying to "
"connect from %s. use newer version on that node",
@@ -1709,19 +1729,17 @@ __socket_read_frag (rpc_transport_t *this)
} else {
gf_log ("rpc", GF_LOG_ERROR,
"wrong MSG-TYPE (%d) received from %s",
- priv->incoming.msg_type,
+ in->msg_type,
this->peerinfo.identifier);
ret = -1;
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.state = SP_STATE_NADA;
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->state = SP_STATE_NADA;
}
break;
@@ -1735,24 +1753,29 @@ out:
inline
void __socket_reset_priv (socket_private_t *priv)
{
- if (priv->incoming.iobref) {
- iobref_unref (priv->incoming.iobref);
- priv->incoming.iobref = NULL;
+ struct gf_sock_incoming *in = NULL;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+
+ if (in->iobref) {
+ iobref_unref (in->iobref);
+ in->iobref = NULL;
}
- if (priv->incoming.iobuf) {
- iobuf_unref (priv->incoming.iobuf);
+ if (in->iobuf) {
+ iobuf_unref (in->iobuf);
}
- if (priv->incoming.request_info != NULL) {
- GF_FREE (priv->incoming.request_info);
- priv->incoming.request_info = NULL;
+ if (in->request_info != NULL) {
+ GF_FREE (in->request_info);
+ in->request_info = NULL;
}
- memset (&priv->incoming.payload_vector, 0,
- sizeof (priv->incoming.payload_vector));
+ memset (&in->payload_vector, 0,
+ sizeof (in->payload_vector));
- priv->incoming.iobuf = NULL;
+ in->iobuf = NULL;
}
@@ -1765,34 +1788,37 @@ __socket_proto_state_machine (rpc_transport_t *this,
struct iobuf *iobuf = NULL;
struct iobref *iobref = NULL;
struct iovec vector[2];
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- while (priv->incoming.record_state != SP_STATE_COMPLETE) {
- switch (priv->incoming.record_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ while (in->record_state != SP_STATE_COMPLETE) {
+ switch (in->record_state) {
case SP_STATE_NADA:
- priv->incoming.total_bytes_read = 0;
- priv->incoming.payload_vector.iov_len = 0;
+ in->total_bytes_read = 0;
+ in->payload_vector.iov_len = 0;
- priv->incoming.pending_vector = priv->incoming.vector;
- priv->incoming.pending_vector->iov_base =
- &priv->incoming.fraghdr;
+ in->pending_vector = in->vector;
+ in->pending_vector->iov_base = &in->fraghdr;
- priv->incoming.pending_vector->iov_len =
- sizeof (priv->incoming.fraghdr);
+ in->pending_vector->iov_len = sizeof (in->fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAGHDR;
+ in->record_state = SP_STATE_READING_FRAGHDR;
/* fall through */
case SP_STATE_READING_FRAGHDR:
- ret = __socket_readv (this,
- priv->incoming.pending_vector, 1,
- &priv->incoming.pending_vector,
- &priv->incoming.pending_count,
+ ret = __socket_readv (this, in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
NULL);
if (ret == -1) {
if (priv->read_fail_log == 1) {
@@ -1813,44 +1839,40 @@ __socket_proto_state_machine (rpc_transport_t *this,
}
if (ret == 0) {
- priv->incoming.record_state =
- SP_STATE_READ_FRAGHDR;
+ in->record_state = SP_STATE_READ_FRAGHDR;
}
/* fall through */
case SP_STATE_READ_FRAGHDR:
- priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);
- priv->incoming.total_bytes_read
- += RPC_FRAGSIZE(priv->incoming.fraghdr);
+ in->fraghdr = ntoh32 (in->fraghdr);
+ in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
iobuf = iobuf_get2 (this->ctx->iobuf_pool,
- priv->incoming.total_bytes_read +
- sizeof (priv->incoming.fraghdr));
+ (in->total_bytes_read +
+ sizeof (in->fraghdr)));
if (!iobuf) {
ret = -ENOMEM;
goto out;
}
- priv->incoming.iobuf = iobuf;
- priv->incoming.iobuf_size = 0;
- priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
- priv->incoming.record_state = SP_STATE_READING_FRAG;
+ in->iobuf = iobuf;
+ in->iobuf_size = 0;
+ frag->fragcurrent = iobuf_ptr (iobuf);
+ in->record_state = SP_STATE_READING_FRAG;
/* fall through */
case SP_STATE_READING_FRAG:
ret = __socket_read_frag (this);
- if ((ret == -1)
- || (priv->incoming.frag.bytes_read !=
- RPC_FRAGSIZE (priv->incoming.fraghdr))) {
+ if ((ret == -1) ||
+ (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) {
goto out;
}
- priv->incoming.frag.bytes_read = 0;
+ frag->bytes_read = 0;
- if (!RPC_LASTFRAG (priv->incoming.fraghdr)) {
- priv->incoming.record_state =
- SP_STATE_READING_FRAGHDR;
+ if (!RPC_LASTFRAG (in->fraghdr)) {
+ in->record_state = SP_STATE_READING_FRAGHDR;
break;
}
@@ -1859,44 +1881,39 @@ __socket_proto_state_machine (rpc_transport_t *this,
*/
if (pollin != NULL) {
int count = 0;
- priv->incoming.iobuf_size
- = priv->incoming.total_bytes_read
- - priv->incoming.payload_vector.iov_len;
+ in->iobuf_size = (in->total_bytes_read -
+ in->payload_vector.iov_len);
memset (vector, 0, sizeof (vector));
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
goto out;
}
}
- vector[count].iov_base
- = iobuf_ptr (priv->incoming.iobuf);
- vector[count].iov_len
- = priv->incoming.iobuf_size;
+ vector[count].iov_base = iobuf_ptr (in->iobuf);
+ vector[count].iov_len = in->iobuf_size;
- iobref = priv->incoming.iobref;
+ iobref = in->iobref;
count++;
- if (priv->incoming.payload_vector.iov_base
- != NULL) {
- vector[count]
- = priv->incoming.payload_vector;
+ if (in->payload_vector.iov_base != NULL) {
+ vector[count] = in->payload_vector;
count++;
}
*pollin = rpc_transport_pollin_alloc (this,
vector,
count,
- priv->incoming.iobuf,
+ in->iobuf,
iobref,
- priv->incoming.request_info);
- iobuf_unref (priv->incoming.iobuf);
- priv->incoming.iobuf = NULL;
+ in->request_info);
+ iobuf_unref (in->iobuf);
+ in->iobuf = NULL;
if (*pollin == NULL) {
gf_log (this->name, GF_LOG_WARNING,
@@ -1904,12 +1921,12 @@ __socket_proto_state_machine (rpc_transport_t *this,
ret = -1;
goto out;
}
- if (priv->incoming.msg_type == REPLY)
+ if (in->msg_type == REPLY)
(*pollin)->is_reply = 1;
- priv->incoming.request_info = NULL;
+ in->request_info = NULL;
}
- priv->incoming.record_state = SP_STATE_COMPLETE;
+ in->record_state = SP_STATE_COMPLETE;
break;
case SP_STATE_COMPLETE:
@@ -1921,8 +1938,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
}
}
- if (priv->incoming.record_state == SP_STATE_COMPLETE) {
- priv->incoming.record_state = SP_STATE_NADA;
+ if (in->record_state == SP_STATE_COMPLETE) {
+ in->record_state = SP_STATE_NADA;
__socket_reset_priv (priv);
}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 0a407cc1a..2c4b44cf4 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -143,6 +143,40 @@ typedef struct {
sp_rpcfrag_vectored_reply_accepted_success_state_t accepted_success_state;
} sp_rpcfrag_vectored_reply_state_t;
+struct gf_sock_incoming_frag {
+ char *fragcurrent;
+ uint32_t bytes_read;
+ uint32_t remaining_size;
+ struct iovec vector;
+ struct iovec *pending_vector;
+ union {
+ sp_rpcfrag_request_state_t request;
+ sp_rpcfrag_vectored_reply_state_t reply;
+ } call_body;
+
+ sp_rpcfrag_simple_msg_state_t simple_state;
+ sp_rpcfrag_state_t state;
+};
+
+struct gf_sock_incoming {
+ sp_rpcrecord_state_t record_state;
+ struct gf_sock_incoming_frag frag;
+ char *proghdr_base_addr;
+ struct iobuf *iobuf;
+ size_t iobuf_size;
+ struct iovec vector[2];
+ int count;
+ struct iovec payload_vector;
+ struct iobref *iobref;
+ rpc_request_info_t *request_info;
+ struct iovec *pending_vector;
+ int pending_count;
+ uint32_t fraghdr;
+ char complete_record;
+ msg_type_t msg_type;
+ size_t total_bytes_read;
+};
+
typedef struct {
int32_t sock;
int32_t idx;
@@ -158,37 +192,7 @@ typedef struct {
struct ioq *ioq_prev;
};
};
- struct {
- sp_rpcrecord_state_t record_state;
- struct {
- char *fragcurrent;
- uint32_t bytes_read;
- uint32_t remaining_size;
- struct iovec vector;
- struct iovec *pending_vector;
- union {
- sp_rpcfrag_request_state_t request;
- sp_rpcfrag_vectored_reply_state_t reply;
- } call_body;
-
- sp_rpcfrag_simple_msg_state_t simple_state;
- sp_rpcfrag_state_t state;
- } frag;
- char *proghdr_base_addr;
- struct iobuf *iobuf;
- size_t iobuf_size;
- struct iovec vector[2];
- int count;
- struct iovec payload_vector;
- struct iobref *iobref;
- rpc_request_info_t *request_info;
- struct iovec *pending_vector;
- int pending_count;
- uint32_t fraghdr;
- char complete_record;
- msg_type_t msg_type;
- size_t total_bytes_read;
- } incoming;
+ struct gf_sock_incoming incoming;
pthread_mutex_t lock;
int windowsize;
char lowlat;