From 3f72e52c936edf7d1477a69fa3a01f89e0576881 Mon Sep 17 00:00:00 2001 From: Rajesh Amaravathi Date: Fri, 21 Jun 2013 11:31:11 +0530 Subject: rpc: duplicate request cache for nfs Duplicate request cache provides a mechanism for detecting duplicate rpc requests from clients. DRC caches replies and on duplicate requests, sends the cached reply instead of re-processing the request. Change-Id: I3d62a6c4aa86c92bf61f1038ca62a1a46bf1c303 BUG: 847624 Signed-off-by: Rajesh Amaravathi Reviewed-on: http://review.gluster.org/4049 Reviewed-by: Rajesh Joseph Tested-by: Gluster Build System Reviewed-by: Anand Avati --- rpc/rpc-lib/src/Makefile.am | 8 +- rpc/rpc-lib/src/rpc-drc.c | 811 ++++++++++++++++++++++++++++++++++++++++ rpc/rpc-lib/src/rpc-drc.h | 100 +++++ rpc/rpc-lib/src/rpc-transport.h | 1 + rpc/rpc-lib/src/rpcsvc-common.h | 48 +++ rpc/rpc-lib/src/rpcsvc.c | 91 ++++- rpc/rpc-lib/src/rpcsvc.h | 19 +- 7 files changed, 1053 insertions(+), 25 deletions(-) create mode 100644 rpc/rpc-lib/src/rpc-drc.c create mode 100644 rpc/rpc-lib/src/rpc-drc.h (limited to 'rpc/rpc-lib') diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am index ca62a27f9..f19c3c8a4 100644 --- a/rpc/rpc-lib/src/Makefile.am +++ b/rpc/rpc-lib/src/Makefile.am @@ -1,16 +1,18 @@ lib_LTLIBRARIES = libgfrpc.la libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \ - rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c + rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c \ + rpc-drc.c libgfrpc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la noinst_HEADERS = rpcsvc.h rpc-transport.h xdr-common.h xdr-rpc.h xdr-rpcclnt.h \ - rpc-clnt.h rpcsvc-common.h protocol-common.h + rpc-clnt.h rpcsvc-common.h protocol-common.h rpc-drc.h AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ -I$(top_srcdir)/rpc/xdr/src \ - -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" + -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \ + -I$(top_srcdir)/contrib/rbtree AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c new file mode 100644 index 000000000..66d07cfe6 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.c @@ -0,0 +1,811 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#ifndef RPC_DRC_H +#include "rpc-drc.h" +#endif +#include "locking.h" +#include "hashfn.h" +#include "common-utils.h" +#include "statedump.h" +#include "mem-pool.h" + +#include +#include + +/** + * rpcsvc_drc_op_destroy - Destroys the cached reply + * + * @param drc - the main drc structure + * @param reply - the cached reply to destroy + * @return NULL if reply is destroyed, reply otherwise + */ +static drc_cached_op_t * +rpcsvc_drc_op_destroy (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ + GF_ASSERT (drc); + GF_ASSERT (reply); + + if (reply->state == DRC_OP_IN_TRANSIT) + return reply; + + iobref_unref (reply->msg.iobref); + if (reply->msg.rpchdr) + GF_FREE (reply->msg.rpchdr); + if (reply->msg.proghdr) + GF_FREE (reply->msg.proghdr); + if (reply->msg.progpayload) + GF_FREE (reply->msg.progpayload); + + list_del (&reply->global_list); + reply->client->op_count--; + drc->op_count--; + mem_put (reply); + reply = NULL; + + return reply; +} + +/** + * rpcsvc_drc_op_rb_unref - This function is used in rb tree cleanup only + * + * @param reply - the cached reply to unref + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_drc_rb_op_destroy (void *reply, void *drc) +{ + rpcsvc_drc_op_destroy (drc, (drc_cached_op_t *)reply); +} + +/** + * rpcsvc_remove_drc_client - Cleanup the drc client + * + * @param client - the drc client to be removed + * @return void + */ +static void +rpcsvc_remove_drc_client (drc_client_t *client) +{ + rb_destroy (client->rbtree, rpcsvc_drc_rb_op_destroy); + list_del (&client->client_list); + GF_FREE (client); +} + +/** + * rpcsvc_client_lookup - Given a sockaddr_storage, find the client if it exists + * + * @param drc - the main drc structure + * @param sockaddr - the network address of the client to be looked up + * @return drc client if it exists, NULL otherwise + */ +static drc_client_t * +rpcsvc_client_lookup (rpcsvc_drc_globals_t *drc, + struct sockaddr_storage *sockaddr) +{ + drc_client_t *client = NULL; + + GF_ASSERT (drc); + GF_ASSERT (sockaddr); + + if (list_empty (&drc->clients_head)) + return NULL; + + list_for_each_entry (client, &drc->clients_head, client_list) { + if (gf_sock_union_equal_addr (&client->sock_union, + (union gf_sock_union *)sockaddr)) + return client; + } + + return NULL; +} + +/** + * drc_compare_reqs - Used by rbtree to determine if incoming req matches with + * an existing node(cached reply) in rbtree + * + * @param item - pointer to the incoming req + * @param rb_node_data - pointer to an rbtree node (cached reply) + * @param param - drc pointer - unused here, but used in *op_destroy + * @return 0 if req matches reply, else (req->xid - reply->xid) + */ +int +drc_compare_reqs (const void *item, const void *rb_node_data, void *param) +{ + int ret = -1; + rpcsvc_request_t *req = NULL; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (item); + GF_ASSERT (rb_node_data); + GF_ASSERT (param); + + req = (rpcsvc_request_t *)item; + reply = (drc_cached_op_t *)rb_node_data; + + ret = req->xid - reply->xid; + if (ret != 0) + return ret; + + if (req->prognum == reply->prognum && + req->procnum == reply->procnum && + req->progver == reply->progversion) + return 0; + + return 1; +} + +/** + * drc_rb_calloc - used by rbtree api to allocate memory for nodes + * + * @param allocator - the libavl_allocator structure used by rbtree + * @param size - not needed by this function + * @return pointer to new cached reply (node in rbtree) + */ +static void * +drc_rb_calloc (struct libavl_allocator *allocator, size_t size) +{ + rpcsvc_drc_globals_t *drc = NULL; + + /* get the drc pointer by simple typecast, since allocator + * is the first member of rpcsvc_drc_globals_t + */ + drc = (rpcsvc_drc_globals_t *)allocator; + + return mem_get (drc->mempool); +} + +/** + * drc_rb_free - used by rbtree api to free a node + * + * @param a - the libavl_allocator structure used by rbtree api + * @param block - node that needs to be freed + * @return void + */ +static void +drc_rb_free (struct libavl_allocator *a, void *block) +{ + mem_put (block); +} + +/** + * drc_init_client_cache - initialize a drc client and its rb tree + * + * @param drc - the main drc structure + * @param client - the drc client to be initialized + * @return 0 on success, -1 on failure + */ +static int +drc_init_client_cache (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ + GF_ASSERT (drc); + GF_ASSERT (client); + + drc->allocator.libavl_malloc = drc_rb_calloc; + drc->allocator.libavl_free = drc_rb_free; + + client->rbtree = rb_create (drc_compare_reqs, drc, + (struct libavl_allocator *)drc); + if (!client->rbtree) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed"); + return -1; + } + + return 0; +} + +/** + * rpcsvc_get_drc_client - find the drc client with given sockaddr, else + * allocate and initialize a new drc client + * + * @param drc - the main drc structure + * @param sockaddr - network address of client + * @return drc client on success, NULL on failure + */ +static drc_client_t * +rpcsvc_get_drc_client (rpcsvc_drc_globals_t *drc, + struct sockaddr_storage *sockaddr) +{ + drc_client_t *client = NULL; + + GF_ASSERT (drc); + GF_ASSERT (sockaddr); + + client = rpcsvc_client_lookup (drc, sockaddr); + if (client) + goto out; + + /* if lookup fails, allocate cache for the new client */ + client = GF_CALLOC (1, sizeof (drc_client_t), + gf_common_mt_drc_client_t); + if (!client) + goto out; + + client->ref = 0; + client->sock_union = (union gf_sock_union)*sockaddr; + client->op_count = 0; + + if (drc_init_client_cache (drc, client)) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "initialization of drc client failed"); + GF_FREE (client); + client = NULL; + goto out; + } + drc->client_count++; + + list_add (&client->client_list, &drc->clients_head); + + out: + return client; +} + +/** + * rpcsvc_need_drc - Determine if a request needs DRC service + * + * @param req - incoming request + * @return 1 if DRC is needed for req, 0 otherwise + */ +int +rpcsvc_need_drc (rpcsvc_request_t *req) +{ + rpcsvc_actor_t *actor = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (req); + GF_ASSERT (req->svc); + + drc = req->svc->drc; + + if (!drc || drc->status == DRC_UNINITIATED) + return 0; + + actor = rpcsvc_program_actor (req); + if (!actor) + return 0; + + return (actor->op_type == DRC_NON_IDEMPOTENT + && drc->type != DRC_TYPE_NONE); +} + +/** + * rpcsvc_drc_client_ref - ref the drc client + * + * @param client - the drc client to ref + * @return client + */ +static drc_client_t * +rpcsvc_drc_client_ref (drc_client_t *client) +{ + GF_ASSERT (client); + client->ref++; + return client; +} + +/** + * rpcsvc_drc_client_unref - unref the drc client, and destroy + * the client on last unref + * + * @param drc - the main drc structure + * @param client - the drc client to unref + * @return NULL if it is the last unref, client otherwise + */ +static drc_client_t * +rpcsvc_drc_client_unref (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ + GF_ASSERT (drc); + GF_ASSERT (client->ref); + + client->ref--; + if (!client->ref) { + drc->client_count--; + rpcsvc_remove_drc_client (client); + client = NULL; + } + + return client; +} + +/** + * rpcsvc_drc_lookup - lookup a request to see if it is already cached + * + * @param req - incoming request + * @return cached reply of req if found, NULL otherwise + */ +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req) +{ + drc_client_t *client = NULL; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (req); + + if (!req->trans->drc_client) { + client = rpcsvc_get_drc_client (req->svc->drc, + &req->trans->peerinfo.sockaddr); + if (!client) + goto out; + req->trans->drc_client = client; + } + + client = rpcsvc_drc_client_ref (req->trans->drc_client); + + if (client->op_count == 0) + goto out; + + reply = rb_find (client->rbtree, req); + + out: + if (client) + rpcsvc_drc_client_unref (req->svc->drc, client); + + return reply; +} + +/** + * rpcsvc_send_cached_reply - send the cached reply for the incoming request + * + * @param req - incoming request (which is a duplicate in this case) + * @param reply - the cached reply for req + * @return 0 on successful reply submission, -1 or other non-zero value otherwise + */ +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply) +{ + int ret = 0; + + GF_ASSERT (req); + GF_ASSERT (reply); + + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "sending cached reply: xid: %d, " + "client: %s", req->xid, req->trans->peerinfo.identifier); + + rpcsvc_drc_client_ref (reply->client); + ret = rpcsvc_transport_submit (req->trans, + reply->msg.rpchdr, reply->msg.rpchdrcount, + reply->msg.proghdr, reply->msg.proghdrcount, + reply->msg.progpayload, reply->msg.progpayloadcount, + reply->msg.iobref, req->trans_private); + rpcsvc_drc_client_unref (req->svc->drc, reply->client); + + return ret; +} + +/** + * rpcsvc_cache_reply - cache the reply for the processed request 'req' + * + * @param req - processed request + * @param iobref - iobref structure of the reply + * @param rpchdr - rpc header of the reply + * @param rpchdrcount - size of rpchdr + * @param proghdr - program header of the reply + * @param proghdrcount - size of proghdr + * @param payload - payload of the reply if any + * @param payloadcount - size of payload + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, + struct iovec *rpchdr, int rpchdrcount, + struct iovec *proghdr, int proghdrcount, + struct iovec *payload, int payloadcount) +{ + int ret = -1; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (req); + GF_ASSERT (req->reply); + + reply = req->reply; + + reply->state = DRC_OP_CACHED; + + reply->msg.iobref = iobref_ref (iobref); + + reply->msg.rpchdrcount = rpchdrcount; + reply->msg.rpchdr = iov_dup (rpchdr, rpchdrcount); + + reply->msg.proghdrcount = proghdrcount; + reply->msg.proghdr = iov_dup (proghdr, proghdrcount); + + reply->msg.progpayloadcount = payloadcount; + if (payloadcount) + reply->msg.progpayload = iov_dup (payload, payloadcount); + + // rpcsvc_drc_client_unref (req->svc->drc, req->trans->drc_client); + // rpcsvc_drc_op_unref (req->svc->drc, reply); + ret = 0; + + return ret; +} + +/** + * rpcsvc_vacate_drc_entries - free up some percentage of drc cache + * based on the lru factor + * + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_vacate_drc_entries (rpcsvc_drc_globals_t *drc) +{ + uint32_t i = 0; + uint32_t n = 0; + drc_cached_op_t *reply = NULL; + drc_cached_op_t *tmp = NULL; + drc_client_t *client = NULL; + + GF_ASSERT (drc); + + n = drc->global_cache_size / drc->lru_factor; + + list_for_each_entry_safe_reverse (reply, tmp, &drc->cache_head, global_list) { + /* Don't delete ops that are in transit */ + if (reply->state == DRC_OP_IN_TRANSIT) + continue; + + client = reply->client; + + (void *)rb_delete (client->rbtree, reply); + + rpcsvc_drc_op_destroy (drc, reply); + rpcsvc_drc_client_unref (drc, client); + i++; + if (i >= n) + break; + } +} + +/** + * rpcsvc_add_op_to_cache - insert the cached op into the client rbtree and drc list + * + * @param drc - the main drc structure + * @param reply - the op to be inserted + * @return 0 on success, -1 on failure + */ +static int +rpcsvc_add_op_to_cache (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ + drc_client_t *client = NULL; + drc_cached_op_t **tmp_reply = NULL; + + GF_ASSERT (drc); + GF_ASSERT (reply); + + client = reply->client; + + /* cache is full, free up some space */ + if (drc->op_count >= drc->global_cache_size) + rpcsvc_vacate_drc_entries (drc); + + tmp_reply = (drc_cached_op_t **)rb_probe (client->rbtree, reply); + if (*tmp_reply != reply) { + /* should never happen */ + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "DRC failed to detect duplicates"); + return -1; + } else if (*tmp_reply == NULL) { + /* mem alloc failed */ + return -1; + } + + client->op_count++; + list_add (&reply->global_list, &drc->cache_head); + drc->op_count++; + + return 0; +} + +/** + * rpcsvc_cache_request - cache the in-transition incoming request + * + * @param req - incoming request + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_request (rpcsvc_request_t *req) +{ + int ret = -1; + drc_client_t *client = NULL; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (req); + + drc = req->svc->drc; + + client = req->trans->drc_client; + if (!client) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc client is NULL"); + goto out; + } + + reply = mem_get (drc->mempool); + if (!reply) + goto out; + + reply->client = rpcsvc_drc_client_ref (client); + reply->xid = req->xid; + reply->prognum = req->prognum; + reply->progversion = req->progver; + reply->procnum = req->procnum; + reply->state = DRC_OP_IN_TRANSIT; + req->reply = reply; + + ret = rpcsvc_add_op_to_cache (drc, reply); + if (ret) { + req->reply = NULL; + rpcsvc_drc_op_destroy (drc, reply); + rpcsvc_drc_client_unref (drc, client); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Failed to add op to drc cache"); + } + + out: + return ret; +} + +/** + * + * rpcsvc_drc_priv - function which dumps the drc state + * + * @param drc - the main drc structure + * @return 0 on success, -1 on failure + */ +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc) +{ + int i = 0; + char key[GF_DUMP_MAX_BUF_LEN] = {0}; + drc_client_t *client = NULL; + char ip[INET6_ADDRSTRLEN] = {0}; + + if (!drc || drc->status == DRC_UNINITIATED) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is " + "uninitialized, not dumping its state"); + return 0; + } + + gf_proc_dump_add_section("rpc.drc"); + + if (TRY_LOCK (&drc->lock)) + return -1; + + gf_proc_dump_build_key (key, "drc", "type"); + gf_proc_dump_write (key, "%d", drc->type); + + gf_proc_dump_build_key (key, "drc", "client_count"); + gf_proc_dump_write (key, "%d", drc->client_count); + + gf_proc_dump_build_key (key, "drc", "current_cache_size"); + gf_proc_dump_write (key, "%d", drc->op_count); + + gf_proc_dump_build_key (key, "drc", "max_cache_size"); + gf_proc_dump_write (key, "%d", drc->global_cache_size); + + gf_proc_dump_build_key (key, "drc", "lru_factor"); + gf_proc_dump_write (key, "%d", drc->lru_factor); + + gf_proc_dump_build_key (key, "drc", "duplicate_request_count"); + gf_proc_dump_write (key, "%d", drc->cache_hits); + + gf_proc_dump_build_key (key, "drc", "in_transit_duplicate_requests"); + gf_proc_dump_write (key, "%d", drc->intransit_hits); + + list_for_each_entry (client, &drc->clients_head, client_list) { + gf_proc_dump_build_key (key, "client", "%d.ip-address", i); + memset (ip, 0, INET6_ADDRSTRLEN); + switch (client->sock_union.storage.ss_family) { + case AF_INET: + gf_proc_dump_write (key, "%s", inet_ntop (AF_INET, + &client->sock_union.sin.sin_addr.s_addr, + ip, INET_ADDRSTRLEN)); + break; + case AF_INET6: + gf_proc_dump_write (key, "%s", inet_ntop (AF_INET6, + &client->sock_union.sin6.sin6_addr, + ip, INET6_ADDRSTRLEN)); + break; + default: + gf_proc_dump_write (key, "%s", "N/A"); + } + + gf_proc_dump_build_key (key, "client", "%d.ref_count", i); + gf_proc_dump_write (key, "%d", client->ref); + gf_proc_dump_build_key (key, "client", "%d.op_count", i); + gf_proc_dump_write (key, "%d", client->op_count); + i++; + } + + UNLOCK (&drc->lock); + return 0; +} + +/** + * rpcsvc_drc_notify - function which is notified of RPC transport events + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param xl - pointer to the xlator + * @param event - the event which triggered this notify + * @param data - the transport structure + * @return 0 on success, -1 on failure + */ +int +rpcsvc_drc_notify (rpcsvc_t *svc, void *xl, + rpcsvc_event_t event, void *data) +{ + int ret = -1; + rpc_transport_t *trans = NULL; + drc_client_t *client = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (svc); + GF_ASSERT (svc->drc); + GF_ASSERT (data); + + drc = svc->drc; + + if (drc->status == DRC_UNINITIATED || + drc->type == DRC_TYPE_NONE) + return 0; + + LOCK (&drc->lock); + + trans = (rpc_transport_t *)data; + client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr); + if (!client) + goto out; + + switch (event) { + case RPCSVC_EVENT_ACCEPT: + trans->drc_client = rpcsvc_drc_client_ref (client); + ret = 0; + break; + + case RPCSVC_EVENT_DISCONNECT: + ret = 0; + if (list_empty (&drc->clients_head)) + break; + /* should be the last unref */ + rpcsvc_drc_client_unref (drc, client); + trans->drc_client = NULL; + break; + + default: + break; + } + + out: + UNLOCK (&drc->lock); + return ret; +} + +/** + * rpcsvc_drc_init - Initialize the duplicate request cache service + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param options - the options dictionary which configures drc + * @return 0 on success, non-zero integer on failure + */ +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options) +{ + int ret = 0; + uint32_t drc_type = 0; + uint32_t drc_size = 0; + uint32_t drc_factor = 0; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (svc); + GF_ASSERT (options); + + if (!svc->drc) { + drc = GF_CALLOC (1, sizeof (rpcsvc_drc_globals_t), + gf_common_mt_drc_globals_t); + if (!drc) + return -1; + + svc->drc = drc; + LOCK_INIT (&drc->lock); + } else { + drc = svc->drc; + } + + LOCK (&drc->lock); + if (drc->type != DRC_TYPE_NONE) { + ret = 0; + goto out; + } + + /* Toggle DRC on/off, when more drc types(persistent/cluster) + are added, we shouldn't treat this as boolean */ + ret = dict_get_str_boolean (options, "nfs.drc", _gf_false); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "drc user options need second look"); + ret = _gf_true; + } + + if (ret == _gf_false) { + /* drc off */ + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is off"); + ret = 0; + goto out; + } + + /* Specify type of DRC to be used */ + ret = dict_get_uint32 (options, "nfs.drc-type", &drc_type); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc type not set." + " Continuing with default"); + drc_type = DRC_DEFAULT_TYPE; + } + + drc->type = drc_type; + + /* Set the global cache size (no. of ops to cache) */ + ret = dict_get_uint32 (options, "nfs.drc-size", &drc_size); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc size not set." + " Continuing with default size"); + drc_size = DRC_DEFAULT_CACHE_SIZE; + } + + drc->global_cache_size = drc_size; + + /* Mempool for cached ops */ + drc->mempool = mem_pool_new (drc_cached_op_t, drc->global_cache_size); + if (!drc->mempool) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get mempool for" + " DRC, drc-size: %d", drc->global_cache_size); + ret = -1; + goto out; + } + + /* What percent of cache to be evicted whenever it fills up */ + ret = dict_get_uint32 (options, "nfs.drc-lru-factor", &drc_factor); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc lru factor not set." + " Continuing with policy default"); + drc_factor = DRC_DEFAULT_LRU_FACTOR; + } + + drc->lru_factor = (drc_lru_factor_t) drc_factor; + + INIT_LIST_HEAD (&drc->clients_head); + INIT_LIST_HEAD (&drc->cache_head); + + ret = rpcsvc_register_notify (svc, rpcsvc_drc_notify, THIS); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "registration of drc_notify function failed"); + goto out; + } + + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc init successful"); + drc->status = DRC_INITIATED; + + out: + UNLOCK (&drc->lock); + if (ret == -1) { + if (drc->mempool) { + mem_pool_destroy (drc->mempool); + drc->mempool = NULL; + } + GF_FREE (drc); + svc->drc = NULL; + } + return ret; +} diff --git a/rpc/rpc-lib/src/rpc-drc.h b/rpc/rpc-lib/src/rpc-drc.h new file mode 100644 index 000000000..0a1688992 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.h @@ -0,0 +1,100 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef RPC_DRC_H +#define RPC_DRC_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc-common.h" +#include "rpcsvc.h" +#include "locking.h" +#include "dict.h" +#include "rb.h" + +/* per-client cache structure */ +struct drc_client { + uint32_t ref; + union gf_sock_union sock_union; + /* pointers to the cache */ + struct rb_table *rbtree; + /* no. of ops currently cached */ + uint32_t op_count; + struct list_head client_list; +}; + +struct drc_cached_op { + drc_op_state_t state; + uint32_t xid; + int prognum; + int progversion; + int procnum; + rpc_transport_msg_t msg; + drc_client_t *client; + struct list_head client_list; + struct list_head global_list; + int32_t ref; +}; + +/* global drc definitions */ +enum drc_status { + DRC_UNINITIATED, + DRC_INITIATED +}; +typedef enum drc_status drc_status_t; + +struct drc_globals { + /* allocator must be the first member since + * it is used so in gf_libavl_allocator + */ + struct libavl_allocator allocator; + drc_type_t type; + /* configurable size parameter */ + uint32_t global_cache_size; + drc_lru_factor_t lru_factor; + gf_lock_t lock; + drc_status_t status; + uint32_t op_count; + uint64_t cache_hits; + uint64_t intransit_hits; + struct mem_pool *mempool; + struct list_head cache_head; + uint32_t client_count; + struct list_head clients_head; +}; + +int +rpcsvc_need_drc (rpcsvc_request_t *req); + +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req); + +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply); + +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, + struct iovec *rpchdr, int rpchdrcount, + struct iovec *proghdr, int proghdrcount, + struct iovec *payload, int payloadcount); + +int +rpcsvc_cache_request (rpcsvc_request_t *req); + +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc); + +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options); + +#endif /* RPC_DRC_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index a8744d618..4384f2abd 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -196,6 +196,7 @@ struct rpc_transport { dict_t *options; char *name; void *dnscache; + void *drc_client; data_t *buf; int32_t (*init) (rpc_transport_t *this); void (*fini) (rpc_transport_t *this); diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 2c6f07488..054e187c9 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -30,6 +30,8 @@ struct rpcsvc_state; typedef int (*rpcsvc_notify_t) (struct rpcsvc_state *, void *mydata, rpcsvc_event_t, void *data); +struct drc_globals; +typedef struct drc_globals rpcsvc_drc_globals_t; /* Contains global state required for all the RPC services. */ @@ -68,7 +70,53 @@ typedef struct rpcsvc_state { void *mydata; /* This is xlator */ rpcsvc_notify_t notifyfn; struct mem_pool *rxpool; + rpcsvc_drc_globals_t *drc; } rpcsvc_t; +/* DRC START */ +enum drc_op_type { + DRC_NA = 0, + DRC_IDEMPOTENT = 1, + DRC_NON_IDEMPOTENT = 2 +}; +typedef enum drc_op_type drc_op_type_t; + +enum drc_type { + DRC_TYPE_NONE = 0, + DRC_TYPE_IN_MEMORY = 1 +}; +typedef enum drc_type drc_type_t; + +enum drc_lru_factor { + DRC_LRU_5_PC = 20, + DRC_LRU_10_PC = 10, + DRC_LRU_25_PC = 4, + DRC_LRU_50_PC = 2 +}; +typedef enum drc_lru_factor drc_lru_factor_t; + +enum drc_xid_state { + DRC_XID_MONOTONOUS = 0, + DRC_XID_WRAPPED = 1 +}; +typedef enum drc_xid_state drc_xid_state_t; + +enum drc_op_state { + DRC_OP_IN_TRANSIT = 0, + DRC_OP_CACHED = 1 +}; +typedef enum drc_op_state drc_op_state_t; + +enum drc_policy { + DRC_LRU = 0 +}; +typedef enum drc_policy drc_policy_t; + +/* Default policies for DRC */ +#define DRC_DEFAULT_TYPE DRC_TYPE_IN_MEMORY +#define DRC_DEFAULT_CACHE_SIZE 0x20000 +#define DRC_DEFAULT_LRU_FACTOR DRC_LRU_25_PC + +/* DRC END */ #endif /* #ifndef _RPCSVC_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index d69756cc0..7efb2e1fb 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@ #include "xdr-generic.h" #include "rpc-common-xdr.h" #include "syncop.h" +#include "rpc-drc.h" #include #include @@ -422,6 +423,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, * since we are not handling authentication failures for now. */ req->rpc_status = MSG_ACCEPTED; + req->reply = NULL; ret = 0; err: if (ret == -1) { @@ -461,13 +463,15 @@ int rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { - rpcsvc_actor_t *actor = NULL; - rpcsvc_actor actor_fn = NULL; - rpcsvc_request_t *req = NULL; - int ret = -1; - uint16_t port = 0; - gf_boolean_t is_unix = _gf_false; - gf_boolean_t unprivileged = _gf_false; + rpcsvc_actor_t *actor = NULL; + rpcsvc_actor actor_fn = NULL; + rpcsvc_request_t *req = NULL; + int ret = -1; + uint16_t port = 0; + gf_boolean_t is_unix = _gf_false; + gf_boolean_t unprivileged = _gf_false; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; if (!trans || !svc) return -1; @@ -503,7 +507,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req = rpcsvc_request_create (svc, trans, msg); if (!req) - goto err; + goto out; if (!rpcsvc_request_accepted (req)) goto err_reply; @@ -521,6 +525,39 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, return -1; } + /* DRC */ + if (rpcsvc_need_drc (req)) { + drc = req->svc->drc; + + LOCK (&drc->lock); + reply = rpcsvc_drc_lookup (req); + + /* retransmission of completed request, send cached reply */ + if (reply && reply->state == DRC_OP_CACHED) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" + " XID: 0x%x", req->xid); + ret = rpcsvc_send_cached_reply (req, reply); + drc->cache_hits++; + UNLOCK (&drc->lock); + goto out; + + } /* retransmitted request, original op in transit, drop it */ + else if (reply && reply->state == DRC_OP_IN_TRANSIT) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," + " discarding. XID: 0x%x", req->xid); + ret = 0; + drc->intransit_hits++; + rpcsvc_request_destroy (req); + UNLOCK (&drc->lock); + goto out; + + } /* fresh request, cache it as in-transit and proceed */ + else { + ret = rpcsvc_cache_request (req); + } + UNLOCK (&drc->lock); + } + if (req->rpc_err == SUCCESS) { /* Before going to xlator code, set the THIS properly */ THIS = svc->mydata; @@ -557,7 +594,7 @@ err_reply: * has now been queued. */ ret = 0; -err: +out: return ret; } @@ -904,21 +941,22 @@ out: return ret; } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { + if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) { goto out; } - reply.msg.rpchdr = hdrvec; - reply.msg.rpchdrcount = hdrcount; + reply.msg.rpchdr = rpchdr; + reply.msg.rpchdrcount = rpchdrcount; reply.msg.proghdr = proghdr; reply.msg.proghdrcount = proghdrcount; reply.msg.progpayload = progpayload; @@ -1064,6 +1102,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, size_t msglen = 0; size_t hdrlen = 0; char new_iobref = 0; + rpcsvc_drc_globals_t *drc = NULL; if ((!req) || (!req->trans)) return -1; @@ -1098,6 +1137,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); + /* cache the request in the duplicate request cache for appropriate ops */ + if (req->reply) { + drc = req->svc->drc; + + LOCK (&drc->lock); + ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, + proghdr, hdrcount, + payload, payloadcount); + UNLOCK (&drc->lock); + } + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount, iobref, req->trans_private); @@ -1905,6 +1955,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, "failed to register DUMP program"); goto free_svc; } + ret = 0; free_svc: if (ret == -1) { @@ -2196,9 +2247,9 @@ out: rpcsvc_actor_t gluster_dump_actors[] = { - [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, - [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, - [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, + [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, + [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, + [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA}, }; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index afa7c9926..67ff74be6 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -140,6 +140,9 @@ typedef struct rpcsvc_auth_data { #define rpcsvc_auth_flavour(au) ((au).flavour) +typedef struct drc_client drc_client_t; +typedef struct drc_cached_op drc_cached_op_t; + /* The container for the RPC call handed up to an actor. * Dynamically allocated. Lives till the call reply is completely * transmitted. @@ -241,6 +244,9 @@ struct rpcsvc_request { /* we need to ref the 'iobuf' in case of 'synctasking' it */ struct iobuf *hdr_iobuf; + + /* pointer to cached reply for use in DRC */ + drc_cached_op_t *reply; }; #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) @@ -314,7 +320,6 @@ typedef void *(*rpcsvc_encode_reply) (void *msg); */ typedef void (*rpcsvc_deallocate_reply) (void *msg); - #define RPCSVC_NAME_MAX 32 /* The descriptor for each procedure/actor that runs * over the RPC service. @@ -336,6 +341,7 @@ typedef struct rpcsvc_actor_desc { /* Can actor be ran on behalf an unprivileged requestor? */ gf_boolean_t unprivileged; + drc_op_type_t op_type; } rpcsvc_actor_t; /* Describes a program and its version along with the function pointers @@ -447,6 +453,13 @@ rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); int rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv); + int rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, @@ -558,6 +571,9 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, struct iovec *proghdr, int proghdrcount); +rpcsvc_actor_t * +rpcsvc_program_actor (rpcsvc_request_t *req); + int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath); int @@ -571,5 +587,4 @@ rpcsvc_volume_allowed (dict_t *options, char *volname); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, uint32_t progver, uint32_t procnum); - #endif -- cgit