diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 91 |
1 files changed, 71 insertions, 20 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index d69756cc004..7efb2e1fbb7 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@ #include "xdr-generic.h" #include "rpc-common-xdr.h" #include "syncop.h" +#include "rpc-drc.h" #include <errno.h> #include <pthread.h> @@ -422,6 +423,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, * since we are not handling authentication failures for now. */ req->rpc_status = MSG_ACCEPTED; + req->reply = NULL; ret = 0; err: if (ret == -1) { @@ -461,13 +463,15 @@ int rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { - rpcsvc_actor_t *actor = NULL; - rpcsvc_actor actor_fn = NULL; - rpcsvc_request_t *req = NULL; - int ret = -1; - uint16_t port = 0; - gf_boolean_t is_unix = _gf_false; - gf_boolean_t unprivileged = _gf_false; + rpcsvc_actor_t *actor = NULL; + rpcsvc_actor actor_fn = NULL; + rpcsvc_request_t *req = NULL; + int ret = -1; + uint16_t port = 0; + gf_boolean_t is_unix = _gf_false; + gf_boolean_t unprivileged = _gf_false; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; if (!trans || !svc) return -1; @@ -503,7 +507,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req = rpcsvc_request_create (svc, trans, msg); if (!req) - goto err; + goto out; if (!rpcsvc_request_accepted (req)) goto err_reply; @@ -521,6 +525,39 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, return -1; } + /* DRC */ + if (rpcsvc_need_drc (req)) { + drc = req->svc->drc; + + LOCK (&drc->lock); + reply = rpcsvc_drc_lookup (req); + + /* retransmission of completed request, send cached reply */ + if (reply && reply->state == DRC_OP_CACHED) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" + " XID: 0x%x", req->xid); + ret = rpcsvc_send_cached_reply (req, reply); + drc->cache_hits++; + UNLOCK (&drc->lock); + goto out; + + } /* retransmitted request, original op in transit, drop it */ + else if (reply && reply->state == DRC_OP_IN_TRANSIT) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," + " discarding. XID: 0x%x", req->xid); + ret = 0; + drc->intransit_hits++; + rpcsvc_request_destroy (req); + UNLOCK (&drc->lock); + goto out; + + } /* fresh request, cache it as in-transit and proceed */ + else { + ret = rpcsvc_cache_request (req); + } + UNLOCK (&drc->lock); + } + if (req->rpc_err == SUCCESS) { /* Before going to xlator code, set the THIS properly */ THIS = svc->mydata; @@ -557,7 +594,7 @@ err_reply: * has now been queued. */ ret = 0; -err: +out: return ret; } @@ -904,21 +941,22 @@ out: return ret; } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { + if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) { goto out; } - reply.msg.rpchdr = hdrvec; - reply.msg.rpchdrcount = hdrcount; + reply.msg.rpchdr = rpchdr; + reply.msg.rpchdrcount = rpchdrcount; reply.msg.proghdr = proghdr; reply.msg.proghdrcount = proghdrcount; reply.msg.progpayload = progpayload; @@ -1064,6 +1102,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, size_t msglen = 0; size_t hdrlen = 0; char new_iobref = 0; + rpcsvc_drc_globals_t *drc = NULL; if ((!req) || (!req->trans)) return -1; @@ -1098,6 +1137,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); + /* cache the request in the duplicate request cache for appropriate ops */ + if (req->reply) { + drc = req->svc->drc; + + LOCK (&drc->lock); + ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, + proghdr, hdrcount, + payload, payloadcount); + UNLOCK (&drc->lock); + } + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount, iobref, req->trans_private); @@ -1905,6 +1955,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, "failed to register DUMP program"); goto free_svc; } + ret = 0; free_svc: if (ret == -1) { @@ -2196,9 +2247,9 @@ out: rpcsvc_actor_t gluster_dump_actors[] = { - [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, - [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, - [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, + [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, + [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, + [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA}, }; |