diff options
| author | Amar Tumballi <amarts@redhat.com> | 2012-05-01 23:30:53 +0530 | 
|---|---|---|
| committer | Anand Avati <avati@redhat.com> | 2012-05-21 16:50:57 -0700 | 
| commit | f42dd77fb8cdf5ef439db2c0e8eb6468419998b7 (patch) | |
| tree | a74fb997b1d95bba812287281cf01c133e402431 | |
| parent | 0039e876e3bfd889a92e9b51332a7e3b2b93d4b7 (diff) | |
protocol: handle proper vector size for writev()/readv()
* fixes the offset handling issue when 'xdata' is sent in writev/readv fop
  at the transport layer itself.
* client_writev() was not sending xdata on wire, fixed
Change-Id: Ib5ced64c84d415f07032662017979c65d9a1a128
Signed-off-by: Amar Tumballi <amarts@redhat.com>
BUG: 808078
Reviewed-on: http://review.gluster.com/3182
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Tested-by: Jeff Darcy <jdarcy@redhat.com>
Reviewed-by: Anand Avati <avati@redhat.com>
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 74 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 2 | ||||
| -rw-r--r-- | xlators/protocol/client/src/client3_1-fops.c | 15 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server3_1-fops.c | 60 | 
4 files changed, 123 insertions, 28 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6c2d909e43a..5e65755d813 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -815,6 +815,7 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto          uint32_t          remaining_size         = 0;          ssize_t           readsize               = 0;          size_t            size = 0; +        char             *proghdr_buf = NULL;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -871,10 +872,10 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto          case SP_STATE_READ_VERFBYTES:  sp_state_read_verfbytes: +                proghdr_buf = priv->incoming.frag.fragcurrent;                  priv->incoming.frag.call_body.request.vector_sizer_state =                          vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, -                                      &readsize, -                                      priv->incoming.frag.fragcurrent); +                                      &readsize, proghdr_buf);                  __socket_proto_init_pending (priv, readsize);                  priv->incoming.frag.call_body.request.vector_state                          = SP_STATE_READING_PROGHDR; @@ -886,8 +887,7 @@ sp_state_read_verfbytes:  sp_state_reading_proghdr:                  priv->incoming.frag.call_body.request.vector_sizer_state =                          vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, -                                      &readsize, -                                      priv->incoming.frag.fragcurrent); +                                      &readsize, proghdr_buf);                  if (readsize == 0) {                          priv->incoming.frag.call_body.request.vector_state =                                  SP_STATE_READ_PROGHDR; @@ -1038,12 +1038,14 @@ out:  inline int  __socket_read_accepted_successful_reply (rpc_transport_t *this)  { -        socket_private_t *priv                     = NULL; -        int               ret                      = 0; -        struct iobuf     *iobuf                    = NULL; -        uint32_t          gluster_read_rsp_hdr_len = 0; -        gfs3_read_rsp     read_rsp                 = {0, }; -        size_t            size                     = 0; +        socket_private_t *priv              = NULL; +        int               ret               = 0; +        struct iobuf     *iobuf             = NULL; +        gfs3_read_rsp     read_rsp          = {0, }; +        ssize_t           size              = 0; +        ssize_t           default_read_size = 0; +        char             *proghdr_buf       = NULL; +        XDR               xdr;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -1053,16 +1055,12 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)          switch (priv->incoming.frag.call_body.reply.accepted_success_state) {          case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: -                gluster_read_rsp_hdr_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, -                                                       &read_rsp); +                default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, +                                                &read_rsp); -                if (gluster_read_rsp_hdr_len == 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "xdr_sizeof on gfs3_read_rsp failed"); -                        ret = -1; -                        goto out; -                } -                __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len); +                proghdr_buf = priv->incoming.frag.fragcurrent; + +                __socket_proto_init_pending (priv, default_read_size);                  priv->incoming.frag.call_body.reply.accepted_success_state                          = SP_STATE_READING_PROC_HEADER; @@ -1072,9 +1070,40 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)          case SP_STATE_READING_PROC_HEADER:                  __socket_proto_read (priv, ret); +                /* there can be 'xdata' in read response, figure it out */ +                xdrmem_create (&xdr, proghdr_buf, default_read_size, +                               XDR_DECODE); + +                /* This will fail if there is xdata sent from server, if not, +                   well and good, we don't need to worry about  */ +                xdr_gfs3_read_rsp (&xdr, &read_rsp); + +                if (read_rsp.xdata.xdata_val) +                        free (read_rsp.xdata.xdata_val); + +                /* need to round off to proper roof (%4), as XDR packing pads +                   the end of opaque object with '0' */ +                size = roof (read_rsp.xdata.xdata_len, 4); + +                if (!size) { +                        priv->incoming.frag.call_body.reply.accepted_success_state +                                = SP_STATE_READ_PROC_OPAQUE; +                        goto read_proc_opaque; +                } + +                __socket_proto_init_pending (priv, size); +                  priv->incoming.frag.call_body.reply.accepted_success_state -                        = SP_STATE_READ_PROC_HEADER; +                        = SP_STATE_READING_PROC_OPAQUE; + +        case SP_STATE_READING_PROC_OPAQUE: +                __socket_proto_read (priv, ret); + +                priv->incoming.frag.call_body.reply.accepted_success_state +                        = SP_STATE_READ_PROC_OPAQUE; +        case SP_STATE_READ_PROC_OPAQUE: +        read_proc_opaque:                  if (priv->incoming.payload_vector.iov_base == NULL) {                          size = (RPC_FRAGSIZE (priv->incoming.fraghdr) - @@ -1107,6 +1136,9 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)                  priv->incoming.frag.fragcurrent                          = priv->incoming.payload_vector.iov_base; +                priv->incoming.frag.call_body.reply.accepted_success_state +                        = SP_STATE_READ_PROC_HEADER; +                  /* fall through */          case SP_STATE_READ_PROC_HEADER: @@ -1529,7 +1561,6 @@ __socket_proto_state_machine (rpc_transport_t *this,                  case SP_STATE_READ_FRAGHDR:                          priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); -                        priv->incoming.record_state = SP_STATE_READING_FRAG;                          priv->incoming.total_bytes_read                                  += RPC_FRAGSIZE(priv->incoming.fraghdr);                          iobuf = iobuf_get2 (this->ctx->iobuf_pool, @@ -1543,6 +1574,7 @@ __socket_proto_state_machine (rpc_transport_t *this,                          priv->incoming.iobuf = iobuf;                          priv->incoming.iobuf_size = 0;                          priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); +                        priv->incoming.record_state = SP_STATE_READING_FRAG;                          /* fall through */                  case SP_STATE_READING_FRAG: diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 0c897bd2ec0..6d6802a541f 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -115,6 +115,8 @@ typedef enum {  typedef enum {          SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT,          SP_STATE_READING_PROC_HEADER, +        SP_STATE_READING_PROC_OPAQUE, +        SP_STATE_READ_PROC_OPAQUE,          SP_STATE_READ_PROC_HEADER,  } sp_rpcfrag_vectored_reply_accepted_success_state_t; diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c index e17a650d3c4..07d55c8956b 100644 --- a/xlators/protocol/client/src/client3_1-fops.c +++ b/xlators/protocol/client/src/client3_1-fops.c @@ -2693,6 +2693,10 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count,                                        (rsp.xdata.xdata_len), ret,                                        rsp.op_errno, out); +#ifdef GF_TESTING_IO_XDATA +        dict_dump (xdata); +#endif +  out:          if (rsp.op_ret == -1) {                  gf_log (this->name, GF_LOG_WARNING, @@ -4010,6 +4014,17 @@ client3_1_writev (call_frame_t *frame, xlator_t *this, void *data)          memcpy (req.gfid, args->fd->inode->gfid, 16); +#ifdef GF_TESTING_IO_XDATA +        if (!args->xdata) +                args->xdata = dict_new (); + +        ret = dict_set_str (args->xdata, "testing-the-xdata-key", +                            "testing-the-xdata-value"); +#endif + +        GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val), +                                    req.xdata.xdata_len, op_errno, unwind); +          ret = client_submit_vec_request (this, &req, frame, conf->fops,                                           GFS3_OP_WRITE, client3_1_writev_cbk,                                           args->vector, args->count, diff --git a/xlators/protocol/server/src/server3_1-fops.c b/xlators/protocol/server/src/server3_1-fops.c index 2e0bbb4c8cd..06283461d06 100644 --- a/xlators/protocol/server/src/server3_1-fops.c +++ b/xlators/protocol/server/src/server3_1-fops.c @@ -1455,6 +1455,16 @@ server_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          op_ret, strerror (op_errno));          } +#ifdef GF_TESTING_IO_XDATA +        { +                int ret = 0; +                if (!xdata) +                        xdata = dict_new (); + +                ret = dict_set_str (xdata, "testing-the-xdata-key", +                                       "testing-xdata-value"); +        } +#endif          GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val),                                      rsp.xdata.xdata_len, op_errno, out); @@ -3392,6 +3402,10 @@ server_writev (rpcsvc_request_t *req)                                        (args.xdata.xdata_len), ret,                                        op_errno, out); +#ifdef GF_TESTING_IO_XDATA +        dict_dump (state->xdata); +#endif +          ret = 0;          resolve_and_resume (frame, server_writev_resume);  out: @@ -3413,26 +3427,58 @@ server_writev_vec (rpcsvc_request_t *req, struct iovec *payload,  }  #define SERVER3_1_VECWRITE_START 0 -#define SERVER3_1_VECWRITE_READINGHDR 1 +#define SERVER3_1_VECWRITE_READING_HDR 1 +#define SERVER3_1_VECWRITE_READING_OPAQUE 2  int  server_writev_vecsizer (int state, ssize_t *readsize, char *addr)  { -        int nextstate = 0; -        gfs3_write_req    write_req              = {{0,},}; +        ssize_t         size = 0; +        int             nextstate = 0; +        gfs3_write_req  write_req = {{0,},}; +        XDR             xdr;          switch (state) {          case SERVER3_1_VECWRITE_START: -                *readsize = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, &write_req); -                nextstate = SERVER3_1_VECWRITE_READINGHDR; +                size = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, +                                   &write_req); +                *readsize = size; +                nextstate = SERVER3_1_VECWRITE_READING_HDR; +                break; +        case SERVER3_1_VECWRITE_READING_HDR: +                size = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, +                                           &write_req); + +                xdrmem_create (&xdr, addr, size, XDR_DECODE); + +                /* This will fail if there is xdata sent from client, if not, +                   well and good */ +                xdr_gfs3_write_req (&xdr, &write_req); + +                /* need to round off to proper roof (%4), as XDR packing pads +                   the end of opaque object with '0' */ +                size = roof (write_req.xdata.xdata_len, 4); + +                *readsize = size; + +                if (!size) +                        nextstate = SERVER3_1_VECWRITE_START; +                else +                        nextstate = SERVER3_1_VECWRITE_READING_OPAQUE; + +                if (write_req.xdata.xdata_val) +                        free (write_req.xdata.xdata_val); +                  break; -        case SERVER3_1_VECWRITE_READINGHDR: + +        case SERVER3_1_VECWRITE_READING_OPAQUE:                  *readsize = 0;                  nextstate = SERVER3_1_VECWRITE_START;                  break;          default: -                gf_log ("server3_1", GF_LOG_ERROR, "wrong state: %d", state); +                gf_log ("server", GF_LOG_ERROR, "wrong state: %d", state);          } +          return nextstate;  }  | 
