diff options
Diffstat (limited to 'rpc/rpc-lib/src')
| -rw-r--r-- | rpc/rpc-lib/src/Makefile.am | 8 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-glusterfs.c | 32 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-unix.c | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/protocol-common.h | 36 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 135 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 7 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-drc.c | 872 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-drc.h | 104 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 86 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 18 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-auth.c | 46 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 64 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 342 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 45 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-rpc.c | 2 |
15 files changed, 1631 insertions, 167 deletions
diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am index ca62a27f9..f19c3c8a4 100644 --- a/rpc/rpc-lib/src/Makefile.am +++ b/rpc/rpc-lib/src/Makefile.am @@ -1,16 +1,18 @@ lib_LTLIBRARIES = libgfrpc.la libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \ - rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c + rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c \ + rpc-drc.c libgfrpc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la noinst_HEADERS = rpcsvc.h rpc-transport.h xdr-common.h xdr-rpc.h xdr-rpcclnt.h \ - rpc-clnt.h rpcsvc-common.h protocol-common.h + rpc-clnt.h rpcsvc-common.h protocol-common.h rpc-drc.h AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ -I$(top_srcdir)/rpc/xdr/src \ - -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" + -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \ + -I$(top_srcdir)/contrib/rbtree AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c index 9c6f8385b..db488434c 100644 --- a/rpc/rpc-lib/src/auth-glusterfs.c +++ b/rpc/rpc-lib/src/auth-glusterfs.c @@ -96,6 +96,22 @@ int auth_glusterfs_authenticate (rpcsvc_request_t *req, void *priv) goto err; } + if (req->auxgidcount > SMALL_GROUP_COUNT) { + req->auxgidlarge = GF_CALLOC(req->auxgidcount, + sizeof(req->auxgids[0]), + gf_common_mt_auxgids); + req->auxgids = req->auxgidlarge; + } else { + req->auxgids = req->auxgidsmall; + } + + if (!req->auxgids) { + gf_log ("auth-glusterfs", GF_LOG_WARNING, + "cannot allocate gid list"); + ret = RPCSVC_AUTH_REJECT; + goto err; + } + for (gidcount = 0; gidcount < au.ngrps; ++gidcount) req->auxgids[gidcount] = au.groups[gidcount]; @@ -203,6 +219,22 @@ int auth_glusterfs_v2_authenticate (rpcsvc_request_t *req, void *priv) goto err; } + if (req->auxgidcount > SMALL_GROUP_COUNT) { + req->auxgidlarge = GF_CALLOC(req->auxgidcount, + sizeof(req->auxgids[0]), + gf_common_mt_auxgids); + req->auxgids = req->auxgidlarge; + } else { + req->auxgids = req->auxgidsmall; + } + + if (!req->auxgids) { + gf_log ("auth-glusterfs-v2", GF_LOG_WARNING, + "cannot allocate gid list"); + ret = RPCSVC_AUTH_REJECT; + goto err; + } + for (i = 0; i < req->auxgidcount; ++i) req->auxgids[i] = au.groups.groups_val[i]; diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c index 6251d60a8..fa5f0576e 100644 --- a/rpc/rpc-lib/src/auth-unix.c +++ b/rpc/rpc-lib/src/auth-unix.c @@ -42,6 +42,7 @@ int auth_unix_authenticate (rpcsvc_request_t *req, void *priv) if (!req) return ret; + req->auxgids = req->auxgidsmall; ret = xdr_to_auth_unix_cred (req->cred.authdata, req->cred.datalen, &aup, machname, req->auxgids); if (ret == -1) { diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 97017e5fe..8bef906cc 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -56,6 +56,9 @@ enum gf_fop_procnum { GFS3_OP_RELEASE, GFS3_OP_RELEASEDIR, GFS3_OP_FREMOVEXATTR, + GFS3_OP_FALLOCATE, + GFS3_OP_DISCARD, + GFS3_OP_ZEROFILL, GFS3_OP_MAXVALUE, } ; @@ -154,7 +157,10 @@ enum gluster_cli_procnum { GLUSTER_CLI_LIST_VOLUME, GLUSTER_CLI_CLRLOCKS_VOLUME, GLUSTER_CLI_UUID_RESET, - GLUSTER_CLI_BD_OP, + GLUSTER_CLI_UUID_GET, + GLUSTER_CLI_COPY_FILE, + GLUSTER_CLI_SYS_EXEC, + GLUSTER_CLI_SNAP, GLUSTER_CLI_MAXVALUE, }; @@ -186,7 +192,7 @@ enum glusterd_brick_procnum { GLUSTERD_BRICK_XLATOR_DEFRAG, GLUSTERD_NODE_PROFILE, GLUSTERD_NODE_STATUS, - GLUSTERD_BRICK_BD_OP, + GLUSTERD_VOLUME_BARRIER_OP, GLUSTERD_BRICK_MAXVALUE, }; @@ -204,16 +210,22 @@ typedef enum { GF_AFR_OP_INDEX_SUMMARY, GF_AFR_OP_HEALED_FILES, GF_AFR_OP_HEAL_FAILED_FILES, - GF_AFR_OP_SPLIT_BRAIN_FILES + GF_AFR_OP_SPLIT_BRAIN_FILES, + GF_AFR_OP_STATISTICS, + GF_AFR_OP_STATISTICS_HEAL_COUNT, + GF_AFR_OP_STATISTICS_HEAL_COUNT_PER_REPLICA, } gf_xl_afr_op_t ; -typedef enum { - GF_BD_OP_INVALID, - GF_BD_OP_NEW_BD, - GF_BD_OP_DELETE_BD, - GF_BD_OP_CLONE_BD, - GF_BD_OP_SNAPSHOT_BD, -} gf_xl_bd_op_t ; +enum glusterd_mgmt_v3_procnum { + GLUSTERD_MGMT_V3_NULL, /* 0 */ + GLUSTERD_MGMT_V3_LOCK, + GLUSTERD_MGMT_V3_PRE_VALIDATE, + GLUSTERD_MGMT_V3_BRICK_OP, + GLUSTERD_MGMT_V3_COMMIT, + GLUSTERD_MGMT_V3_POST_VALIDATE, + GLUSTERD_MGMT_V3_UNLOCK, + GLUSTERD_MGMT_V3_MAXVALUE, +}; #define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */ #define GLUSTER_HNDSK_VERSION 2 /* 0.0.2 */ @@ -241,6 +253,10 @@ typedef enum { #define GD_BRICK_PROGRAM 4867634 /*Completely random*/ #define GD_BRICK_VERSION 2 +/* Third version */ +#define GD_MGMT_V3_PROGRAM 2210013 /* Completely random */ +#define GD_MGMT_V3_VERSION 3 + /* OP-VERSION handshake */ #define GD_MGMT_HNDSK_PROGRAM 1239873 /* Completely random */ #define GD_MGMT_HNDSK_VERSION 1 diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 55ac88a93..ac98a5c91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -145,7 +145,7 @@ call_bail (void *data) struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; char frame_sent[256] = {0,}; - struct timeval timeout = {0,}; + struct timespec timeout = {0,}; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -163,7 +163,7 @@ call_bail (void *data) call-once timer */ if (conn->timer) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, @@ -173,7 +173,8 @@ call_bail (void *data) if (conn->timer == NULL) { gf_log (conn->trans->name, GF_LOG_WARNING, - "Cannot create bailout timer"); + "Cannot create bailout timer for %s", + conn->trans->peerinfo.identifier); } } @@ -198,13 +199,13 @@ call_bail (void *data) gf_log (conn->trans->name, GF_LOG_ERROR, "bailing out frame type(%s) op(%s(%d)) xid = 0x%x " - "sent = %s. timeout = %d", + "sent = %s. timeout = %d for %s", trav->rpcreq->prog->progname, (trav->rpcreq->prog->procnames) ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : "--", trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent, - conn->frame_timeout); + conn->frame_timeout, conn->trans->peerinfo.identifier); clnt = rpc_clnt_ref (clnt); trav->rpcreq->rpc_status = -1; @@ -226,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; struct saved_frame *saved_frame = NULL; conn = &rpc_clnt->conn; @@ -240,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, /* TODO: make timeout configurable */ if (conn->timer == NULL) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -397,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr) { rpc_transport_t *trans = NULL; rpc_clnt_connection_t *conn = NULL; - struct timeval tv = {0, 0}; + struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; @@ -416,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr) conn->reconnect = 0; if (conn->connected == 0) { - tv.tv_sec = 3; + ts.tv_sec = 3; + ts.tv_nsec = 0; gf_log (trans->name, GF_LOG_TRACE, "attempting reconnect"); ret = rpc_transport_connect (trans, conn->config.remote_port); conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, trans); } else { @@ -819,6 +821,9 @@ out: return; } +static void +rpc_clnt_destroy (struct rpc_clnt *rpc); + int rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) @@ -828,7 +833,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, int ret = -1; rpc_request_info_t *req_info = NULL; rpc_transport_pollin_t *pollin = NULL; - struct timeval tv = {0, }; + struct timespec ts = {0, }; conn = mydata; if (conn == NULL) { @@ -847,10 +852,11 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, { if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) { - tv.tv_sec = 10; + ts.tv_sec = 10; + ts.tv_nsec = 0; conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn->trans); } @@ -864,9 +870,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, } case RPC_TRANSPORT_CLEANUP: - /* this event should not be received on a client for, a - * transport is only disconnected, but never destroyed. - */ + rpc_clnt_destroy (clnt); ret = 0; break; @@ -1541,18 +1545,21 @@ rpc_clnt_ref (struct rpc_clnt *rpc) static void -rpc_clnt_destroy (struct rpc_clnt *rpc) +rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) { if (!rpc) return; - if (rpc->conn.trans) { - rpc_transport_unregister_notify (rpc->conn.trans); - rpc_transport_disconnect (rpc->conn.trans); - rpc_transport_unref (rpc->conn.trans); - } + rpc_clnt_disable (rpc); + rpc_transport_unref (rpc->conn.trans); +} + +static void +rpc_clnt_destroy (struct rpc_clnt *rpc) +{ + if (!rpc) + return; - rpc_clnt_reconnect_cleanup (&rpc->conn); saved_frames_destroy (rpc->conn.saved_frames); pthread_mutex_destroy (&rpc->lock); pthread_mutex_destroy (&rpc->conn.lock); @@ -1579,13 +1586,36 @@ rpc_clnt_unref (struct rpc_clnt *rpc) } pthread_mutex_unlock (&rpc->lock); if (!count) { - rpc_clnt_destroy (rpc); + rpc_clnt_trigger_destroy (rpc); return NULL; } return rpc; } +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc) +{ + + rpc_clnt_connection_t *conn = NULL; + char disabled = 0; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + disabled = rpc->disabled; + } + pthread_mutex_unlock (&conn->lock); + +out: + return disabled; +} + void rpc_clnt_disable (struct rpc_clnt *rpc) { @@ -1668,60 +1698,3 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) rpc->conn.config.remote_host = gf_strdup (config->remote_host); } } - -int -rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath, - int frame_timeout) -{ - dict_t *dict = NULL; - char *fpath = NULL; - int ret = -1; - - GF_ASSERT (filepath); - GF_ASSERT (options); - - dict = dict_new (); - if (!dict) - goto out; - - fpath = gf_strdup (filepath); - if (!fpath) { - ret = -1; - goto out; - } - - ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.address-family", "unix"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.socket.nodelay", "off"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport-type", "socket"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.socket.keepalive", "off"); - if (ret) - goto out; - - if (frame_timeout > 0) { - ret = dict_set_int32 (dict, "frame-timeout", frame_timeout); - if (ret) - goto out; - } - - *options = dict; -out: - if (ret) { - GF_FREE (fpath); - if (dict) - dict_unref (dict); - } - return ret; -} diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 0da165559..584963ad0 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -237,11 +237,10 @@ void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); int rpcclnt_cbk_program_register (struct rpc_clnt *svc, rpcclnt_cb_program_t *program, void *mydata); -int -rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath, - int frame_timeout); - void rpc_clnt_disable (struct rpc_clnt *rpc); +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc); + #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c new file mode 100644 index 000000000..8181e6aee --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.c @@ -0,0 +1,872 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#ifndef RPC_DRC_H +#include "rpc-drc.h" +#endif +#include "locking.h" +#include "hashfn.h" +#include "common-utils.h" +#include "statedump.h" +#include "mem-pool.h" + +#include <netinet/in.h> +#include <unistd.h> + +/** + * rpcsvc_drc_op_destroy - Destroys the cached reply + * + * @param drc - the main drc structure + * @param reply - the cached reply to destroy + * @return NULL if reply is destroyed, reply otherwise + */ +static drc_cached_op_t * +rpcsvc_drc_op_destroy (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ + GF_ASSERT (drc); + GF_ASSERT (reply); + + if (reply->state == DRC_OP_IN_TRANSIT) + return reply; + + iobref_unref (reply->msg.iobref); + if (reply->msg.rpchdr) + GF_FREE (reply->msg.rpchdr); + if (reply->msg.proghdr) + GF_FREE (reply->msg.proghdr); + if (reply->msg.progpayload) + GF_FREE (reply->msg.progpayload); + + list_del (&reply->global_list); + reply->client->op_count--; + drc->op_count--; + mem_put (reply); + reply = NULL; + + return reply; +} + +/** + * rpcsvc_drc_op_rb_unref - This function is used in rb tree cleanup only + * + * @param reply - the cached reply to unref + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_drc_rb_op_destroy (void *reply, void *drc) +{ + rpcsvc_drc_op_destroy (drc, (drc_cached_op_t *)reply); +} + +/** + * rpcsvc_remove_drc_client - Cleanup the drc client + * + * @param client - the drc client to be removed + * @return void + */ +static void +rpcsvc_remove_drc_client (drc_client_t *client) +{ + rb_destroy (client->rbtree, rpcsvc_drc_rb_op_destroy); + list_del (&client->client_list); + GF_FREE (client); +} + +/** + * rpcsvc_client_lookup - Given a sockaddr_storage, find the client if it exists + * + * @param drc - the main drc structure + * @param sockaddr - the network address of the client to be looked up + * @return drc client if it exists, NULL otherwise + */ +static drc_client_t * +rpcsvc_client_lookup (rpcsvc_drc_globals_t *drc, + struct sockaddr_storage *sockaddr) +{ + drc_client_t *client = NULL; + + GF_ASSERT (drc); + GF_ASSERT (sockaddr); + + if (list_empty (&drc->clients_head)) + return NULL; + + list_for_each_entry (client, &drc->clients_head, client_list) { + if (gf_sock_union_equal_addr (&client->sock_union, + (union gf_sock_union *)sockaddr)) + return client; + } + + return NULL; +} + +/** + * drc_compare_reqs - Used by rbtree to determine if incoming req matches with + * an existing node(cached reply) in rbtree + * + * @param item - pointer to the incoming req + * @param rb_node_data - pointer to an rbtree node (cached reply) + * @param param - drc pointer - unused here, but used in *op_destroy + * @return 0 if req matches reply, else (req->xid - reply->xid) + */ +int +drc_compare_reqs (const void *item, const void *rb_node_data, void *param) +{ + int ret = -1; + rpcsvc_request_t *req = NULL; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (item); + GF_ASSERT (rb_node_data); + GF_ASSERT (param); + + req = (rpcsvc_request_t *)item; + reply = (drc_cached_op_t *)rb_node_data; + + ret = req->xid - reply->xid; + if (ret != 0) + return ret; + + if (req->prognum == reply->prognum && + req->procnum == reply->procnum && + req->progver == reply->progversion) + return 0; + + return 1; +} + +/** + * drc_rb_calloc - used by rbtree api to allocate memory for nodes + * + * @param allocator - the libavl_allocator structure used by rbtree + * @param size - not needed by this function + * @return pointer to new cached reply (node in rbtree) + */ +static void * +drc_rb_calloc (struct libavl_allocator *allocator, size_t size) +{ + rpcsvc_drc_globals_t *drc = NULL; + + /* get the drc pointer by simple typecast, since allocator + * is the first member of rpcsvc_drc_globals_t + */ + drc = (rpcsvc_drc_globals_t *)allocator; + + return mem_get (drc->mempool); +} + +/** + * drc_rb_free - used by rbtree api to free a node + * + * @param a - the libavl_allocator structure used by rbtree api + * @param block - node that needs to be freed + * @return void + */ +static void +drc_rb_free (struct libavl_allocator *a, void *block) +{ + mem_put (block); +} + +/** + * drc_init_client_cache - initialize a drc client and its rb tree + * + * @param drc - the main drc structure + * @param client - the drc client to be initialized + * @return 0 on success, -1 on failure + */ +static int +drc_init_client_cache (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ + GF_ASSERT (drc); + GF_ASSERT (client); + + drc->allocator.libavl_malloc = drc_rb_calloc; + drc->allocator.libavl_free = drc_rb_free; + + client->rbtree = rb_create (drc_compare_reqs, drc, + (struct libavl_allocator *)drc); + if (!client->rbtree) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed"); + return -1; + } + + return 0; +} + +/** + * rpcsvc_get_drc_client - find the drc client with given sockaddr, else + * allocate and initialize a new drc client + * + * @param drc - the main drc structure + * @param sockaddr - network address of client + * @return drc client on success, NULL on failure + */ +static drc_client_t * +rpcsvc_get_drc_client (rpcsvc_drc_globals_t *drc, + struct sockaddr_storage *sockaddr) +{ + drc_client_t *client = NULL; + + GF_ASSERT (drc); + GF_ASSERT (sockaddr); + + client = rpcsvc_client_lookup (drc, sockaddr); + if (client) + goto out; + + /* if lookup fails, allocate cache for the new client */ + client = GF_CALLOC (1, sizeof (drc_client_t), + gf_common_mt_drc_client_t); + if (!client) + goto out; + + client->ref = 0; + client->sock_union = (union gf_sock_union)*sockaddr; + client->op_count = 0; + + if (drc_init_client_cache (drc, client)) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "initialization of drc client failed"); + GF_FREE (client); + client = NULL; + goto out; + } + drc->client_count++; + + list_add (&client->client_list, &drc->clients_head); + + out: + return client; +} + +/** + * rpcsvc_need_drc - Determine if a request needs DRC service + * + * @param req - incoming request + * @return 1 if DRC is needed for req, 0 otherwise + */ +int +rpcsvc_need_drc (rpcsvc_request_t *req) +{ + rpcsvc_actor_t *actor = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (req); + GF_ASSERT (req->svc); + + drc = req->svc->drc; + + if (!drc || drc->status == DRC_UNINITIATED) + return 0; + + actor = rpcsvc_program_actor (req); + if (!actor) + return 0; + + return (actor->op_type == DRC_NON_IDEMPOTENT + && drc->type != DRC_TYPE_NONE); +} + +/** + * rpcsvc_drc_client_ref - ref the drc client + * + * @param client - the drc client to ref + * @return client + */ +static drc_client_t * +rpcsvc_drc_client_ref (drc_client_t *client) +{ + GF_ASSERT (client); + client->ref++; + return client; +} + +/** + * rpcsvc_drc_client_unref - unref the drc client, and destroy + * the client on last unref + * + * @param drc - the main drc structure + * @param client - the drc client to unref + * @return NULL if it is the last unref, client otherwise + */ +static drc_client_t * +rpcsvc_drc_client_unref (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ + GF_ASSERT (drc); + GF_ASSERT (client->ref); + + client->ref--; + if (!client->ref) { + drc->client_count--; + rpcsvc_remove_drc_client (client); + client = NULL; + } + + return client; +} + +/** + * rpcsvc_drc_lookup - lookup a request to see if it is already cached + * + * @param req - incoming request + * @return cached reply of req if found, NULL otherwise + */ +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req) +{ + drc_client_t *client = NULL; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (req); + + if (!req->trans->drc_client) { + client = rpcsvc_get_drc_client (req->svc->drc, + &req->trans->peerinfo.sockaddr); + if (!client) + goto out; + req->trans->drc_client = client; + } + + client = rpcsvc_drc_client_ref (req->trans->drc_client); + + if (client->op_count == 0) + goto out; + + reply = rb_find (client->rbtree, req); + + out: + if (client) + rpcsvc_drc_client_unref (req->svc->drc, client); + + return reply; +} + +/** + * rpcsvc_send_cached_reply - send the cached reply for the incoming request + * + * @param req - incoming request (which is a duplicate in this case) + * @param reply - the cached reply for req + * @return 0 on successful reply submission, -1 or other non-zero value otherwise + */ +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply) +{ + int ret = 0; + + GF_ASSERT (req); + GF_ASSERT (reply); + + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "sending cached reply: xid: %d, " + "client: %s", req->xid, req->trans->peerinfo.identifier); + + rpcsvc_drc_client_ref (reply->client); + ret = rpcsvc_transport_submit (req->trans, + reply->msg.rpchdr, reply->msg.rpchdrcount, + reply->msg.proghdr, reply->msg.proghdrcount, + reply->msg.progpayload, reply->msg.progpayloadcount, + reply->msg.iobref, req->trans_private); + rpcsvc_drc_client_unref (req->svc->drc, reply->client); + + return ret; +} + +/** + * rpcsvc_cache_reply - cache the reply for the processed request 'req' + * + * @param req - processed request + * @param iobref - iobref structure of the reply + * @param rpchdr - rpc header of the reply + * @param rpchdrcount - size of rpchdr + * @param proghdr - program header of the reply + * @param proghdrcount - size of proghdr + * @param payload - payload of the reply if any + * @param payloadcount - size of payload + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, + struct iovec *rpchdr, int rpchdrcount, + struct iovec *proghdr, int proghdrcount, + struct iovec *payload, int payloadcount) +{ + int ret = -1; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (req); + GF_ASSERT (req->reply); + + reply = req->reply; + + reply->state = DRC_OP_CACHED; + + reply->msg.iobref = iobref_ref (iobref); + + reply->msg.rpchdrcount = rpchdrcount; + reply->msg.rpchdr = iov_dup (rpchdr, rpchdrcount); + + reply->msg.proghdrcount = proghdrcount; + reply->msg.proghdr = iov_dup (proghdr, proghdrcount); + + reply->msg.progpayloadcount = payloadcount; + if (payloadcount) + reply->msg.progpayload = iov_dup (payload, payloadcount); + + // rpcsvc_drc_client_unref (req->svc->drc, req->trans->drc_client); + // rpcsvc_drc_op_unref (req->svc->drc, reply); + ret = 0; + + return ret; +} + +/** + * rpcsvc_vacate_drc_entries - free up some percentage of drc cache + * based on the lru factor + * + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_vacate_drc_entries (rpcsvc_drc_globals_t *drc) +{ + uint32_t i = 0; + uint32_t n = 0; + drc_cached_op_t *reply = NULL; + drc_cached_op_t *tmp = NULL; + drc_client_t *client = NULL; + + GF_ASSERT (drc); + + n = drc->global_cache_size / drc->lru_factor; + + list_for_each_entry_safe_reverse (reply, tmp, &drc->cache_head, global_list) { + /* Don't delete ops that are in transit */ + if (reply->state == DRC_OP_IN_TRANSIT) + continue; + + client = reply->client; + + (void *)rb_delete (client->rbtree, reply); + + rpcsvc_drc_op_destroy (drc, reply); + rpcsvc_drc_client_unref (drc, client); + i++; + if (i >= n) + break; + } +} + +/** + * rpcsvc_add_op_to_cache - insert the cached op into the client rbtree and drc list + * + * @param drc - the main drc structure + * @param reply - the op to be inserted + * @return 0 on success, -1 on failure + */ +static int +rpcsvc_add_op_to_cache (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ + drc_client_t *client = NULL; + drc_cached_op_t **tmp_reply = NULL; + + GF_ASSERT (drc); + GF_ASSERT (reply); + + client = reply->client; + + /* cache is full, free up some space */ + if (drc->op_count >= drc->global_cache_size) + rpcsvc_vacate_drc_entries (drc); + + tmp_reply = (drc_cached_op_t **)rb_probe (client->rbtree, reply); + if (*tmp_reply != reply) { + /* should never happen */ + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "DRC failed to detect duplicates"); + return -1; + } else if (*tmp_reply == NULL) { + /* mem alloc failed */ + return -1; + } + + client->op_count++; + list_add (&reply->global_list, &drc->cache_head); + drc->op_count++; + + return 0; +} + +/** + * rpcsvc_cache_request - cache the in-transition incoming request + * + * @param req - incoming request + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_request (rpcsvc_request_t *req) +{ + int ret = -1; + drc_client_t *client = NULL; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (req); + + drc = req->svc->drc; + + client = req->trans->drc_client; + if (!client) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc client is NULL"); + goto out; + } + + reply = mem_get (drc->mempool); + if (!reply) + goto out; + + reply->client = rpcsvc_drc_client_ref (client); + reply->xid = req->xid; + reply->prognum = req->prognum; + reply->progversion = req->progver; + reply->procnum = req->procnum; + reply->state = DRC_OP_IN_TRANSIT; + req->reply = reply; + + ret = rpcsvc_add_op_to_cache (drc, reply); + if (ret) { + req->reply = NULL; + rpcsvc_drc_op_destroy (drc, reply); + rpcsvc_drc_client_unref (drc, client); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Failed to add op to drc cache"); + } + + out: + return ret; +} + +/** + * + * rpcsvc_drc_priv - function which dumps the drc state + * + * @param drc - the main drc structure + * @return 0 on success, -1 on failure + */ +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc) +{ + int i = 0; + char key[GF_DUMP_MAX_BUF_LEN] = {0}; + drc_client_t *client = NULL; + char ip[INET6_ADDRSTRLEN] = {0}; + + if (!drc || drc->status == DRC_UNINITIATED) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is " + "uninitialized, not dumping its state"); + return 0; + } + + gf_proc_dump_add_section("rpc.drc"); + + if (TRY_LOCK (&drc->lock)) + return -1; + + gf_proc_dump_build_key (key, "drc", "type"); + gf_proc_dump_write (key, "%d", drc->type); + + gf_proc_dump_build_key (key, "drc", "client_count"); + gf_proc_dump_write (key, "%d", drc->client_count); + + gf_proc_dump_build_key (key, "drc", "current_cache_size"); + gf_proc_dump_write (key, "%d", drc->op_count); + + gf_proc_dump_build_key (key, "drc", "max_cache_size"); + gf_proc_dump_write (key, "%d", drc->global_cache_size); + + gf_proc_dump_build_key (key, "drc", "lru_factor"); + gf_proc_dump_write (key, "%d", drc->lru_factor); + + gf_proc_dump_build_key (key, "drc", "duplicate_request_count"); + gf_proc_dump_write (key, "%d", drc->cache_hits); + + gf_proc_dump_build_key (key, "drc", "in_transit_duplicate_requests"); + gf_proc_dump_write (key, "%d", drc->intransit_hits); + + list_for_each_entry (client, &drc->clients_head, client_list) { + gf_proc_dump_build_key (key, "client", "%d.ip-address", i); + memset (ip, 0, INET6_ADDRSTRLEN); + switch (client->sock_union.storage.ss_family) { + case AF_INET: + gf_proc_dump_write (key, "%s", inet_ntop (AF_INET, + &client->sock_union.sin.sin_addr.s_addr, + ip, INET_ADDRSTRLEN)); + break; + case AF_INET6: + gf_proc_dump_write (key, "%s", inet_ntop (AF_INET6, + &client->sock_union.sin6.sin6_addr, + ip, INET6_ADDRSTRLEN)); + break; + default: + gf_proc_dump_write (key, "%s", "N/A"); + } + + gf_proc_dump_build_key (key, "client", "%d.ref_count", i); + gf_proc_dump_write (key, "%d", client->ref); + gf_proc_dump_build_key (key, "client", "%d.op_count", i); + gf_proc_dump_write (key, "%d", client->op_count); + i++; + } + + UNLOCK (&drc->lock); + return 0; +} + +/** + * rpcsvc_drc_notify - function which is notified of RPC transport events + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param xl - pointer to the xlator + * @param event - the event which triggered this notify + * @param data - the transport structure + * @return 0 on success, -1 on failure + */ +int +rpcsvc_drc_notify (rpcsvc_t *svc, void *xl, + rpcsvc_event_t event, void *data) +{ + int ret = -1; + rpc_transport_t *trans = NULL; + drc_client_t *client = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (svc); + GF_ASSERT (svc->drc); + GF_ASSERT (data); + + drc = svc->drc; + + if (drc->status == DRC_UNINITIATED || + drc->type == DRC_TYPE_NONE) + return 0; + + LOCK (&drc->lock); + + trans = (rpc_transport_t *)data; + client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr); + if (!client) + goto out; + + switch (event) { + case RPCSVC_EVENT_ACCEPT: + trans->drc_client = rpcsvc_drc_client_ref (client); + ret = 0; + break; + + case RPCSVC_EVENT_DISCONNECT: + ret = 0; + if (list_empty (&drc->clients_head)) + break; + /* should be the last unref */ + rpcsvc_drc_client_unref (drc, client); + trans->drc_client = NULL; + break; + + default: + break; + } + + out: + UNLOCK (&drc->lock); + return ret; +} + +/** + * rpcsvc_drc_init - Initialize the duplicate request cache service + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param options - the options dictionary which configures drc + * @return 0 on success, non-zero integer on failure + */ +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options) +{ + int ret = 0; + uint32_t drc_type = 0; + uint32_t drc_size = 0; + uint32_t drc_factor = 0; + rpcsvc_drc_globals_t *drc = NULL; + static gf_boolean_t drc_inited = _gf_false; + + GF_ASSERT (svc); + GF_ASSERT (options); + + /* Already inited */ + if (drc_inited) + return 0; + + if (!svc->drc) { + drc = GF_CALLOC (1, sizeof (rpcsvc_drc_globals_t), + gf_common_mt_drc_globals_t); + if (!drc) + return -1; + + svc->drc = drc; + LOCK_INIT (&drc->lock); + } else { + drc = svc->drc; + } + + LOCK (&drc->lock); + if (drc->type != DRC_TYPE_NONE) { + ret = 0; + goto out; + } + + /* Toggle DRC on/off, when more drc types(persistent/cluster) + are added, we shouldn't treat this as boolean */ + ret = dict_get_str_boolean (options, "nfs.drc", _gf_true); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "drc user options need second look"); + ret = _gf_true; + } + drc->enable_drc = ret; + + if (ret == _gf_false) { + /* drc off */ + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is off"); + ret = 0; + goto out; + } + + /* Specify type of DRC to be used */ + ret = dict_get_uint32 (options, "nfs.drc-type", &drc_type); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc type not set." + " Continuing with default"); + drc_type = DRC_DEFAULT_TYPE; + } + + drc->type = drc_type; + + /* Set the global cache size (no. of ops to cache) */ + ret = dict_get_uint32 (options, "nfs.drc-size", &drc_size); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc size not set." + " Continuing with default size"); + drc_size = DRC_DEFAULT_CACHE_SIZE; + } + + drc->global_cache_size = drc_size; + + /* Mempool for cached ops */ + drc->mempool = mem_pool_new (drc_cached_op_t, drc->global_cache_size); + if (!drc->mempool) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get mempool for" + " DRC, drc-size: %d", drc->global_cache_size); + ret = -1; + goto out; + } + + /* What percent of cache to be evicted whenever it fills up */ + ret = dict_get_uint32 (options, "nfs.drc-lru-factor", &drc_factor); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc lru factor not set." + " Continuing with policy default"); + drc_factor = DRC_DEFAULT_LRU_FACTOR; + } + + drc->lru_factor = (drc_lru_factor_t) drc_factor; + + INIT_LIST_HEAD (&drc->clients_head); + INIT_LIST_HEAD (&drc->cache_head); + + ret = rpcsvc_register_notify (svc, rpcsvc_drc_notify, THIS); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "registration of drc_notify function failed"); + goto out; + } + + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc init successful"); + drc->status = DRC_INITIATED; + drc_inited = _gf_true; + + out: + UNLOCK (&drc->lock); + if (ret == -1) { + if (drc->mempool) { + mem_pool_destroy (drc->mempool); + drc->mempool = NULL; + } + GF_FREE (drc); + svc->drc = NULL; + } + return ret; +} + +int +rpcsvc_drc_reconfigure (rpcsvc_t *svc, dict_t *options) +{ + int ret = -1; + gf_boolean_t enable_drc = _gf_false; + rpcsvc_drc_globals_t *drc = NULL; + uint32_t drc_size = 0; + + if ((!svc) || (!options)) + return (-1); + + drc = svc->drc; + /* reconfig for drc-size */ + if (dict_get_uint32 (options, "nfs.drc-size", &drc_size)) + drc_size = DRC_DEFAULT_CACHE_SIZE; + + if (drc->global_cache_size != drc_size) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "nfs.drc-size size can not " + "be reconfigured without NFS server restart."); + return (-1); + } + + /* reconfig for nfs.drc */ + ret = dict_get_str_boolean (options, "nfs.drc", _gf_true); + if (ret < 0) { + ret = _gf_true; + } + enable_drc = ret; + + if (drc->enable_drc == enable_drc) + return 0; + + drc->enable_drc = enable_drc; + if (enable_drc) { + if (drc == NULL) + return rpcsvc_drc_init(svc, options); + } else { + if (drc == NULL) + return (0); + + LOCK (&drc->lock); + (void) rpcsvc_unregister_notify (svc, rpcsvc_drc_notify, THIS); + if (drc->mempool) { + mem_pool_destroy (drc->mempool); + drc->mempool = NULL; + } + UNLOCK (&drc->lock); + GF_FREE (drc); + svc->drc = NULL; + } + + return (0); +} diff --git a/rpc/rpc-lib/src/rpc-drc.h b/rpc/rpc-lib/src/rpc-drc.h new file mode 100644 index 000000000..7dfaef978 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.h @@ -0,0 +1,104 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef RPC_DRC_H +#define RPC_DRC_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc-common.h" +#include "rpcsvc.h" +#include "locking.h" +#include "dict.h" +#include "rb.h" + +/* per-client cache structure */ +struct drc_client { + uint32_t ref; + union gf_sock_union sock_union; + /* pointers to the cache */ + struct rb_table *rbtree; + /* no. of ops currently cached */ + uint32_t op_count; + struct list_head client_list; +}; + +struct drc_cached_op { + drc_op_state_t state; + uint32_t xid; + int prognum; + int progversion; + int procnum; + rpc_transport_msg_t msg; + drc_client_t *client; + struct list_head client_list; + struct list_head global_list; + int32_t ref; +}; + +/* global drc definitions */ +enum drc_status { + DRC_UNINITIATED, + DRC_INITIATED +}; +typedef enum drc_status drc_status_t; + +struct drc_globals { + /* allocator must be the first member since + * it is used so in gf_libavl_allocator + */ + struct libavl_allocator allocator; + drc_type_t type; + /* configurable size parameter */ + uint32_t global_cache_size; + drc_lru_factor_t lru_factor; + gf_lock_t lock; + drc_status_t status; + uint32_t op_count; + uint64_t cache_hits; + uint64_t intransit_hits; + struct mem_pool *mempool; + struct list_head cache_head; + uint32_t client_count; + struct list_head clients_head; + gf_boolean_t enable_drc; +}; + +int +rpcsvc_need_drc (rpcsvc_request_t *req); + +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req); + +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply); + +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, + struct iovec *rpchdr, int rpchdrcount, + struct iovec *proghdr, int proghdrcount, + struct iovec *payload, int payloadcount); + +int +rpcsvc_cache_request (rpcsvc_request_t *req); + +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc); + +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options); + +int +rpcsvc_drc_reconfigure (rpcsvc_t *svc, dict_t *options); + +#endif /* RPC_DRC_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 2d64a1a1c..c24d41084 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -69,6 +69,19 @@ out: return ret; } +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + int ret = 0; + + if (!this->ops->throttle) + return -ENOSYS; + + ret = this->ops->throttle (this, onoff); + + return ret; +} + int32_t rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen) @@ -282,7 +295,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) } *VOID(&(trans->reconfigure)) = dlsym (handle, "reconfigure"); - if (trans->fini == NULL) { + if (trans->reconfigure == NULL) { gf_log ("rpc-transport", GF_LOG_DEBUG, "dlsym (gf_rpc_transport_reconfigure) on %s", dlerror()); } @@ -477,6 +490,8 @@ rpc_transport_unref (rpc_transport_t *this) if (this->mydata) this->notify (this, this->mydata, RPC_TRANSPORT_CLEANUP, NULL); + this->mydata = NULL; + this->notify = NULL; rpc_transport_destroy (this); } @@ -520,18 +535,6 @@ out: } -inline int -rpc_transport_unregister_notify (rpc_transport_t *trans) -{ - GF_VALIDATE_OR_GOTO ("rpc-transport", trans, out); - - trans->notify = NULL; - trans->mydata = NULL; - -out: - return 0; -} - //give negative values to skip setting that value //this function asserts if both the values are negative. @@ -559,6 +562,63 @@ out: } int +rpc_transport_unix_options_build (dict_t **options, char *filepath, + int frame_timeout) +{ + dict_t *dict = NULL; + char *fpath = NULL; + int ret = -1; + + GF_ASSERT (filepath); + GF_ASSERT (options); + + dict = dict_new (); + if (!dict) + goto out; + + fpath = gf_strdup (filepath); + if (!fpath) { + ret = -1; + goto out; + } + + ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.address-family", "unix"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.socket.nodelay", "off"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport-type", "socket"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.socket.keepalive", "off"); + if (ret) + goto out; + + if (frame_timeout > 0) { + ret = dict_set_int32 (dict, "frame-timeout", frame_timeout); + if (ret) + goto out; + } + + *options = dict; +out: + if (ret) { + GF_FREE (fpath); + if (dict) + dict_unref (dict); + } + return ret; +} + +int rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port) { diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 272de9d7d..2db9072ae 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -186,16 +186,19 @@ struct rpc_transport { */ void *private; - void *xl_private; + struct _client_t *xl_private; void *xl; /* Used for THIS */ void *mydata; pthread_mutex_t lock; int32_t refcount; + int32_t outstanding_rpc_count; + glusterfs_ctx_t *ctx; dict_t *options; char *name; void *dnscache; + void *drc_client; data_t *buf; int32_t (*init) (rpc_transport_t *this); void (*fini) (rpc_transport_t *this); @@ -210,7 +213,7 @@ struct rpc_transport { struct list_head list; int bind_insecure; - void *dl_handle; /* handle of dlopen() */ + void *dl_handle; /* handle of dlopen() */ }; struct rpc_transport_ops { @@ -234,6 +237,7 @@ struct rpc_transport_ops { int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t sasize); + int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff); }; @@ -273,9 +277,6 @@ int rpc_transport_register_notify (rpc_transport_t *trans, rpc_transport_notify_t, void *mydata); -int -rpc_transport_unregister_notify (rpc_transport_t *trans); - int32_t rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen); @@ -290,6 +291,9 @@ int32_t rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen); +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff); + rpc_transport_pollin_t * rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, int count, struct iobuf *hdr_iobuf, @@ -302,5 +306,9 @@ rpc_transport_keepalive_options_set (dict_t *options, int32_t interval, int32_t time); int +rpc_transport_unix_options_build (dict_t **options, char *filepath, + int frame_timeout); + +int rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port); #endif /* __RPC_TRANSPORT_H__ */ diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c index 108279456..4cb86a758 100644 --- a/rpc/rpc-lib/src/rpcsvc-auth.c +++ b/rpc/rpc-lib/src/rpcsvc-auth.c @@ -178,6 +178,29 @@ err: } int +rpcsvc_set_addr_namelookup (rpcsvc_t *svc, dict_t *options) +{ + int ret; + static char *addrlookup_key = "rpc-auth.addr.namelookup"; + + if (!svc || !options) + return (-1); + + /* By default it's disabled */ + ret = dict_get_str_boolean (options, addrlookup_key, _gf_false); + if (ret < 0) { + svc->addr_namelookup = _gf_false; + } else { + svc->addr_namelookup = ret; + } + + if (svc->addr_namelookup) + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Addr-Name lookup enabled"); + + return (0); +} + +int rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options) { int ret = -1; @@ -233,6 +256,7 @@ rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options) (void) rpcsvc_set_allow_insecure (svc, options); (void) rpcsvc_set_root_squash (svc, options); + (void) rpcsvc_set_addr_namelookup (svc, options); ret = rpcsvc_auth_add_initers (svc); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add initers"); @@ -249,6 +273,25 @@ out: return ret; } +int +rpcsvc_auth_reconf (rpcsvc_t *svc, dict_t *options) +{ + int ret = 0; + + if ((!svc) || (!options)) + return (-1); + + ret = rpcsvc_set_allow_insecure (svc, options); + if (ret) + return (-1); + + ret = rpcsvc_set_root_squash (svc, options); + if (ret) + return (-1); + + return rpcsvc_set_addr_namelookup (svc, options); +} + rpcsvc_auth_t * __rpcsvc_auth_get_handler (rpcsvc_request_t *req) @@ -327,6 +370,9 @@ rpcsvc_auth_request_init (rpcsvc_request_t *req) if (!auth->authops->request_init) ret = auth->authops->request_init (req, auth->authprivate); + req->auxgids = req->auxgidsmall; /* reset to auxgidlarge during + unsersialize if necessary */ + req->auxgidlarge = NULL; err: return ret; } diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 2c6f07488..aed55e039 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -30,6 +30,8 @@ struct rpcsvc_state; typedef int (*rpcsvc_notify_t) (struct rpcsvc_state *, void *mydata, rpcsvc_event_t, void *data); +struct drc_globals; +typedef struct drc_globals rpcsvc_drc_globals_t; /* Contains global state required for all the RPC services. */ @@ -50,25 +52,75 @@ typedef struct rpcsvc_state { dict_t *options; /* Allow insecure ports. */ - int allow_insecure; + gf_boolean_t allow_insecure; gf_boolean_t register_portmap; gf_boolean_t root_squash; glusterfs_ctx_t *ctx; /* list of connections which will listen for incoming connections */ - struct list_head listeners; + struct list_head listeners; /* list of programs registered with rpcsvc */ - struct list_head programs; + struct list_head programs; /* list of notification callbacks */ - struct list_head notify; - int notify_count; + struct list_head notify; + int notify_count; void *mydata; /* This is xlator */ - rpcsvc_notify_t notifyfn; + rpcsvc_notify_t notifyfn; struct mem_pool *rxpool; + rpcsvc_drc_globals_t *drc; + + /* per-client limit of outstanding rpc requests */ + int outstanding_rpc_limit; + gf_boolean_t addr_namelookup; } rpcsvc_t; +/* DRC START */ +enum drc_op_type { + DRC_NA = 0, + DRC_IDEMPOTENT = 1, + DRC_NON_IDEMPOTENT = 2 +}; +typedef enum drc_op_type drc_op_type_t; + +enum drc_type { + DRC_TYPE_NONE = 0, + DRC_TYPE_IN_MEMORY = 1 +}; +typedef enum drc_type drc_type_t; + +enum drc_lru_factor { + DRC_LRU_5_PC = 20, + DRC_LRU_10_PC = 10, + DRC_LRU_25_PC = 4, + DRC_LRU_50_PC = 2 +}; +typedef enum drc_lru_factor drc_lru_factor_t; + +enum drc_xid_state { + DRC_XID_MONOTONOUS = 0, + DRC_XID_WRAPPED = 1 +}; +typedef enum drc_xid_state drc_xid_state_t; + +enum drc_op_state { + DRC_OP_IN_TRANSIT = 0, + DRC_OP_CACHED = 1 +}; +typedef enum drc_op_state drc_op_state_t; + +enum drc_policy { + DRC_LRU = 0 +}; +typedef enum drc_policy drc_policy_t; + +/* Default policies for DRC */ +#define DRC_DEFAULT_TYPE DRC_TYPE_IN_MEMORY +#define DRC_DEFAULT_CACHE_SIZE 0x20000 +#define DRC_DEFAULT_LRU_FACTOR DRC_LRU_25_PC + +/* DRC END */ #endif /* #ifndef _RPCSVC_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index db8ce2f3f..037c157f2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@ #include "xdr-generic.h" #include "rpc-common-xdr.h" #include "syncop.h" +#include "rpc-drc.h" #include <errno.h> #include <pthread.h> @@ -41,8 +42,7 @@ #include <stdio.h> #include "xdr-rpcclnt.h" - -#define ACL_PROGRAM 100227 +#include "glusterfs-acl.h" struct rpcsvc_program gluster_dump_prog; @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, return NULL; } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ + int ret = 0; + int old_count = 0; + int new_count = 0; + int limit = 0; + + pthread_mutex_lock (&trans->lock); + { + limit = svc->outstanding_rpc_limit; + if (!limit) + goto unlock; + + old_count = trans->outstanding_rpc_count; + trans->outstanding_rpc_count += delta; + new_count = trans->outstanding_rpc_count; + + if (old_count <= limit && new_count > limit) + ret = rpc_transport_throttle (trans, _gf_true); + + if (old_count > limit && new_count <= limit) + ret = rpc_transport_throttle (trans, _gf_false); + } +unlock: + pthread_mutex_unlock (&trans->lock); + + return ret; +} + + /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. @@ -279,8 +310,17 @@ rpcsvc_request_destroy (rpcsvc_request_t *req) if (req->hdr_iobuf) iobuf_unref (req->hdr_iobuf); + /* This marks the "end" of an RPC request. Reply is + completely written to the socket and is on the way + to the client. It is time to decrement the + outstanding request counter by 1. + */ + rpcsvc_request_outstanding (req->svc, req->trans, -1); + rpc_transport_unref (req->trans); + GF_FREE (req->auxgidlarge); + mem_put (req); out: @@ -363,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, goto err; } + /* We just received a new request from the wire. Account for + it in the outsanding request counter to make sure we don't + ingest too many concurrent requests from the same client. + */ + ret = rpcsvc_request_outstanding (svc, trans, +1); + msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; @@ -422,6 +468,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, * since we are not handling authentication failures for now. */ req->rpc_status = MSG_ACCEPTED; + req->reply = NULL; ret = 0; err: if (ret == -1) { @@ -461,13 +508,15 @@ int rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { - rpcsvc_actor_t *actor = NULL; - rpcsvc_actor actor_fn = NULL; - rpcsvc_request_t *req = NULL; - int ret = -1; - uint16_t port = 0; - gf_boolean_t is_unix = _gf_false; - gf_boolean_t unprivileged = _gf_false; + rpcsvc_actor_t *actor = NULL; + rpcsvc_actor actor_fn = NULL; + rpcsvc_request_t *req = NULL; + int ret = -1; + uint16_t port = 0; + gf_boolean_t is_unix = _gf_false; + gf_boolean_t unprivileged = _gf_false; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; if (!trans || !svc) return -1; @@ -503,7 +552,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req = rpcsvc_request_create (svc, trans, msg); if (!req) - goto err; + goto out; if (!rpcsvc_request_accepted (req)) goto err_reply; @@ -521,6 +570,39 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, return -1; } + /* DRC */ + if (rpcsvc_need_drc (req)) { + drc = req->svc->drc; + + LOCK (&drc->lock); + reply = rpcsvc_drc_lookup (req); + + /* retransmission of completed request, send cached reply */ + if (reply && reply->state == DRC_OP_CACHED) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" + " XID: 0x%x", req->xid); + ret = rpcsvc_send_cached_reply (req, reply); + drc->cache_hits++; + UNLOCK (&drc->lock); + goto out; + + } /* retransmitted request, original op in transit, drop it */ + else if (reply && reply->state == DRC_OP_IN_TRANSIT) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," + " discarding. XID: 0x%x", req->xid); + ret = 0; + drc->intransit_hits++; + rpcsvc_request_destroy (req); + UNLOCK (&drc->lock); + goto out; + + } /* fresh request, cache it as in-transit and proceed */ + else { + ret = rpcsvc_cache_request (req); + } + UNLOCK (&drc->lock); + } + if (req->rpc_err == SUCCESS) { /* Before going to xlator code, set the THIS properly */ THIS = svc->mydata; @@ -547,7 +629,6 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req); } else { ret = actor_fn (req); - req->hdr_iobuf = NULL; } } @@ -558,7 +639,7 @@ err_reply: * has now been queued. */ ret = 0; -err: +out: return ret; } @@ -905,21 +986,22 @@ out: return ret; } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { + if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) { goto out; } - reply.msg.rpchdr = hdrvec; - reply.msg.rpchdrcount = hdrcount; + reply.msg.rpchdr = rpchdr; + reply.msg.rpchdrcount = rpchdrcount; reply.msg.proghdr = proghdr; reply.msg.proghdrcount = proghdrcount; reply.msg.progpayload = progpayload; @@ -1065,6 +1147,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, size_t msglen = 0; size_t hdrlen = 0; char new_iobref = 0; + rpcsvc_drc_globals_t *drc = NULL; if ((!req) || (!req->trans)) return -1; @@ -1099,6 +1182,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); + /* cache the request in the duplicate request cache for appropriate ops */ + if (req->reply) { + drc = req->svc->drc; + + LOCK (&drc->lock); + ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, + proghdr, hdrcount, + payload, payloadcount); + UNLOCK (&drc->lock); + } + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount, iobref, req->trans_private); @@ -1155,12 +1249,13 @@ rpcsvc_error_reply (rpcsvc_request_t *req) inline int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) { - int ret = 0; + int ret = -1; /* FAIL */ if (!newprog) { goto out; } + /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */ if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, port))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" @@ -1168,16 +1263,16 @@ rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) goto out; } - ret = 0; + ret = 0; /* SUCCESS */ out: return ret; } -static inline int +inline int rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) { - int ret = 0; + int ret = -1; if (!prog) goto out; @@ -1795,12 +1890,92 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); - ret = 0; + ret = rpcsvc_set_outstanding_rpc_limit (svc, options); out: return ret; } int +rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options) +{ + xlator_t *xlator = NULL; + xlator_list_t *volentry = NULL; + char *srchkey = NULL; + char *keyval = NULL; + int ret = -1; + + if ((!svc) || (!svc->options) || (!options)) + return (-1); + + /* Fetch the xlator from svc */ + xlator = (xlator_t *) svc->mydata; + if (!xlator) + return (-1); + + /* Reconfigure the volume specific rpc-auth.addr allow part */ + volentry = xlator->children; + while (volentry) { + ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow", + volentry->xlator->name); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return (-1); + } + + /* If found the srchkey, delete old key/val pair + * and set the key with new value. + */ + if (!dict_get_str (options, srchkey, &keyval)) { + dict_del (svc->options, srchkey); + ret = dict_set_str (svc->options, srchkey, keyval); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "dict_set_str error"); + GF_FREE (srchkey); + return (-1); + } + } + + GF_FREE (srchkey); + volentry = volentry->next; + } + + /* Reconfigure the volume specific rpc-auth.addr reject part */ + volentry = xlator->children; + while (volentry) { + ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject", + volentry->xlator->name); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return (-1); + } + + /* If found the srchkey, delete old key/val pair + * and set the key with new value. + */ + if (!dict_get_str (options, srchkey, &keyval)) { + dict_del (svc->options, srchkey); + ret = dict_set_str (svc->options, srchkey, keyval); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "dict_set_str error"); + GF_FREE (srchkey); + return (-1); + } + } + + GF_FREE (srchkey); + volentry = volentry->next; + } + + ret = rpcsvc_init_options (svc, options); + if (ret) + return (-1); + + return rpcsvc_auth_reconf (svc, options); +} + +int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath) { dict_t *dict = NULL; @@ -1846,6 +2021,48 @@ out: return ret; } +/* + * Reconfigure() the rpc.outstanding-rpc-limit param. + */ +int +rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options) +{ + int ret = -1; /* FAILURE */ + int rpclim = 0; + static char *rpclimkey = "rpc.outstanding-rpc-limit"; + + if ((!svc) || (!options)) + return (-1); + + /* Reconfigure() the rpc.outstanding-rpc-limit param */ + ret = dict_get_int32 (options, rpclimkey, &rpclim); + if (ret < 0) { + /* Fall back to default for FAILURE */ + rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + } else { + /* SUCCESS: round off to multiple of 8. + * If the input value fails Boundary check, fall back to + * default i.e. RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT. + * NB: value 0 is special, means its unset i.e. unlimited. + */ + rpclim = ((rpclim + 8 - 1) >> 3) * 8; + if (rpclim < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) { + rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + } else if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) { + rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT; + } + } + + if (svc->outstanding_rpc_limit != rpclim) { + svc->outstanding_rpc_limit = rpclim; + gf_log (GF_RPCSVC, GF_LOG_INFO, + "Configured %s with value %d", + rpclimkey, rpclim); + } + + return (0); +} + /* The global RPC service initializer. */ rpcsvc_t * @@ -1906,6 +2123,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, "failed to register DUMP program"); goto free_svc; } + ret = 0; free_svc: if (ret == -1) { @@ -1976,7 +2194,7 @@ err: } -int +static int rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *ip, char *hostname) { @@ -2005,7 +2223,7 @@ out: return ret; } -int +static int rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *ip, char *hostname) { @@ -2057,7 +2275,7 @@ rpcsvc_combine_allow_reject_volume_check (int allow, int reject) } int -rpcsvc_auth_check (dict_t *options, char *volname, +rpcsvc_auth_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans) { int ret = RPCSVC_AUTH_REJECT; @@ -2066,8 +2284,17 @@ rpcsvc_auth_check (dict_t *options, char *volname, char *hostname = NULL; char *ip = NULL; char client_ip[RPCSVC_PEER_STRLEN] = {0}; + char *allow_str = NULL; + char *reject_str = NULL; + char *srchstr = NULL; + dict_t *options = NULL; - if (!options || !volname || !trans) + if (!svc || !volname || !trans) + return ret; + + /* Fetch the options from svc struct and validate */ + options = svc->options; + if (!options) return ret; ret = rpcsvc_transport_peername (trans, client_ip, RPCSVC_PEER_STRLEN); @@ -2077,12 +2304,51 @@ rpcsvc_auth_check (dict_t *options, char *volname, return RPCSVC_AUTH_REJECT; } - get_host_name (client_ip, &ip); + /* Accept if its the default case: Allow all, Reject none + * The default volfile always contains a 'allow *' rule + * for each volume. If allow rule is missing (which implies + * there is some bad volfile generating code doing this), we + * assume no one is allowed mounts, and thus, we reject mounts. + */ + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return RPCSVC_AUTH_REJECT; + } - /* addr-namelookup disabled by default */ - ret = dict_get_str_boolean (options, "rpc-auth.addr.namelookup", 0); - if (ret == _gf_true) - gf_get_hostname_from_ip (ip, &hostname); + ret = dict_get_str (options, srchstr, &allow_str); + GF_FREE (srchstr); + if (ret < 0) + return RPCSVC_AUTH_REJECT; + + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return RPCSVC_AUTH_REJECT; + } + + ret = dict_get_str (options, srchstr, &reject_str); + GF_FREE (srchstr); + if (reject_str == NULL && !strcmp ("*", allow_str)) + return RPCSVC_AUTH_ACCEPT; + + /* Non-default rule, authenticate */ + if (!get_host_name (client_ip, &ip)) + ip = client_ip; + + /* addr-namelookup check */ + if (svc->addr_namelookup == _gf_true) { + ret = gf_get_hostname_from_ip (ip, &hostname); + if (ret) { + if (hostname) + GF_FREE (hostname); + /* failed to get hostname, but hostname auth + * is enabled, so authentication will not be + * 100% correct. reject mounts + */ + return RPCSVC_AUTH_REJECT; + } + } accept = rpcsvc_transport_peer_check_allow (options, volname, ip, hostname); @@ -2090,6 +2356,8 @@ rpcsvc_auth_check (dict_t *options, char *volname, reject = rpcsvc_transport_peer_check_reject (options, volname, ip, hostname); + if (hostname) + GF_FREE (hostname); return rpcsvc_combine_allow_reject_volume_check (accept, reject); } @@ -2197,9 +2465,9 @@ out: rpcsvc_actor_t gluster_dump_actors[] = { - [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, - [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, - [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, + [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, + [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, + [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA}, }; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index afa7c9926..cbc1f4226 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -38,6 +38,10 @@ #define MAX_IOVEC 16 #endif +#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 +#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536 +#define RPCSVC_MIN_OUTSTANDING_RPC_LIMIT 0 /* No limit i.e. Unlimited */ + #define GF_RPCSVC "rpc-service" #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) @@ -140,6 +144,9 @@ typedef struct rpcsvc_auth_data { #define rpcsvc_auth_flavour(au) ((au).flavour) +typedef struct drc_client drc_client_t; +typedef struct drc_cached_op drc_cached_op_t; + /* The container for the RPC call handed up to an actor. * Dynamically allocated. Lives till the call reply is completely * transmitted. @@ -178,7 +185,9 @@ struct rpcsvc_request { /* Might want to move this to AUTH_UNIX specific state since this array * is not available for every authentication scheme. */ - gid_t auxgids[GF_MAX_AUX_GROUPS]; + gid_t *auxgids; + gid_t auxgidsmall[SMALL_GROUP_COUNT]; + gid_t *auxgidlarge; int auxgidcount; @@ -241,6 +250,9 @@ struct rpcsvc_request { /* we need to ref the 'iobuf' in case of 'synctasking' it */ struct iobuf *hdr_iobuf; + + /* pointer to cached reply for use in DRC */ + drc_cached_op_t *reply; }; #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) @@ -314,7 +326,6 @@ typedef void *(*rpcsvc_encode_reply) (void *msg); */ typedef void (*rpcsvc_deallocate_reply) (void *msg); - #define RPCSVC_NAME_MAX 32 /* The descriptor for each procedure/actor that runs * over the RPC service. @@ -336,6 +347,7 @@ typedef struct rpcsvc_actor_desc { /* Can actor be ran on behalf an unprivileged requestor? */ gf_boolean_t unprivileged; + drc_op_type_t op_type; } rpcsvc_actor_t; /* Describes a program and its version along with the function pointers @@ -429,6 +441,9 @@ extern int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port); extern int +rpcsvc_program_unregister_portmap (rpcsvc_program_t *newprog); + +extern int rpcsvc_register_portmap_enabled (rpcsvc_t *svc); /* Inits the global RPC service data structures. @@ -438,6 +453,9 @@ extern rpcsvc_t * rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, uint32_t poolcount); +extern int +rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options); + int rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); @@ -448,6 +466,13 @@ int rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv); + +int rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref); @@ -473,8 +498,7 @@ rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, struct sockaddr_storage *returnsa, socklen_t sasize); extern int -rpcsvc_auth_check (dict_t *options, char *volname, - rpc_transport_t *trans); +rpcsvc_auth_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans); extern int rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, @@ -535,6 +559,9 @@ extern int rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options); extern int +rpcsvc_auth_reconf (rpcsvc_t *svc, dict_t *options); + +extern int rpcsvc_auth_transport_init (rpc_transport_t *xprt); extern int @@ -558,18 +585,22 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, struct iovec *proghdr, int proghdrcount); +rpcsvc_actor_t * +rpcsvc_program_actor (rpcsvc_request_t *req); + int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath); int rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options); int +rpcsvc_set_addr_namelookup (rpcsvc_t *svc, dict_t *options); +int rpcsvc_set_root_squash (rpcsvc_t *svc, dict_t *options); int +rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options); +int rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen); -char * -rpcsvc_volume_allowed (dict_t *options, char *volname); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, uint32_t progver, uint32_t procnum); - #endif diff --git a/rpc/rpc-lib/src/xdr-rpc.c b/rpc/rpc-lib/src/xdr-rpc.c index ef52764c3..adb48a531 100644 --- a/rpc/rpc-lib/src/xdr-rpc.c +++ b/rpc/rpc-lib/src/xdr-rpc.c @@ -34,7 +34,7 @@ xdr_to_rpc_call (char *msgbuf, size_t len, struct rpc_msg *call, struct iovec *payload, char *credbytes, char *verfbytes) { XDR xdr; - char opaquebytes[MAX_AUTH_BYTES]; + char opaquebytes[GF_MAX_AUTH_BYTES]; struct opaque_auth *oa = NULL; int ret = -1; |
