diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 91 | 
1 files changed, 71 insertions, 20 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index d69756cc004..7efb2e1fbb7 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@  #include "xdr-generic.h"  #include "rpc-common-xdr.h"  #include "syncop.h" +#include "rpc-drc.h"  #include <errno.h>  #include <pthread.h> @@ -422,6 +423,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,           * since we are not handling authentication failures for now.           */          req->rpc_status = MSG_ACCEPTED; +        req->reply = NULL;          ret = 0;  err:          if (ret == -1) { @@ -461,13 +463,15 @@ int  rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,                          rpc_transport_pollin_t *msg)  { -        rpcsvc_actor_t          *actor = NULL; -        rpcsvc_actor            actor_fn = NULL; -        rpcsvc_request_t        *req = NULL; -        int                     ret = -1; -        uint16_t                port = 0; -        gf_boolean_t            is_unix = _gf_false; -        gf_boolean_t            unprivileged = _gf_false; +        rpcsvc_actor_t         *actor          = NULL; +        rpcsvc_actor            actor_fn       = NULL; +        rpcsvc_request_t       *req            = NULL; +        int                     ret            = -1; +        uint16_t                port           = 0; +        gf_boolean_t            is_unix        = _gf_false; +        gf_boolean_t            unprivileged   = _gf_false; +        drc_cached_op_t        *reply          = NULL; +        rpcsvc_drc_globals_t   *drc            = NULL;          if (!trans || !svc)                  return -1; @@ -503,7 +507,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,          req = rpcsvc_request_create (svc, trans, msg);          if (!req) -                goto err; +                goto out;          if (!rpcsvc_request_accepted (req))                  goto err_reply; @@ -521,6 +525,39 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,                          return -1;          } +        /* DRC */ +        if (rpcsvc_need_drc (req)) { +                drc = req->svc->drc; + +                LOCK (&drc->lock); +                reply = rpcsvc_drc_lookup (req); + +                /* retransmission of completed request, send cached reply */ +                if (reply && reply->state == DRC_OP_CACHED) { +                        gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" +                                " XID: 0x%x", req->xid); +                        ret = rpcsvc_send_cached_reply (req, reply); +                        drc->cache_hits++; +                        UNLOCK (&drc->lock); +                        goto out; + +                } /* retransmitted request, original op in transit, drop it */ +                else if (reply && reply->state == DRC_OP_IN_TRANSIT) { +                        gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," +                                " discarding. XID: 0x%x", req->xid); +                        ret = 0; +                        drc->intransit_hits++; +                        rpcsvc_request_destroy (req); +                        UNLOCK (&drc->lock); +                        goto out; + +                } /* fresh request, cache it as in-transit and proceed */ +                else { +                        ret = rpcsvc_cache_request (req); +                } +                UNLOCK (&drc->lock); +        } +          if (req->rpc_err == SUCCESS) {                  /* Before going to xlator code, set the THIS properly */                  THIS = svc->mydata; @@ -557,7 +594,7 @@ err_reply:           * has now been queued. */          ret = 0; -err: +out:          return ret;  } @@ -904,21 +941,22 @@ out:          return ret;  } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, -                         int hdrcount, struct iovec *proghdr, int proghdrcount, -                         struct iovec *progpayload, int progpayloadcount, -                         struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, +                         int rpchdrcount, struct iovec *proghdr, +                         int proghdrcount, struct iovec *progpayload, +                         int progpayloadcount, struct iobref *iobref, +                         void *priv)  {          int                   ret   = -1;          rpc_transport_reply_t reply = {{0, }}; -        if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { +        if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {                  goto out;          } -        reply.msg.rpchdr = hdrvec; -        reply.msg.rpchdrcount = hdrcount; +        reply.msg.rpchdr = rpchdr; +        reply.msg.rpchdrcount = rpchdrcount;          reply.msg.proghdr = proghdr;          reply.msg.proghdrcount = proghdrcount;          reply.msg.progpayload = progpayload; @@ -1064,6 +1102,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,          size_t                  msglen     = 0;          size_t                  hdrlen     = 0;          char                    new_iobref = 0; +        rpcsvc_drc_globals_t   *drc        = NULL;          if ((!req) || (!req->trans))                  return -1; @@ -1098,6 +1137,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,          iobref_add (iobref, replyiob); +        /* cache the request in the duplicate request cache for appropriate ops */ +        if (req->reply) { +                drc = req->svc->drc; + +                LOCK (&drc->lock); +                ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, +                                          proghdr, hdrcount, +                                          payload, payloadcount); +                UNLOCK (&drc->lock); +        } +          ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,                                         payload, payloadcount, iobref,                                         req->trans_private); @@ -1905,6 +1955,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,                          "failed to register DUMP program");                  goto free_svc;          } +          ret = 0;  free_svc:          if (ret == -1) { @@ -2196,9 +2247,9 @@ out:  rpcsvc_actor_t gluster_dump_actors[] = { -        [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, -        [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, -        [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, +        [GF_DUMP_NULL]      = {"NULL",     GF_DUMP_NULL,     NULL,        NULL, 0, DRC_NA}, +        [GF_DUMP_DUMP]      = {"DUMP",     GF_DUMP_DUMP,     rpcsvc_dump, NULL, 0, DRC_NA}, +        [GF_DUMP_MAXVALUE]  = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL,        NULL, 0, DRC_NA},  };  | 
