diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 129 | 
1 files changed, 101 insertions, 28 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 4396454a33a..baeee735bb8 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -370,12 +370,13 @@ __socket_reset (rpc_transport_t *this)          /* TODO: use mem-pool on incoming data */ -        if (priv->incoming.iobuf) { -                iobuf_unref (priv->incoming.iobuf); +        if (priv->incoming.iobref) { +                iobref_unref (priv->incoming.iobref); +                priv->incoming.iobref = NULL;          } -        if (priv->incoming.vectoriob) { -                iobuf_unref (priv->incoming.vectoriob); +        if (priv->incoming.iobuf) { +                iobuf_unref (priv->incoming.iobuf);          }          memset (&priv->incoming, 0, sizeof (priv->incoming)); @@ -699,7 +700,7 @@ __socket_read_vectored_request (rpc_transport_t *this)                  /* fall through */          case SP_STATE_READ_VERFBYTES: -                if (priv->incoming.vectoriob == NULL) { +                if (priv->incoming.payload_vector.iov_base == NULL) {                          iobuf = iobuf_get (this->ctx->iobuf_pool);                          if (!iobuf) {                                  gf_log (this->name, GF_LOG_ERROR, @@ -710,7 +711,23 @@ __socket_read_vectored_request (rpc_transport_t *this)                                  break;                          } -                        priv->incoming.vectoriob = iobuf; +                        if (priv->incoming.iobref == NULL) { +                                priv->incoming.iobref = iobref_new (); +                                if (priv->incoming.iobref == NULL) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "out of memory"); +                                        ret = -1; +                                        iobuf_unref (iobuf); +                                        break; +                                } +                        } + +                        iobref_add (priv->incoming.iobref, iobuf); +                        iobuf_unref (iobuf); + +                        priv->incoming.payload_vector.iov_base +                                = iobuf_ptr (iobuf); +                          priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);                  } @@ -735,9 +752,10 @@ __socket_read_vectored_request (rpc_transport_t *this)                          && RPC_LASTFRAG (priv->incoming.fraghdr))) {                          priv->incoming.frag.call_body.request.vector_state                                  = SP_STATE_VECTORED_REQUEST_INIT; -                        priv->incoming.vectoriob_size +                        priv->incoming.payload_vector.iov_len                                  = (unsigned long)priv->incoming.frag.fragcurrent -                                - (unsigned long)iobuf_ptr (priv->incoming.vectoriob); +                                - (unsigned long) +                                priv->incoming.payload_vector.iov_base;                  }                  break;          } @@ -846,18 +864,31 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)                  /* fall through */          case SP_STATE_READ_PROC_HEADER: -                if (priv->incoming.vectoriob == NULL) { +                if (priv->incoming.payload_vector.iov_base == NULL) {                          iobuf = iobuf_get (this->ctx->iobuf_pool);                          if (iobuf == NULL) {                                  ret = -1;                                  goto out;                          } -                        priv->incoming.vectoriob = iobuf; +                        if (priv->incoming.iobref == NULL) { +                                priv->incoming.iobref = iobref_new (); +                                if (priv->incoming.iobref == NULL) { +                                        ret = -1; +                                        iobuf_unref (iobuf); +                                        goto out; +                                } +                        } + +                        iobref_add (priv->incoming.iobref, iobuf); +                        iobuf_unref (iobuf); + +                        priv->incoming.payload_vector.iov_base +                                = iobuf_ptr (iobuf);                  }                  priv->incoming.frag.fragcurrent -                        = iobuf_ptr (priv->incoming.vectoriob); +                        = priv->incoming.payload_vector.iov_base;                  /* now read the entire remaining msg into new iobuf */                  ret = __socket_read_simple_msg (this); @@ -1080,9 +1111,11 @@ __socket_read_reply (rpc_transport_t *this)          if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)              && (request_info->procnum == GF_FOP_READ)) { -                if (request_info->rsp.rspbuf != NULL) { -                        priv->incoming.vectoriob -                                = iobuf_ref (request_info->rsp.rspbuf); +                if (request_info->rsp.rsp_payload_count != 0) { +                        priv->incoming.iobref +                                = iobref_ref (request_info->rsp.rsp_iobref); +                        priv->incoming.payload_vector +                                = *request_info->rsp.rsp_payload;                  }                  ret = __socket_read_vectored_reply (this); @@ -1154,15 +1187,19 @@ __socket_read_frag (rpc_transport_t *this)  inline  void __socket_reset_priv (socket_private_t *priv)  { +        if (priv->incoming.iobref) { +                iobref_unref (priv->incoming.iobref); +                priv->incoming.iobref = NULL; +        } +          if (priv->incoming.iobuf) {                  iobuf_unref (priv->incoming.iobuf); -                priv->incoming.iobuf = NULL;          } -        if (priv->incoming.vectoriob) { -                iobuf_unref (priv->incoming.vectoriob); -                priv->incoming.vectoriob = NULL; -        } +        memset (&priv->incoming.payload_vector, 0, +                sizeof (priv->incoming.payload_vector)); + +        priv->incoming.iobuf = NULL;  } @@ -1170,9 +1207,11 @@ int  __socket_proto_state_machine (rpc_transport_t *this,                                rpc_transport_pollin_t **pollin)  { -        int               ret   = -1; -        socket_private_t *priv  = NULL; -        struct iobuf     *iobuf = NULL; +        int               ret    = -1; +        socket_private_t *priv   = NULL; +        struct iobuf     *iobuf  = NULL; +        struct iobref    *iobref = NULL; +        struct iovec      vector[2];          priv = this->private;          while (priv->incoming.record_state != SP_STATE_COMPLETE) { @@ -1191,7 +1230,7 @@ __socket_proto_state_machine (rpc_transport_t *this,                          priv->incoming.iobuf = iobuf;                          priv->incoming.iobuf_size = 0; -                        priv->incoming.vectoriob_size = 0; +                        priv->incoming.payload_vector.iov_len = 0;                          priv->incoming.pending_vector = priv->incoming.vector;                          priv->incoming.pending_vector->iov_base = @@ -1260,15 +1299,49 @@ __socket_proto_state_machine (rpc_transport_t *this,                           * upper layers.                           */                          if (pollin != NULL) { +                                int count = 0;                                  priv->incoming.iobuf_size                                          = priv->incoming.total_bytes_read -                                        - priv->incoming.vectoriob_size; +                                        - priv->incoming.payload_vector.iov_len; + +                                memset (vector, 0, sizeof (vector)); + +                                if (priv->incoming.iobref == NULL) { +                                        priv->incoming.iobref = iobref_new (); +                                        if (priv->incoming.iobref == NULL) { +                                                gf_log (this->name, +                                                        GF_LOG_ERROR, +                                                        "out of memory"); +                                                ret = -1; +                                                goto out; +                                        } +                                } + +                                vector[count].iov_base +                                        = iobuf_ptr (priv->incoming.iobuf); +                                vector[count].iov_len +                                        = priv->incoming.iobuf_size; + +                                iobref = priv->incoming.iobref; + +                                iobref_add (iobref, +                                            priv->incoming.iobuf); +                                iobuf_unref (priv->incoming.iobuf); +                                priv->incoming.iobuf = NULL;  + +                                count++; + +                                if (priv->incoming.payload_vector.iov_base +                                    != NULL) { +                                        vector[count] +                                                = priv->incoming.payload_vector; +                                        count++; +                                }                                  *pollin = rpc_transport_pollin_alloc (this, -                                                                      priv->incoming.iobuf, -                                                                      priv->incoming.iobuf_size, -                                                                      priv->incoming.vectoriob, -                                                                      priv->incoming.vectoriob_size, +                                                                      vector, +                                                                      count, +                                                                      iobref,                                                                        priv->incoming.request_info);                                  if (*pollin == NULL) {                                          ret = -1;  | 
