diff options
Diffstat (limited to 'rpc')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 579 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 66 | 
2 files changed, 333 insertions, 312 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index f5c6bffa2..24079a9f8 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -63,80 +63,89 @@  typedef int SSL_unary_func (SSL *);  typedef int SSL_trinary_func (SSL *, void *, int); -#define __socket_proto_reset_pending(priv) do {                 \ -                memset (&priv->incoming.frag.vector, 0,         \ -                        sizeof (priv->incoming.frag.vector));   \ -                priv->incoming.frag.pending_vector =            \ -                        &priv->incoming.frag.vector;            \ -                priv->incoming.frag.pending_vector->iov_base =  \ -                        priv->incoming.frag.fragcurrent;        \ -                priv->incoming.pending_vector =                 \ -                        priv->incoming.frag.pending_vector;     \ -        } while (0); +#define __socket_proto_reset_pending(priv) do {                         \ +                struct gf_sock_incoming_frag *frag;                     \ +                frag = &priv->incoming.frag;                            \ +                                                                        \ +                memset (&frag->vector, 0, sizeof (frag->vector));       \ +                frag->pending_vector = &frag->vector;                   \ +                frag->pending_vector->iov_base = frag->fragcurrent;     \ +                priv->incoming.pending_vector =  frag->pending_vector;  \ +        } while (0)  #define __socket_proto_update_pending(priv)                             \          do {                                                            \ -                uint32_t remaining_fragsize = 0;                        \ -                if (priv->incoming.frag.pending_vector->iov_len == 0) { \ -                        remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ -                                - priv->incoming.frag.bytes_read;       \ +                uint32_t remaining;                                     \ +                struct gf_sock_incoming_frag *frag;                     \ +                frag = &priv->incoming.frag;                            \ +                if (frag->pending_vector->iov_len == 0) {               \ +                        remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \ +                                     - frag->bytes_read);               \                                                                          \ -                        priv->incoming.frag.pending_vector->iov_len =   \ -                                remaining_fragsize > priv->incoming.frag.remaining_size \ -                                ? priv->incoming.frag.remaining_size : remaining_fragsize; \ +                        frag->pending_vector->iov_len =                 \ +                                (remaining > frag->remaining_size)      \ +                                ? frag->remaining_size : remaining;     \                                                                          \ -                        priv->incoming.frag.remaining_size -=           \ -                                priv->incoming.frag.pending_vector->iov_len; \ +                        frag->remaining_size -=                         \ +                                frag->pending_vector->iov_len;          \                  }                                                       \ -        } while (0); +        } while (0)  #define __socket_proto_update_priv_after_read(priv, ret, bytes_read)    \          {                                                               \ -                priv->incoming.frag.fragcurrent += bytes_read;          \ -                priv->incoming.frag.bytes_read += bytes_read;           \ +                struct gf_sock_incoming_frag *frag;                     \ +                frag = &priv->incoming.frag;                            \ +                                                                        \ +                frag->fragcurrent += bytes_read;                        \ +                frag->bytes_read += bytes_read;                         \                                                                          \ -                if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \ -                        if (priv->incoming.frag.remaining_size != 0 && ret == 0) {  \ +                if ((ret > 0) || (frag->remaining_size != 0)) {         \ +                        if (frag->remaining_size != 0 && ret == 0) {    \                                  __socket_proto_reset_pending (priv);    \                          }                                               \                                                                          \ -                        gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \ +                        gf_log (this->name, GF_LOG_TRACE,               \ +                                "partial read on non-blocking socket"); \                                                                          \                          break;                                          \                  }                                                       \          } -#define __socket_proto_init_pending(priv, size)                         \ +#define __socket_proto_init_pending(priv,size)                          \          do {                                                            \ -                uint32_t remaining_fragsize = 0;                        \ -                remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ -                        - priv->incoming.frag.bytes_read;               \ +            uint32_t remaining = 0;                                     \ +            struct gf_sock_incoming_frag *frag;                         \ +            frag = &priv->incoming.frag;                                \ +                                                                        \ +            remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr)          \ +                         - frag->bytes_read);                           \                                                                          \ -                __socket_proto_reset_pending (priv);                    \ +            __socket_proto_reset_pending (priv);                        \                                                                          \ -                priv->incoming.frag.pending_vector->iov_len =           \ -                        remaining_fragsize > size ? size : remaining_fragsize; \ +            frag->pending_vector->iov_len =                             \ +                    (remaining > size) ? size : remaining;              \                                                                          \ -                priv->incoming.frag.remaining_size =                    \ -                        size - priv->incoming.frag.pending_vector->iov_len; \ +            frag->remaining_size = (size - frag->pending_vector->iov_len); \                                                                          \ -        } while (0); +            } while(0)  /* This will be used in a switch case and breaks from the switch case if all   * the pending data is not read.   */  #define __socket_proto_read(priv, ret)                                  \ -        {                                                               \ +                {                                                       \                  size_t bytes_read = 0;                                  \ +                struct gf_sock_incoming *in;                            \ +                in = &priv->incoming;                                   \                                                                          \                  __socket_proto_update_pending (priv);                   \                                                                          \                  ret = __socket_readv (this,                             \ -                                      priv->incoming.pending_vector, 1, \ -                                      &priv->incoming.pending_vector,   \ -                                      &priv->incoming.pending_count,    \ +                                      in->pending_vector, 1,            \ +                                      &in->pending_vector,              \ +                                      &in->pending_count,               \                                        &bytes_read);                     \                  if (ret == -1) {                                        \                          gf_log (this->name, GF_LOG_WARNING,             \ @@ -959,40 +968,42 @@ out:  inline int  __socket_read_simple_msg (rpc_transport_t *this)  { -        socket_private_t *priv           = NULL; -        int               ret            = 0; -        uint32_t          remaining_size = 0; -        size_t            bytes_read     = 0; +        int                           ret            = 0; +        uint32_t                      remaining_size = 0; +        size_t                        bytes_read     = 0; +        socket_private_t             *priv           = NULL; +        struct gf_sock_incoming      *in             = NULL; +        struct gf_sock_incoming_frag *frag           = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; -        switch (priv->incoming.frag.simple_state) { +        in = &priv->incoming; +        frag = &in->frag; + +        switch (frag->simple_state) {          case SP_STATE_SIMPLE_MSG_INIT: -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;                  __socket_proto_init_pending (priv, remaining_size); -                priv->incoming.frag.simple_state = -                        SP_STATE_READING_SIMPLE_MSG; +                frag->simple_state = SP_STATE_READING_SIMPLE_MSG;                  /* fall through */          case SP_STATE_READING_SIMPLE_MSG:                  ret = 0; -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;                  if (remaining_size > 0) {                          ret = __socket_readv (this, -                                              priv->incoming.pending_vector, 1, -                                              &priv->incoming.pending_vector, -                                              &priv->incoming.pending_count, +                                              in->pending_vector, 1, +                                              &in->pending_vector, +                                              &in->pending_count,                                                &bytes_read);                  } @@ -1004,8 +1015,8 @@ __socket_read_simple_msg (rpc_transport_t *this)                          break;                  } -                priv->incoming.frag.bytes_read += bytes_read; -                priv->incoming.frag.fragcurrent += bytes_read; +                frag->bytes_read += bytes_read; +                frag->fragcurrent += bytes_read;                  if (ret > 0) {                          gf_log (this->name, GF_LOG_TRACE, @@ -1014,8 +1025,7 @@ __socket_read_simple_msg (rpc_transport_t *this)                  }                  if (ret == 0) { -                        priv->incoming.frag.simple_state -                                =  SP_STATE_SIMPLE_MSG_INIT; +                        frag->simple_state =  SP_STATE_SIMPLE_MSG_INIT;                  }          } @@ -1051,17 +1061,26 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto          struct iobuf     *iobuf                  = NULL;          uint32_t          remaining_size         = 0;          ssize_t           readsize               = 0; -        size_t            size = 0; +        size_t            size                   = 0; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL; +        sp_rpcfrag_request_state_t   *request    = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; -        switch (priv->incoming.frag.call_body.request.vector_state) { +        /* used to reduce the indirection */ +        in = &priv->incoming; +        frag = &in->frag; +        request = &frag->call_body.request; + +        switch (request->vector_state) {          case SP_STATE_VECTORED_REQUEST_INIT: -                priv->incoming.frag.call_body.request.vector_sizer_state = 0; -                addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf)); +                request->vector_sizer_state = 0; + +                addr = rpc_cred_addr (iobuf_ptr (in->iobuf));                  /* also read verf flavour and verflen */                  credlen = ntoh32 (*((uint32_t *)addr)) @@ -1069,40 +1088,35 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto                  __socket_proto_init_pending (priv, credlen); -                priv->incoming.frag.call_body.request.vector_state = -                        SP_STATE_READING_CREDBYTES; +                request->vector_state = SP_STATE_READING_CREDBYTES;                  /* fall through */          case SP_STATE_READING_CREDBYTES:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.call_body.request.vector_state = -                        SP_STATE_READ_CREDBYTES; +                request->vector_state = SP_STATE_READ_CREDBYTES;                  /* fall through */          case SP_STATE_READ_CREDBYTES: -                addr = rpc_verf_addr (priv->incoming.frag.fragcurrent); +                addr = rpc_verf_addr (frag->fragcurrent);                  verflen = ntoh32 (*((uint32_t *)addr));                  if (verflen == 0) { -                        priv->incoming.frag.call_body.request.vector_state -                                = SP_STATE_READ_VERFBYTES; +                        request->vector_state = SP_STATE_READ_VERFBYTES;                          goto sp_state_read_verfbytes;                  }                  __socket_proto_init_pending (priv, verflen); -                priv->incoming.frag.call_body.request.vector_state -                        = SP_STATE_READING_VERFBYTES; +                request->vector_state = SP_STATE_READING_VERFBYTES;                  /* fall through */          case SP_STATE_READING_VERFBYTES:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.call_body.request.vector_state = -                        SP_STATE_READ_VERFBYTES; +                request->vector_state = SP_STATE_READ_VERFBYTES;                  /* fall through */ @@ -1110,85 +1124,78 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto  sp_state_read_verfbytes:  		/* set the base_addr 'persistently' across multiple calls  		   into the state machine */ -                priv->incoming.proghdr_base_addr = priv->incoming.frag.fragcurrent; +                in->proghdr_base_addr = frag->fragcurrent; -                priv->incoming.frag.call_body.request.vector_sizer_state = -                        vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, -                                      &readsize, priv->incoming.proghdr_base_addr, -                                      priv->incoming.frag.fragcurrent); +                request->vector_sizer_state = +                        vector_sizer (request->vector_sizer_state, +                                      &readsize, in->proghdr_base_addr, +                                      frag->fragcurrent);                  __socket_proto_init_pending (priv, readsize); -                priv->incoming.frag.call_body.request.vector_state -                        = SP_STATE_READING_PROGHDR; + +                request->vector_state = SP_STATE_READING_PROGHDR;                  /* fall through */          case SP_STATE_READING_PROGHDR:                  __socket_proto_read (priv, ret); -		priv->incoming.frag.call_body.request.vector_state = -			SP_STATE_READ_PROGHDR; + +		request->vector_state =	SP_STATE_READ_PROGHDR;  		/* fall through */  	case SP_STATE_READ_PROGHDR:  sp_state_read_proghdr: -                priv->incoming.frag.call_body.request.vector_sizer_state = -                        vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, -                                      &readsize, -				      priv->incoming.proghdr_base_addr, -                                      priv->incoming.frag.fragcurrent); +                request->vector_sizer_state = +                        vector_sizer (request->vector_sizer_state, +                                      &readsize, in->proghdr_base_addr, +                                      frag->fragcurrent);                  if (readsize == 0) { -                        priv->incoming.frag.call_body.request.vector_state = -                                SP_STATE_READ_PROGHDR_XDATA; +                        request->vector_state = SP_STATE_READ_PROGHDR_XDATA;  			goto sp_state_read_proghdr_xdata;                  }  		__socket_proto_init_pending (priv, readsize); -                priv->incoming.frag.call_body.request.vector_state = -			SP_STATE_READING_PROGHDR_XDATA; +                request->vector_state =	SP_STATE_READING_PROGHDR_XDATA;  		/* fall through */  	case SP_STATE_READING_PROGHDR_XDATA:  		__socket_proto_read (priv, ret); -		priv->incoming.frag.call_body.request.vector_state = -			SP_STATE_READ_PROGHDR; +		request->vector_state =	SP_STATE_READ_PROGHDR;  		/* check if the vector_sizer() has more to say */  		goto sp_state_read_proghdr;          case SP_STATE_READ_PROGHDR_XDATA:  sp_state_read_proghdr_xdata: -                if (priv->incoming.payload_vector.iov_base == NULL) { +                if (in->payload_vector.iov_base == NULL) { -                        size = RPC_FRAGSIZE (priv->incoming.fraghdr) - -                                priv->incoming.frag.bytes_read; +                        size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;                          iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);                          if (!iobuf) {                                  ret = -1;                                  break;                          } -                        if (priv->incoming.iobref == NULL) { -                                priv->incoming.iobref = iobref_new (); -                                if (priv->incoming.iobref == NULL) { +                        if (in->iobref == NULL) { +                                in->iobref = iobref_new (); +                                if (in->iobref == NULL) {                                          ret = -1;                                          iobuf_unref (iobuf);                                          break;                                  }                          } -                        iobref_add (priv->incoming.iobref, iobuf); +                        iobref_add (in->iobref, iobuf);                          iobuf_unref (iobuf); -                        priv->incoming.payload_vector.iov_base -                                = iobuf_ptr (iobuf); +                        in->payload_vector.iov_base = iobuf_ptr (iobuf); -                        priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); +                        frag->fragcurrent = iobuf_ptr (iobuf);                  } -                priv->incoming.frag.call_body.request.vector_state = -                        SP_STATE_READING_PROG; +                request->vector_state = SP_STATE_READING_PROG;                  /* fall through */ @@ -1199,19 +1206,15 @@ sp_state_read_proghdr_xdata:                  ret = __socket_read_simple_msg (this); -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; -                if ((ret == -1) -                    || ((ret == 0) -                        && (remaining_size == 0) -                        && RPC_LASTFRAG (priv->incoming.fraghdr))) { -                        priv->incoming.frag.call_body.request.vector_state -                                = SP_STATE_VECTORED_REQUEST_INIT; -                        priv->incoming.payload_vector.iov_len -                                = (unsigned long)priv->incoming.frag.fragcurrent -                                - (unsigned long) -                                priv->incoming.payload_vector.iov_base; +                if ((ret == -1) || +                    ((ret == 0) && (remaining_size == 0) +                     && RPC_LASTFRAG (in->fraghdr))) { +                        request->vector_state = SP_STATE_VECTORED_REQUEST_INIT; +                        in->payload_vector.iov_len +                                = ((unsigned long)frag->fragcurrent +                                   - (unsigned long)in->payload_vector.iov_base);                  }                  break;          } @@ -1229,46 +1232,53 @@ __socket_read_request (rpc_transport_t *this)          int               ret                = -1;          char             *buf                = NULL;          rpcsvc_vector_sizer     vector_sizer = NULL; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL; +        sp_rpcfrag_request_state_t   *request    = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; -        switch (priv->incoming.frag.call_body.request.header_state) { +        /* used to reduce the indirection */ +        in = &priv->incoming; +        frag = &in->frag; +        request = &frag->call_body.request; + +        switch (request->header_state) {          case SP_STATE_REQUEST_HEADER_INIT:                  __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE); -                priv->incoming.frag.call_body.request.header_state -                        = SP_STATE_READING_RPCHDR1; +                request->header_state = SP_STATE_READING_RPCHDR1;                  /* fall through */          case SP_STATE_READING_RPCHDR1:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.call_body.request.header_state = -                        SP_STATE_READ_RPCHDR1; +                request->header_state = SP_STATE_READ_RPCHDR1;                  /* fall through */          case SP_STATE_READ_RPCHDR1: -                buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf)); +                buf = rpc_prognum_addr (iobuf_ptr (in->iobuf));                  prognum = ntoh32 (*((uint32_t *)buf)); -                buf = rpc_progver_addr (iobuf_ptr (priv->incoming.iobuf)); +                buf = rpc_progver_addr (iobuf_ptr (in->iobuf));                  progver = ntoh32 (*((uint32_t *)buf)); -                buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf)); +                buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));                  procnum = ntoh32 (*((uint32_t *)buf));                  if (this->listener) { -                        /* this check is needed as rpcsvc and rpc-clnt actor structures are -                         * not same */ -                        vector_sizer = rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata, -                                                                        prognum, progver, procnum); +                        /* this check is needed as rpcsvc and rpc-clnt +                         * actor structures are not same */ +                        vector_sizer = +                                rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata, +                                                                 prognum, progver, procnum);                  }                  if (vector_sizer) { @@ -1277,15 +1287,13 @@ __socket_read_request (rpc_transport_t *this)                          ret = __socket_read_simple_request (this);                  } -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;                  if ((ret == -1)                      || ((ret == 0)                          && (remaining_size == 0) -                        && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { -                        priv->incoming.frag.call_body.request.header_state = -                                SP_STATE_REQUEST_HEADER_INIT; +                        && (RPC_LASTFRAG (in->fraghdr)))) { +                        request->header_state = SP_STATE_REQUEST_HEADER_INIT;                  }                  break; @@ -1307,23 +1315,29 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)          ssize_t           default_read_size = 0;          char             *proghdr_buf       = NULL;          XDR               xdr; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; -        switch (priv->incoming.frag.call_body.reply.accepted_success_state) { +        /* used to reduce the indirection */ +        in = &priv->incoming; +        frag = &in->frag; + +        switch (frag->call_body.reply.accepted_success_state) {          case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:                  default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,                                                  &read_rsp); -                proghdr_buf = priv->incoming.frag.fragcurrent; +                proghdr_buf = frag->fragcurrent;                  __socket_proto_init_pending (priv, default_read_size); -                priv->incoming.frag.call_body.reply.accepted_success_state +                frag->call_body.reply.accepted_success_state                          = SP_STATE_READING_PROC_HEADER;                  /* fall through */ @@ -1346,28 +1360,27 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)                  size = roof (read_rsp.xdata.xdata_len, 4);                  if (!size) { -                        priv->incoming.frag.call_body.reply.accepted_success_state +                        frag->call_body.reply.accepted_success_state                                  = SP_STATE_READ_PROC_OPAQUE;                          goto read_proc_opaque;                  }                  __socket_proto_init_pending (priv, size); -                priv->incoming.frag.call_body.reply.accepted_success_state +                frag->call_body.reply.accepted_success_state                          = SP_STATE_READING_PROC_OPAQUE;          case SP_STATE_READING_PROC_OPAQUE:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.call_body.reply.accepted_success_state +                frag->call_body.reply.accepted_success_state                          = SP_STATE_READ_PROC_OPAQUE;          case SP_STATE_READ_PROC_OPAQUE:          read_proc_opaque: -                if (priv->incoming.payload_vector.iov_base == NULL) { +                if (in->payload_vector.iov_base == NULL) { -                        size = (RPC_FRAGSIZE (priv->incoming.fraghdr) - -                                priv->incoming.frag.bytes_read); +                        size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);                          iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);                          if (iobuf == NULL) { @@ -1375,28 +1388,26 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)                                  goto out;                          } -                        if (priv->incoming.iobref == NULL) { -                                priv->incoming.iobref = iobref_new (); -                                if (priv->incoming.iobref == NULL) { +                        if (in->iobref == NULL) { +                                in->iobref = iobref_new (); +                                if (in->iobref == NULL) {                                          ret = -1;                                          iobuf_unref (iobuf);                                          goto out;                                  }                          } -                        iobref_add (priv->incoming.iobref, iobuf); +                        iobref_add (in->iobref, iobuf);                          iobuf_unref (iobuf); -                        priv->incoming.payload_vector.iov_base -                                = iobuf_ptr (iobuf); +                        in->payload_vector.iov_base = iobuf_ptr (iobuf); -                        priv->incoming.payload_vector.iov_len = size; +                        in->payload_vector.iov_len = size;                  } -                priv->incoming.frag.fragcurrent -                        = priv->incoming.payload_vector.iov_base; +                frag->fragcurrent = in->payload_vector.iov_base; -                priv->incoming.frag.call_body.reply.accepted_success_state +                frag->call_body.reply.accepted_success_state                          = SP_STATE_READ_PROC_HEADER;                  /* fall through */ @@ -1405,9 +1416,8 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)                  /* now read the entire remaining msg into new iobuf */                  ret = __socket_read_simple_msg (this);                  if ((ret == -1) -                    || ((ret == 0) -                        && RPC_LASTFRAG (priv->incoming.fraghdr))) { -                        priv->incoming.frag.call_body.reply.accepted_success_state +                    || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) { +                        frag->call_body.reply.accepted_success_state                                  = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;                  } @@ -1429,19 +1439,24 @@ __socket_read_accepted_reply (rpc_transport_t *this)          char             *buf            = NULL;          uint32_t          verflen        = 0, len = 0;          uint32_t          remaining_size = 0; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; +        /* used to reduce the indirection */ +        in = &priv->incoming; +        frag = &in->frag; -        switch (priv->incoming.frag.call_body.reply.accepted_state) { +        switch (frag->call_body.reply.accepted_state) {          case SP_STATE_ACCEPTED_REPLY_INIT:                  __socket_proto_init_pending (priv,                                               RPC_AUTH_FLAVOUR_N_LENGTH_SIZE); -                priv->incoming.frag.call_body.reply.accepted_state +                frag->call_body.reply.accepted_state                          = SP_STATE_READING_REPLY_VERFLEN;                  /* fall through */ @@ -1449,13 +1464,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)          case SP_STATE_READING_REPLY_VERFLEN:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.call_body.reply.accepted_state +                frag->call_body.reply.accepted_state                          = SP_STATE_READ_REPLY_VERFLEN;                  /* fall through */          case SP_STATE_READ_REPLY_VERFLEN: -                buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent); +                buf = rpc_reply_verflen_addr (frag->fragcurrent);                  verflen = ntoh32 (*((uint32_t *) buf)); @@ -1464,7 +1479,7 @@ __socket_read_accepted_reply (rpc_transport_t *this)                  __socket_proto_init_pending (priv, len); -                priv->incoming.frag.call_body.reply.accepted_state +                frag->call_body.reply.accepted_state                          = SP_STATE_READING_REPLY_VERFBYTES;                  /* fall through */ @@ -1472,19 +1487,19 @@ __socket_read_accepted_reply (rpc_transport_t *this)          case SP_STATE_READING_REPLY_VERFBYTES:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.call_body.reply.accepted_state +                frag->call_body.reply.accepted_state                          = SP_STATE_READ_REPLY_VERFBYTES; -                buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent); +                buf = rpc_reply_accept_status_addr (frag->fragcurrent); -                priv->incoming.frag.call_body.reply.accept_status +                frag->call_body.reply.accept_status                          = ntoh32 (*(uint32_t *) buf);                  /* fall through */          case SP_STATE_READ_REPLY_VERFBYTES: -                if (priv->incoming.frag.call_body.reply.accept_status +                if (frag->call_body.reply.accept_status                      == SUCCESS) {                          ret = __socket_read_accepted_successful_reply (this);                  } else { @@ -1494,14 +1509,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)                          ret = __socket_read_simple_msg (this);                  } -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) +                        - frag->bytes_read;                  if ((ret == -1) -                    || ((ret == 0) -                        && (remaining_size == 0) -                        && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { -                        priv->incoming.frag.call_body.reply.accepted_state +                    || ((ret == 0) && (remaining_size == 0) +                        && (RPC_LASTFRAG (in->fraghdr)))) { +                        frag->call_body.reply.accepted_state                                  = SP_STATE_ACCEPTED_REPLY_INIT;                  } @@ -1530,18 +1544,22 @@ __socket_read_vectored_reply (rpc_transport_t *this)          int               ret            = 0;          char             *buf            = NULL;          uint32_t          remaining_size = 0; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; +        in = &priv->incoming; +        frag = &in->frag; -        switch (priv->incoming.frag.call_body.reply.status_state) { +        switch (frag->call_body.reply.status_state) {          case SP_STATE_ACCEPTED_REPLY_INIT:                  __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE); -                priv->incoming.frag.call_body.reply.status_state +                frag->call_body.reply.status_state                          = SP_STATE_READING_REPLY_STATUS;                  /* fall through */ @@ -1549,37 +1567,33 @@ __socket_read_vectored_reply (rpc_transport_t *this)          case SP_STATE_READING_REPLY_STATUS:                  __socket_proto_read (priv, ret); -                buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent); +                buf = rpc_reply_status_addr (frag->fragcurrent); -                priv->incoming.frag.call_body.reply.accept_status +                frag->call_body.reply.accept_status                          = ntoh32 (*((uint32_t *) buf)); -                priv->incoming.frag.call_body.reply.status_state +                frag->call_body.reply.status_state                          = SP_STATE_READ_REPLY_STATUS;                  /* fall through */          case SP_STATE_READ_REPLY_STATUS: -                if (priv->incoming.frag.call_body.reply.accept_status -                    == MSG_ACCEPTED) { +                if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {                          ret = __socket_read_accepted_reply (this);                  } else {                          ret = __socket_read_denied_reply (this);                  } -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;                  if ((ret == -1) -                    || ((ret == 0) -                        && (remaining_size == 0) -                        && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { -                        priv->incoming.frag.call_body.reply.status_state +                    || ((ret == 0) && (remaining_size == 0) +                        && (RPC_LASTFRAG (in->fraghdr)))) { +                        frag->call_body.reply.status_state                                  = SP_STATE_ACCEPTED_REPLY_INIT; -                        priv->incoming.payload_vector.iov_len -                                = (unsigned long)priv->incoming.frag.fragcurrent -                                - (unsigned long) -                                priv->incoming.payload_vector.iov_base; +                        in->payload_vector.iov_len +                                = (unsigned long)frag->fragcurrent +                                - (unsigned long)in->payload_vector.iov_base;                  }                  break;          } @@ -1605,26 +1619,29 @@ __socket_read_reply (rpc_transport_t *this)          int32_t             ret          = -1;          rpc_request_info_t *request_info = NULL;          char                map_xid      = 0; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; +        in = &priv->incoming; +        frag = &in->frag; -        buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf)); +        buf = rpc_xid_addr (iobuf_ptr (in->iobuf)); -        if (priv->incoming.request_info == NULL) { -                priv->incoming.request_info = GF_CALLOC (1, -                                                         sizeof (*request_info), -                                                         gf_common_mt_rpc_trans_reqinfo_t); -                if (priv->incoming.request_info == NULL) { +        if (in->request_info == NULL) { +                in->request_info = GF_CALLOC (1, sizeof (*request_info), +                                              gf_common_mt_rpc_trans_reqinfo_t); +                if (in->request_info == NULL) {                          goto out;                  }                  map_xid = 1;          } -        request_info = priv->incoming.request_info; +        request_info = in->request_info;          if (map_xid) {                  request_info->xid = ntoh32 (*((uint32_t *) buf)); @@ -1636,7 +1653,7 @@ __socket_read_reply (rpc_transport_t *this)                  {                          ret = rpc_transport_notify (this,                                                      RPC_TRANSPORT_MAP_XID_REQUEST, -                                                    priv->incoming.request_info); +                                                    in->request_info);                  }                  pthread_mutex_lock (&priv->lock); @@ -1650,10 +1667,8 @@ __socket_read_reply (rpc_transport_t *this)          if ((request_info->prognum == GLUSTER_FOP_PROGRAM)              && (request_info->procnum == GF_FOP_READ)) {                  if (map_xid && request_info->rsp.rsp_payload_count != 0) { -                        priv->incoming.iobref -                                = iobref_ref (request_info->rsp.rsp_iobref); -                        priv->incoming.payload_vector -                                = *request_info->rsp.rsp_payload; +                        in->iobref = iobref_ref (request_info->rsp.rsp_iobref); +                        in->payload_vector = *request_info->rsp.rsp_payload;                  }                  ret = __socket_read_vectored_reply (this); @@ -1673,35 +1688,40 @@ __socket_read_frag (rpc_transport_t *this)          int32_t           ret            = 0;          char             *buf            = NULL;          uint32_t          remaining_size = 0; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; +        /* used to reduce the indirection */ +        in = &priv->incoming; +        frag = &in->frag; -        switch (priv->incoming.frag.state) { +        switch (frag->state) {          case SP_STATE_NADA:                  __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE); -                priv->incoming.frag.state = SP_STATE_READING_MSGTYPE; +                frag->state = SP_STATE_READING_MSGTYPE;                  /* fall through */          case SP_STATE_READING_MSGTYPE:                  __socket_proto_read (priv, ret); -                priv->incoming.frag.state = SP_STATE_READ_MSGTYPE; +                frag->state = SP_STATE_READ_MSGTYPE;                  /* fall through */          case SP_STATE_READ_MSGTYPE: -                buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf)); -                priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf)); +                buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf)); +                in->msg_type = ntoh32 (*((uint32_t *)buf)); -                if (priv->incoming.msg_type == CALL) { +                if (in->msg_type == CALL) {                          ret = __socket_read_request (this); -                } else if (priv->incoming.msg_type == REPLY) { +                } else if (in->msg_type == REPLY) {                          ret = __socket_read_reply (this); -                } else if (priv->incoming.msg_type == GF_UNIVERSAL_ANSWER) { +                } else if (in->msg_type == GF_UNIVERSAL_ANSWER) {                          gf_log ("rpc", GF_LOG_ERROR,                                  "older version of protocol/process trying to "                                  "connect from %s. use newer version on that node", @@ -1709,19 +1729,17 @@ __socket_read_frag (rpc_transport_t *this)                  } else {                          gf_log ("rpc", GF_LOG_ERROR,                                  "wrong MSG-TYPE (%d) received from %s", -                                priv->incoming.msg_type, +                                in->msg_type,                                  this->peerinfo.identifier);                          ret = -1;                  } -                remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) -                        - priv->incoming.frag.bytes_read; +                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;                  if ((ret == -1) -                    || ((ret == 0) -                        && (remaining_size == 0) -                        && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { -                        priv->incoming.frag.state = SP_STATE_NADA; +                    || ((ret == 0) && (remaining_size == 0) +                        && (RPC_LASTFRAG (in->fraghdr)))) { +                        frag->state = SP_STATE_NADA;                  }                  break; @@ -1735,24 +1753,29 @@ out:  inline  void __socket_reset_priv (socket_private_t *priv)  { -        if (priv->incoming.iobref) { -                iobref_unref (priv->incoming.iobref); -                priv->incoming.iobref = NULL; +        struct gf_sock_incoming      *in         = NULL; + +        /* used to reduce the indirection */ +        in = &priv->incoming; + +        if (in->iobref) { +                iobref_unref (in->iobref); +                in->iobref = NULL;          } -        if (priv->incoming.iobuf) { -                iobuf_unref (priv->incoming.iobuf); +        if (in->iobuf) { +                iobuf_unref (in->iobuf);          } -        if (priv->incoming.request_info != NULL) { -                GF_FREE (priv->incoming.request_info); -                priv->incoming.request_info = NULL; +        if (in->request_info != NULL) { +                GF_FREE (in->request_info); +                in->request_info = NULL;          } -        memset (&priv->incoming.payload_vector, 0, -                sizeof (priv->incoming.payload_vector)); +        memset (&in->payload_vector, 0, +                sizeof (in->payload_vector)); -        priv->incoming.iobuf = NULL; +        in->iobuf = NULL;  } @@ -1765,34 +1788,37 @@ __socket_proto_state_machine (rpc_transport_t *this,          struct iobuf     *iobuf  = NULL;          struct iobref    *iobref = NULL;          struct iovec      vector[2]; +        struct gf_sock_incoming      *in         = NULL; +        struct gf_sock_incoming_frag *frag       = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; -        while (priv->incoming.record_state != SP_STATE_COMPLETE) { -                switch (priv->incoming.record_state) { +        /* used to reduce the indirection */ +        in = &priv->incoming; +        frag = &in->frag; + +        while (in->record_state != SP_STATE_COMPLETE) { +                switch (in->record_state) {                  case SP_STATE_NADA: -                        priv->incoming.total_bytes_read = 0; -                        priv->incoming.payload_vector.iov_len = 0; +                        in->total_bytes_read = 0; +                        in->payload_vector.iov_len = 0; -                        priv->incoming.pending_vector = priv->incoming.vector; -                        priv->incoming.pending_vector->iov_base = -                                &priv->incoming.fraghdr; +                        in->pending_vector = in->vector; +                        in->pending_vector->iov_base =  &in->fraghdr; -                        priv->incoming.pending_vector->iov_len  = -                                sizeof (priv->incoming.fraghdr); +                        in->pending_vector->iov_len  = sizeof (in->fraghdr); -                        priv->incoming.record_state = SP_STATE_READING_FRAGHDR; +                        in->record_state = SP_STATE_READING_FRAGHDR;                          /* fall through */                  case SP_STATE_READING_FRAGHDR: -                        ret = __socket_readv (this, -                                              priv->incoming.pending_vector, 1, -                                              &priv->incoming.pending_vector, -                                              &priv->incoming.pending_count, +                        ret = __socket_readv (this, in->pending_vector, 1, +                                              &in->pending_vector, +                                              &in->pending_count,                                                NULL);                          if (ret == -1) {                                  if (priv->read_fail_log == 1) { @@ -1813,44 +1839,40 @@ __socket_proto_state_machine (rpc_transport_t *this,                          }                          if (ret == 0) { -                                priv->incoming.record_state = -                                        SP_STATE_READ_FRAGHDR; +                                in->record_state = SP_STATE_READ_FRAGHDR;                          }                          /* fall through */                  case SP_STATE_READ_FRAGHDR: -                        priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); -                        priv->incoming.total_bytes_read -                                += RPC_FRAGSIZE(priv->incoming.fraghdr); +                        in->fraghdr = ntoh32 (in->fraghdr); +                        in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);                          iobuf = iobuf_get2 (this->ctx->iobuf_pool, -                                            priv->incoming.total_bytes_read + -                                            sizeof (priv->incoming.fraghdr)); +                                            (in->total_bytes_read + +                                             sizeof (in->fraghdr)));                          if (!iobuf) {                                  ret = -ENOMEM;                                  goto out;                          } -                        priv->incoming.iobuf = iobuf; -                        priv->incoming.iobuf_size = 0; -                        priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); -                        priv->incoming.record_state = SP_STATE_READING_FRAG; +                        in->iobuf = iobuf; +                        in->iobuf_size = 0; +                        frag->fragcurrent = iobuf_ptr (iobuf); +                        in->record_state = SP_STATE_READING_FRAG;                          /* fall through */                  case SP_STATE_READING_FRAG:                          ret = __socket_read_frag (this); -                        if ((ret == -1) -                            || (priv->incoming.frag.bytes_read != -                                RPC_FRAGSIZE (priv->incoming.fraghdr))) { +                        if ((ret == -1) || +                            (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) {                                  goto out;                          } -                        priv->incoming.frag.bytes_read = 0; +                        frag->bytes_read = 0; -                        if (!RPC_LASTFRAG (priv->incoming.fraghdr)) { -                                priv->incoming.record_state = -                                        SP_STATE_READING_FRAGHDR; +                        if (!RPC_LASTFRAG (in->fraghdr)) { +                                in->record_state = SP_STATE_READING_FRAGHDR;                                  break;                          } @@ -1859,44 +1881,39 @@ __socket_proto_state_machine (rpc_transport_t *this,                           */                          if (pollin != NULL) {                                  int count = 0; -                                priv->incoming.iobuf_size -                                        = priv->incoming.total_bytes_read -                                        - priv->incoming.payload_vector.iov_len; +                                in->iobuf_size = (in->total_bytes_read - +                                                  in->payload_vector.iov_len);                                  memset (vector, 0, sizeof (vector)); -                                if (priv->incoming.iobref == NULL) { -                                        priv->incoming.iobref = iobref_new (); -                                        if (priv->incoming.iobref == NULL) { +                                if (in->iobref == NULL) { +                                        in->iobref = iobref_new (); +                                        if (in->iobref == NULL) {                                                  ret = -1;                                                  goto out;                                          }                                  } -                                vector[count].iov_base -                                        = iobuf_ptr (priv->incoming.iobuf); -                                vector[count].iov_len -                                        = priv->incoming.iobuf_size; +                                vector[count].iov_base = iobuf_ptr (in->iobuf); +                                vector[count].iov_len = in->iobuf_size; -                                iobref = priv->incoming.iobref; +                                iobref = in->iobref;                                  count++; -                                if (priv->incoming.payload_vector.iov_base -                                    != NULL) { -                                        vector[count] -                                                = priv->incoming.payload_vector; +                                if (in->payload_vector.iov_base != NULL) { +                                        vector[count] = in->payload_vector;                                          count++;                                  }                                  *pollin = rpc_transport_pollin_alloc (this,                                                                        vector,                                                                        count, -                                                                      priv->incoming.iobuf, +                                                                      in->iobuf,                                                                        iobref, -                                                                      priv->incoming.request_info); -                                iobuf_unref (priv->incoming.iobuf); -                                priv->incoming.iobuf = NULL; +                                                                      in->request_info); +                                iobuf_unref (in->iobuf); +                                in->iobuf = NULL;                                  if (*pollin == NULL) {                                          gf_log (this->name, GF_LOG_WARNING, @@ -1904,12 +1921,12 @@ __socket_proto_state_machine (rpc_transport_t *this,                                          ret = -1;                                          goto out;                                  } -                                if (priv->incoming.msg_type == REPLY) +                                if (in->msg_type == REPLY)                                          (*pollin)->is_reply = 1; -                                priv->incoming.request_info = NULL; +                                in->request_info = NULL;                          } -                        priv->incoming.record_state = SP_STATE_COMPLETE; +                        in->record_state = SP_STATE_COMPLETE;                          break;                  case SP_STATE_COMPLETE: @@ -1921,8 +1938,8 @@ __socket_proto_state_machine (rpc_transport_t *this,                  }          } -        if (priv->incoming.record_state == SP_STATE_COMPLETE) { -                priv->incoming.record_state = SP_STATE_NADA; +        if (in->record_state == SP_STATE_COMPLETE) { +                in->record_state = SP_STATE_NADA;                  __socket_reset_priv (priv);          } diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 0a407cc1a..2c4b44cf4 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -143,6 +143,40 @@ typedef struct {          sp_rpcfrag_vectored_reply_accepted_success_state_t accepted_success_state;  } sp_rpcfrag_vectored_reply_state_t; +struct gf_sock_incoming_frag { +        char         *fragcurrent; +        uint32_t      bytes_read; +        uint32_t      remaining_size; +        struct iovec  vector; +        struct iovec *pending_vector; +        union { +                sp_rpcfrag_request_state_t        request; +                sp_rpcfrag_vectored_reply_state_t reply; +        } call_body; + +        sp_rpcfrag_simple_msg_state_t     simple_state; +        sp_rpcfrag_state_t state; +}; + +struct gf_sock_incoming { +        sp_rpcrecord_state_t  record_state; +        struct gf_sock_incoming_frag frag; +        char                *proghdr_base_addr; +        struct iobuf        *iobuf; +        size_t               iobuf_size; +        struct iovec         vector[2]; +        int                  count; +        struct iovec         payload_vector; +        struct iobref       *iobref; +        rpc_request_info_t  *request_info; +        struct iovec        *pending_vector; +        int                  pending_count; +        uint32_t             fraghdr; +        char                 complete_record; +        msg_type_t           msg_type; +        size_t               total_bytes_read; +}; +  typedef struct {          int32_t                sock;          int32_t                idx; @@ -158,37 +192,7 @@ typedef struct {                          struct ioq        *ioq_prev;                  };          }; -        struct { -                sp_rpcrecord_state_t  record_state; -                struct { -                        char         *fragcurrent; -                        uint32_t      bytes_read; -                        uint32_t      remaining_size; -                        struct iovec  vector; -                        struct iovec *pending_vector; -                        union { -                                sp_rpcfrag_request_state_t        request; -                                sp_rpcfrag_vectored_reply_state_t reply; -                        } call_body; - -                        sp_rpcfrag_simple_msg_state_t     simple_state; -                        sp_rpcfrag_state_t state; -                } frag; -		char                *proghdr_base_addr; -                struct iobuf        *iobuf; -                size_t               iobuf_size; -                struct iovec         vector[2]; -                int                  count; -                struct iovec         payload_vector; -                struct iobref       *iobref; -                rpc_request_info_t  *request_info; -                struct iovec        *pending_vector; -                int                  pending_count; -                uint32_t             fraghdr; -                char                 complete_record; -                msg_type_t           msg_type; -                size_t               total_bytes_read; -        } incoming; +        struct gf_sock_incoming incoming;          pthread_mutex_t        lock;          int                    windowsize;          char                   lowlat;  | 
