diff options
Diffstat (limited to 'rpc/rpc-lib/src')
| -rw-r--r-- | rpc/rpc-lib/src/Makefile.am | 8 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-glusterfs.c | 32 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-unix.c | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/protocol-common.h | 36 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 145 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 7 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-drc.c | 872 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-drc.h | 104 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 104 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 18 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-auth.c | 125 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 64 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 822 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 49 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-rpc.c | 2 |
15 files changed, 1770 insertions, 619 deletions
diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am index ca62a27f9..f19c3c8a4 100644 --- a/rpc/rpc-lib/src/Makefile.am +++ b/rpc/rpc-lib/src/Makefile.am @@ -1,16 +1,18 @@ lib_LTLIBRARIES = libgfrpc.la libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \ - rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c + rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c \ + rpc-drc.c libgfrpc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la noinst_HEADERS = rpcsvc.h rpc-transport.h xdr-common.h xdr-rpc.h xdr-rpcclnt.h \ - rpc-clnt.h rpcsvc-common.h protocol-common.h + rpc-clnt.h rpcsvc-common.h protocol-common.h rpc-drc.h AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ -I$(top_srcdir)/rpc/xdr/src \ - -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" + -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \ + -I$(top_srcdir)/contrib/rbtree AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c index 9c6f8385b..db488434c 100644 --- a/rpc/rpc-lib/src/auth-glusterfs.c +++ b/rpc/rpc-lib/src/auth-glusterfs.c @@ -96,6 +96,22 @@ int auth_glusterfs_authenticate (rpcsvc_request_t *req, void *priv) goto err; } + if (req->auxgidcount > SMALL_GROUP_COUNT) { + req->auxgidlarge = GF_CALLOC(req->auxgidcount, + sizeof(req->auxgids[0]), + gf_common_mt_auxgids); + req->auxgids = req->auxgidlarge; + } else { + req->auxgids = req->auxgidsmall; + } + + if (!req->auxgids) { + gf_log ("auth-glusterfs", GF_LOG_WARNING, + "cannot allocate gid list"); + ret = RPCSVC_AUTH_REJECT; + goto err; + } + for (gidcount = 0; gidcount < au.ngrps; ++gidcount) req->auxgids[gidcount] = au.groups[gidcount]; @@ -203,6 +219,22 @@ int auth_glusterfs_v2_authenticate (rpcsvc_request_t *req, void *priv) goto err; } + if (req->auxgidcount > SMALL_GROUP_COUNT) { + req->auxgidlarge = GF_CALLOC(req->auxgidcount, + sizeof(req->auxgids[0]), + gf_common_mt_auxgids); + req->auxgids = req->auxgidlarge; + } else { + req->auxgids = req->auxgidsmall; + } + + if (!req->auxgids) { + gf_log ("auth-glusterfs-v2", GF_LOG_WARNING, + "cannot allocate gid list"); + ret = RPCSVC_AUTH_REJECT; + goto err; + } + for (i = 0; i < req->auxgidcount; ++i) req->auxgids[i] = au.groups.groups_val[i]; diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c index 6251d60a8..fa5f0576e 100644 --- a/rpc/rpc-lib/src/auth-unix.c +++ b/rpc/rpc-lib/src/auth-unix.c @@ -42,6 +42,7 @@ int auth_unix_authenticate (rpcsvc_request_t *req, void *priv) if (!req) return ret; + req->auxgids = req->auxgidsmall; ret = xdr_to_auth_unix_cred (req->cred.authdata, req->cred.datalen, &aup, machname, req->auxgids); if (ret == -1) { diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 97017e5fe..8bef906cc 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -56,6 +56,9 @@ enum gf_fop_procnum { GFS3_OP_RELEASE, GFS3_OP_RELEASEDIR, GFS3_OP_FREMOVEXATTR, + GFS3_OP_FALLOCATE, + GFS3_OP_DISCARD, + GFS3_OP_ZEROFILL, GFS3_OP_MAXVALUE, } ; @@ -154,7 +157,10 @@ enum gluster_cli_procnum { GLUSTER_CLI_LIST_VOLUME, GLUSTER_CLI_CLRLOCKS_VOLUME, GLUSTER_CLI_UUID_RESET, - GLUSTER_CLI_BD_OP, + GLUSTER_CLI_UUID_GET, + GLUSTER_CLI_COPY_FILE, + GLUSTER_CLI_SYS_EXEC, + GLUSTER_CLI_SNAP, GLUSTER_CLI_MAXVALUE, }; @@ -186,7 +192,7 @@ enum glusterd_brick_procnum { GLUSTERD_BRICK_XLATOR_DEFRAG, GLUSTERD_NODE_PROFILE, GLUSTERD_NODE_STATUS, - GLUSTERD_BRICK_BD_OP, + GLUSTERD_VOLUME_BARRIER_OP, GLUSTERD_BRICK_MAXVALUE, }; @@ -204,16 +210,22 @@ typedef enum { GF_AFR_OP_INDEX_SUMMARY, GF_AFR_OP_HEALED_FILES, GF_AFR_OP_HEAL_FAILED_FILES, - GF_AFR_OP_SPLIT_BRAIN_FILES + GF_AFR_OP_SPLIT_BRAIN_FILES, + GF_AFR_OP_STATISTICS, + GF_AFR_OP_STATISTICS_HEAL_COUNT, + GF_AFR_OP_STATISTICS_HEAL_COUNT_PER_REPLICA, } gf_xl_afr_op_t ; -typedef enum { - GF_BD_OP_INVALID, - GF_BD_OP_NEW_BD, - GF_BD_OP_DELETE_BD, - GF_BD_OP_CLONE_BD, - GF_BD_OP_SNAPSHOT_BD, -} gf_xl_bd_op_t ; +enum glusterd_mgmt_v3_procnum { + GLUSTERD_MGMT_V3_NULL, /* 0 */ + GLUSTERD_MGMT_V3_LOCK, + GLUSTERD_MGMT_V3_PRE_VALIDATE, + GLUSTERD_MGMT_V3_BRICK_OP, + GLUSTERD_MGMT_V3_COMMIT, + GLUSTERD_MGMT_V3_POST_VALIDATE, + GLUSTERD_MGMT_V3_UNLOCK, + GLUSTERD_MGMT_V3_MAXVALUE, +}; #define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */ #define GLUSTER_HNDSK_VERSION 2 /* 0.0.2 */ @@ -241,6 +253,10 @@ typedef enum { #define GD_BRICK_PROGRAM 4867634 /*Completely random*/ #define GD_BRICK_VERSION 2 +/* Third version */ +#define GD_MGMT_V3_PROGRAM 2210013 /* Completely random */ +#define GD_MGMT_V3_VERSION 3 + /* OP-VERSION handshake */ #define GD_MGMT_HNDSK_PROGRAM 1239873 /* Completely random */ #define GD_MGMT_HNDSK_VERSION 1 diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index e6c681df8..ac98a5c91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -145,7 +145,7 @@ call_bail (void *data) struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; char frame_sent[256] = {0,}; - struct timeval timeout = {0,}; + struct timespec timeout = {0,}; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -163,7 +163,7 @@ call_bail (void *data) call-once timer */ if (conn->timer) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, @@ -173,7 +173,8 @@ call_bail (void *data) if (conn->timer == NULL) { gf_log (conn->trans->name, GF_LOG_WARNING, - "Cannot create bailout timer"); + "Cannot create bailout timer for %s", + conn->trans->peerinfo.identifier); } } @@ -197,14 +198,14 @@ call_bail (void *data) ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log (conn->trans->name, GF_LOG_ERROR, - "bailing out frame type(%s) op(%s(%d)) xid = 0x%ux " - "sent = %s. timeout = %d", + "bailing out frame type(%s) op(%s(%d)) xid = 0x%x " + "sent = %s. timeout = %d for %s", trav->rpcreq->prog->progname, (trav->rpcreq->prog->procnames) ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : "--", trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent, - conn->frame_timeout); + conn->frame_timeout, conn->trans->peerinfo.identifier); clnt = rpc_clnt_ref (clnt); trav->rpcreq->rpc_status = -1; @@ -226,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; struct saved_frame *saved_frame = NULL; conn = &rpc_clnt->conn; @@ -240,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, /* TODO: make timeout configurable */ if (conn->timer == NULL) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -359,7 +360,7 @@ saved_frames_unwind (struct saved_frames *saved_frames) gf_log_callingfn (trav->rpcreq->conn->trans->name, GF_LOG_ERROR, "forced unwinding frame type(%s) op(%s(%d)) " - "called at %s (xid=0x%ux)", + "called at %s (xid=0x%x)", trav->rpcreq->prog->progname, ((trav->rpcreq->prog->procnames) ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] @@ -397,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr) { rpc_transport_t *trans = NULL; rpc_clnt_connection_t *conn = NULL; - struct timeval tv = {0, 0}; + struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; @@ -416,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr) conn->reconnect = 0; if (conn->connected == 0) { - tv.tv_sec = 3; + ts.tv_sec = 3; + ts.tv_nsec = 0; gf_log (trans->name, GF_LOG_TRACE, "attempting reconnect"); ret = rpc_transport_connect (trans, conn->config.remote_port); conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, trans); } else { @@ -661,7 +663,7 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, } gf_log (conn->trans->name, GF_LOG_TRACE, - "received rpc message (RPC XID: 0x%ux" + "received rpc message (RPC XID: 0x%x" " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname, @@ -819,6 +821,9 @@ out: return; } +static void +rpc_clnt_destroy (struct rpc_clnt *rpc); + int rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) @@ -828,7 +833,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, int ret = -1; rpc_request_info_t *req_info = NULL; rpc_transport_pollin_t *pollin = NULL; - struct timeval tv = {0, }; + struct timespec ts = {0, }; conn = mydata; if (conn == NULL) { @@ -847,10 +852,11 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, { if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) { - tv.tv_sec = 10; + ts.tv_sec = 10; + ts.tv_nsec = 0; conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn->trans); } @@ -864,9 +870,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, } case RPC_TRANSPORT_CLEANUP: - /* this event should not be received on a client for, a - * transport is only disconnected, but never destroyed. - */ + rpc_clnt_destroy (clnt); ret = 0; break; @@ -1481,7 +1485,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if (ret == -1) { gf_log (conn->trans->name, GF_LOG_WARNING, "failed to submit rpc-request " - "(XID: 0x%ux Program: %s, ProgVers: %d, " + "(XID: 0x%x Program: %s, ProgVers: %d, " "Proc: %d) to rpc-transport (%s)", rpcreq->xid, rpcreq->prog->progname, rpcreq->prog->progver, rpcreq->procnum, rpc->conn.trans->name); @@ -1492,7 +1496,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, __save_frame (rpc, frame, rpcreq); gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " - "(XID: 0x%ux Program: %s, ProgVers: %d, " + "(XID: 0x%x Program: %s, ProgVers: %d, " "Proc: %d) to rpc-transport (%s)", rpcreq->xid, rpcreq->prog->progname, rpcreq->prog->progver, rpcreq->procnum, rpc->conn.trans->name); @@ -1541,18 +1545,21 @@ rpc_clnt_ref (struct rpc_clnt *rpc) static void -rpc_clnt_destroy (struct rpc_clnt *rpc) +rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) { if (!rpc) return; - if (rpc->conn.trans) { - rpc_transport_unregister_notify (rpc->conn.trans); - rpc_transport_disconnect (rpc->conn.trans); - rpc_transport_unref (rpc->conn.trans); - } + rpc_clnt_disable (rpc); + rpc_transport_unref (rpc->conn.trans); +} + +static void +rpc_clnt_destroy (struct rpc_clnt *rpc) +{ + if (!rpc) + return; - rpc_clnt_reconnect_cleanup (&rpc->conn); saved_frames_destroy (rpc->conn.saved_frames); pthread_mutex_destroy (&rpc->lock); pthread_mutex_destroy (&rpc->conn.lock); @@ -1579,13 +1586,36 @@ rpc_clnt_unref (struct rpc_clnt *rpc) } pthread_mutex_unlock (&rpc->lock); if (!count) { - rpc_clnt_destroy (rpc); + rpc_clnt_trigger_destroy (rpc); return NULL; } return rpc; } +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc) +{ + + rpc_clnt_connection_t *conn = NULL; + char disabled = 0; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + disabled = rpc->disabled; + } + pthread_mutex_unlock (&conn->lock); + +out: + return disabled; +} + void rpc_clnt_disable (struct rpc_clnt *rpc) { @@ -1668,60 +1698,3 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) rpc->conn.config.remote_host = gf_strdup (config->remote_host); } } - -int -rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath, - int frame_timeout) -{ - dict_t *dict = NULL; - char *fpath = NULL; - int ret = -1; - - GF_ASSERT (filepath); - GF_ASSERT (options); - - dict = dict_new (); - if (!dict) - goto out; - - fpath = gf_strdup (filepath); - if (!fpath) { - ret = -1; - goto out; - } - - ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.address-family", "unix"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.socket.nodelay", "off"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport-type", "socket"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.socket.keepalive", "off"); - if (ret) - goto out; - - if (frame_timeout > 0) { - ret = dict_set_int32 (dict, "frame-timeout", frame_timeout); - if (ret) - goto out; - } - - *options = dict; -out: - if (ret) { - GF_FREE (fpath); - if (dict) - dict_unref (dict); - } - return ret; -} diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 0da165559..584963ad0 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -237,11 +237,10 @@ void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); int rpcclnt_cbk_program_register (struct rpc_clnt *svc, rpcclnt_cb_program_t *program, void *mydata); -int -rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath, - int frame_timeout); - void rpc_clnt_disable (struct rpc_clnt *rpc); +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc); + #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c new file mode 100644 index 000000000..8181e6aee --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.c @@ -0,0 +1,872 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#ifndef RPC_DRC_H +#include "rpc-drc.h" +#endif +#include "locking.h" +#include "hashfn.h" +#include "common-utils.h" +#include "statedump.h" +#include "mem-pool.h" + +#include <netinet/in.h> +#include <unistd.h> + +/** + * rpcsvc_drc_op_destroy - Destroys the cached reply + * + * @param drc - the main drc structure + * @param reply - the cached reply to destroy + * @return NULL if reply is destroyed, reply otherwise + */ +static drc_cached_op_t * +rpcsvc_drc_op_destroy (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ + GF_ASSERT (drc); + GF_ASSERT (reply); + + if (reply->state == DRC_OP_IN_TRANSIT) + return reply; + + iobref_unref (reply->msg.iobref); + if (reply->msg.rpchdr) + GF_FREE (reply->msg.rpchdr); + if (reply->msg.proghdr) + GF_FREE (reply->msg.proghdr); + if (reply->msg.progpayload) + GF_FREE (reply->msg.progpayload); + + list_del (&reply->global_list); + reply->client->op_count--; + drc->op_count--; + mem_put (reply); + reply = NULL; + + return reply; +} + +/** + * rpcsvc_drc_op_rb_unref - This function is used in rb tree cleanup only + * + * @param reply - the cached reply to unref + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_drc_rb_op_destroy (void *reply, void *drc) +{ + rpcsvc_drc_op_destroy (drc, (drc_cached_op_t *)reply); +} + +/** + * rpcsvc_remove_drc_client - Cleanup the drc client + * + * @param client - the drc client to be removed + * @return void + */ +static void +rpcsvc_remove_drc_client (drc_client_t *client) +{ + rb_destroy (client->rbtree, rpcsvc_drc_rb_op_destroy); + list_del (&client->client_list); + GF_FREE (client); +} + +/** + * rpcsvc_client_lookup - Given a sockaddr_storage, find the client if it exists + * + * @param drc - the main drc structure + * @param sockaddr - the network address of the client to be looked up + * @return drc client if it exists, NULL otherwise + */ +static drc_client_t * +rpcsvc_client_lookup (rpcsvc_drc_globals_t *drc, + struct sockaddr_storage *sockaddr) +{ + drc_client_t *client = NULL; + + GF_ASSERT (drc); + GF_ASSERT (sockaddr); + + if (list_empty (&drc->clients_head)) + return NULL; + + list_for_each_entry (client, &drc->clients_head, client_list) { + if (gf_sock_union_equal_addr (&client->sock_union, + (union gf_sock_union *)sockaddr)) + return client; + } + + return NULL; +} + +/** + * drc_compare_reqs - Used by rbtree to determine if incoming req matches with + * an existing node(cached reply) in rbtree + * + * @param item - pointer to the incoming req + * @param rb_node_data - pointer to an rbtree node (cached reply) + * @param param - drc pointer - unused here, but used in *op_destroy + * @return 0 if req matches reply, else (req->xid - reply->xid) + */ +int +drc_compare_reqs (const void *item, const void *rb_node_data, void *param) +{ + int ret = -1; + rpcsvc_request_t *req = NULL; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (item); + GF_ASSERT (rb_node_data); + GF_ASSERT (param); + + req = (rpcsvc_request_t *)item; + reply = (drc_cached_op_t *)rb_node_data; + + ret = req->xid - reply->xid; + if (ret != 0) + return ret; + + if (req->prognum == reply->prognum && + req->procnum == reply->procnum && + req->progver == reply->progversion) + return 0; + + return 1; +} + +/** + * drc_rb_calloc - used by rbtree api to allocate memory for nodes + * + * @param allocator - the libavl_allocator structure used by rbtree + * @param size - not needed by this function + * @return pointer to new cached reply (node in rbtree) + */ +static void * +drc_rb_calloc (struct libavl_allocator *allocator, size_t size) +{ + rpcsvc_drc_globals_t *drc = NULL; + + /* get the drc pointer by simple typecast, since allocator + * is the first member of rpcsvc_drc_globals_t + */ + drc = (rpcsvc_drc_globals_t *)allocator; + + return mem_get (drc->mempool); +} + +/** + * drc_rb_free - used by rbtree api to free a node + * + * @param a - the libavl_allocator structure used by rbtree api + * @param block - node that needs to be freed + * @return void + */ +static void +drc_rb_free (struct libavl_allocator *a, void *block) +{ + mem_put (block); +} + +/** + * drc_init_client_cache - initialize a drc client and its rb tree + * + * @param drc - the main drc structure + * @param client - the drc client to be initialized + * @return 0 on success, -1 on failure + */ +static int +drc_init_client_cache (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ + GF_ASSERT (drc); + GF_ASSERT (client); + + drc->allocator.libavl_malloc = drc_rb_calloc; + drc->allocator.libavl_free = drc_rb_free; + + client->rbtree = rb_create (drc_compare_reqs, drc, + (struct libavl_allocator *)drc); + if (!client->rbtree) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed"); + return -1; + } + + return 0; +} + +/** + * rpcsvc_get_drc_client - find the drc client with given sockaddr, else + * allocate and initialize a new drc client + * + * @param drc - the main drc structure + * @param sockaddr - network address of client + * @return drc client on success, NULL on failure + */ +static drc_client_t * +rpcsvc_get_drc_client (rpcsvc_drc_globals_t *drc, + struct sockaddr_storage *sockaddr) +{ + drc_client_t *client = NULL; + + GF_ASSERT (drc); + GF_ASSERT (sockaddr); + + client = rpcsvc_client_lookup (drc, sockaddr); + if (client) + goto out; + + /* if lookup fails, allocate cache for the new client */ + client = GF_CALLOC (1, sizeof (drc_client_t), + gf_common_mt_drc_client_t); + if (!client) + goto out; + + client->ref = 0; + client->sock_union = (union gf_sock_union)*sockaddr; + client->op_count = 0; + + if (drc_init_client_cache (drc, client)) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "initialization of drc client failed"); + GF_FREE (client); + client = NULL; + goto out; + } + drc->client_count++; + + list_add (&client->client_list, &drc->clients_head); + + out: + return client; +} + +/** + * rpcsvc_need_drc - Determine if a request needs DRC service + * + * @param req - incoming request + * @return 1 if DRC is needed for req, 0 otherwise + */ +int +rpcsvc_need_drc (rpcsvc_request_t *req) +{ + rpcsvc_actor_t *actor = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (req); + GF_ASSERT (req->svc); + + drc = req->svc->drc; + + if (!drc || drc->status == DRC_UNINITIATED) + return 0; + + actor = rpcsvc_program_actor (req); + if (!actor) + return 0; + + return (actor->op_type == DRC_NON_IDEMPOTENT + && drc->type != DRC_TYPE_NONE); +} + +/** + * rpcsvc_drc_client_ref - ref the drc client + * + * @param client - the drc client to ref + * @return client + */ +static drc_client_t * +rpcsvc_drc_client_ref (drc_client_t *client) +{ + GF_ASSERT (client); + client->ref++; + return client; +} + +/** + * rpcsvc_drc_client_unref - unref the drc client, and destroy + * the client on last unref + * + * @param drc - the main drc structure + * @param client - the drc client to unref + * @return NULL if it is the last unref, client otherwise + */ +static drc_client_t * +rpcsvc_drc_client_unref (rpcsvc_drc_globals_t *drc, drc_client_t *client) +{ + GF_ASSERT (drc); + GF_ASSERT (client->ref); + + client->ref--; + if (!client->ref) { + drc->client_count--; + rpcsvc_remove_drc_client (client); + client = NULL; + } + + return client; +} + +/** + * rpcsvc_drc_lookup - lookup a request to see if it is already cached + * + * @param req - incoming request + * @return cached reply of req if found, NULL otherwise + */ +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req) +{ + drc_client_t *client = NULL; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (req); + + if (!req->trans->drc_client) { + client = rpcsvc_get_drc_client (req->svc->drc, + &req->trans->peerinfo.sockaddr); + if (!client) + goto out; + req->trans->drc_client = client; + } + + client = rpcsvc_drc_client_ref (req->trans->drc_client); + + if (client->op_count == 0) + goto out; + + reply = rb_find (client->rbtree, req); + + out: + if (client) + rpcsvc_drc_client_unref (req->svc->drc, client); + + return reply; +} + +/** + * rpcsvc_send_cached_reply - send the cached reply for the incoming request + * + * @param req - incoming request (which is a duplicate in this case) + * @param reply - the cached reply for req + * @return 0 on successful reply submission, -1 or other non-zero value otherwise + */ +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply) +{ + int ret = 0; + + GF_ASSERT (req); + GF_ASSERT (reply); + + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "sending cached reply: xid: %d, " + "client: %s", req->xid, req->trans->peerinfo.identifier); + + rpcsvc_drc_client_ref (reply->client); + ret = rpcsvc_transport_submit (req->trans, + reply->msg.rpchdr, reply->msg.rpchdrcount, + reply->msg.proghdr, reply->msg.proghdrcount, + reply->msg.progpayload, reply->msg.progpayloadcount, + reply->msg.iobref, req->trans_private); + rpcsvc_drc_client_unref (req->svc->drc, reply->client); + + return ret; +} + +/** + * rpcsvc_cache_reply - cache the reply for the processed request 'req' + * + * @param req - processed request + * @param iobref - iobref structure of the reply + * @param rpchdr - rpc header of the reply + * @param rpchdrcount - size of rpchdr + * @param proghdr - program header of the reply + * @param proghdrcount - size of proghdr + * @param payload - payload of the reply if any + * @param payloadcount - size of payload + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, + struct iovec *rpchdr, int rpchdrcount, + struct iovec *proghdr, int proghdrcount, + struct iovec *payload, int payloadcount) +{ + int ret = -1; + drc_cached_op_t *reply = NULL; + + GF_ASSERT (req); + GF_ASSERT (req->reply); + + reply = req->reply; + + reply->state = DRC_OP_CACHED; + + reply->msg.iobref = iobref_ref (iobref); + + reply->msg.rpchdrcount = rpchdrcount; + reply->msg.rpchdr = iov_dup (rpchdr, rpchdrcount); + + reply->msg.proghdrcount = proghdrcount; + reply->msg.proghdr = iov_dup (proghdr, proghdrcount); + + reply->msg.progpayloadcount = payloadcount; + if (payloadcount) + reply->msg.progpayload = iov_dup (payload, payloadcount); + + // rpcsvc_drc_client_unref (req->svc->drc, req->trans->drc_client); + // rpcsvc_drc_op_unref (req->svc->drc, reply); + ret = 0; + + return ret; +} + +/** + * rpcsvc_vacate_drc_entries - free up some percentage of drc cache + * based on the lru factor + * + * @param drc - the main drc structure + * @return void + */ +static void +rpcsvc_vacate_drc_entries (rpcsvc_drc_globals_t *drc) +{ + uint32_t i = 0; + uint32_t n = 0; + drc_cached_op_t *reply = NULL; + drc_cached_op_t *tmp = NULL; + drc_client_t *client = NULL; + + GF_ASSERT (drc); + + n = drc->global_cache_size / drc->lru_factor; + + list_for_each_entry_safe_reverse (reply, tmp, &drc->cache_head, global_list) { + /* Don't delete ops that are in transit */ + if (reply->state == DRC_OP_IN_TRANSIT) + continue; + + client = reply->client; + + (void *)rb_delete (client->rbtree, reply); + + rpcsvc_drc_op_destroy (drc, reply); + rpcsvc_drc_client_unref (drc, client); + i++; + if (i >= n) + break; + } +} + +/** + * rpcsvc_add_op_to_cache - insert the cached op into the client rbtree and drc list + * + * @param drc - the main drc structure + * @param reply - the op to be inserted + * @return 0 on success, -1 on failure + */ +static int +rpcsvc_add_op_to_cache (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply) +{ + drc_client_t *client = NULL; + drc_cached_op_t **tmp_reply = NULL; + + GF_ASSERT (drc); + GF_ASSERT (reply); + + client = reply->client; + + /* cache is full, free up some space */ + if (drc->op_count >= drc->global_cache_size) + rpcsvc_vacate_drc_entries (drc); + + tmp_reply = (drc_cached_op_t **)rb_probe (client->rbtree, reply); + if (*tmp_reply != reply) { + /* should never happen */ + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "DRC failed to detect duplicates"); + return -1; + } else if (*tmp_reply == NULL) { + /* mem alloc failed */ + return -1; + } + + client->op_count++; + list_add (&reply->global_list, &drc->cache_head); + drc->op_count++; + + return 0; +} + +/** + * rpcsvc_cache_request - cache the in-transition incoming request + * + * @param req - incoming request + * @return 0 on success, -1 on failure + */ +int +rpcsvc_cache_request (rpcsvc_request_t *req) +{ + int ret = -1; + drc_client_t *client = NULL; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (req); + + drc = req->svc->drc; + + client = req->trans->drc_client; + if (!client) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc client is NULL"); + goto out; + } + + reply = mem_get (drc->mempool); + if (!reply) + goto out; + + reply->client = rpcsvc_drc_client_ref (client); + reply->xid = req->xid; + reply->prognum = req->prognum; + reply->progversion = req->progver; + reply->procnum = req->procnum; + reply->state = DRC_OP_IN_TRANSIT; + req->reply = reply; + + ret = rpcsvc_add_op_to_cache (drc, reply); + if (ret) { + req->reply = NULL; + rpcsvc_drc_op_destroy (drc, reply); + rpcsvc_drc_client_unref (drc, client); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Failed to add op to drc cache"); + } + + out: + return ret; +} + +/** + * + * rpcsvc_drc_priv - function which dumps the drc state + * + * @param drc - the main drc structure + * @return 0 on success, -1 on failure + */ +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc) +{ + int i = 0; + char key[GF_DUMP_MAX_BUF_LEN] = {0}; + drc_client_t *client = NULL; + char ip[INET6_ADDRSTRLEN] = {0}; + + if (!drc || drc->status == DRC_UNINITIATED) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is " + "uninitialized, not dumping its state"); + return 0; + } + + gf_proc_dump_add_section("rpc.drc"); + + if (TRY_LOCK (&drc->lock)) + return -1; + + gf_proc_dump_build_key (key, "drc", "type"); + gf_proc_dump_write (key, "%d", drc->type); + + gf_proc_dump_build_key (key, "drc", "client_count"); + gf_proc_dump_write (key, "%d", drc->client_count); + + gf_proc_dump_build_key (key, "drc", "current_cache_size"); + gf_proc_dump_write (key, "%d", drc->op_count); + + gf_proc_dump_build_key (key, "drc", "max_cache_size"); + gf_proc_dump_write (key, "%d", drc->global_cache_size); + + gf_proc_dump_build_key (key, "drc", "lru_factor"); + gf_proc_dump_write (key, "%d", drc->lru_factor); + + gf_proc_dump_build_key (key, "drc", "duplicate_request_count"); + gf_proc_dump_write (key, "%d", drc->cache_hits); + + gf_proc_dump_build_key (key, "drc", "in_transit_duplicate_requests"); + gf_proc_dump_write (key, "%d", drc->intransit_hits); + + list_for_each_entry (client, &drc->clients_head, client_list) { + gf_proc_dump_build_key (key, "client", "%d.ip-address", i); + memset (ip, 0, INET6_ADDRSTRLEN); + switch (client->sock_union.storage.ss_family) { + case AF_INET: + gf_proc_dump_write (key, "%s", inet_ntop (AF_INET, + &client->sock_union.sin.sin_addr.s_addr, + ip, INET_ADDRSTRLEN)); + break; + case AF_INET6: + gf_proc_dump_write (key, "%s", inet_ntop (AF_INET6, + &client->sock_union.sin6.sin6_addr, + ip, INET6_ADDRSTRLEN)); + break; + default: + gf_proc_dump_write (key, "%s", "N/A"); + } + + gf_proc_dump_build_key (key, "client", "%d.ref_count", i); + gf_proc_dump_write (key, "%d", client->ref); + gf_proc_dump_build_key (key, "client", "%d.op_count", i); + gf_proc_dump_write (key, "%d", client->op_count); + i++; + } + + UNLOCK (&drc->lock); + return 0; +} + +/** + * rpcsvc_drc_notify - function which is notified of RPC transport events + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param xl - pointer to the xlator + * @param event - the event which triggered this notify + * @param data - the transport structure + * @return 0 on success, -1 on failure + */ +int +rpcsvc_drc_notify (rpcsvc_t *svc, void *xl, + rpcsvc_event_t event, void *data) +{ + int ret = -1; + rpc_transport_t *trans = NULL; + drc_client_t *client = NULL; + rpcsvc_drc_globals_t *drc = NULL; + + GF_ASSERT (svc); + GF_ASSERT (svc->drc); + GF_ASSERT (data); + + drc = svc->drc; + + if (drc->status == DRC_UNINITIATED || + drc->type == DRC_TYPE_NONE) + return 0; + + LOCK (&drc->lock); + + trans = (rpc_transport_t *)data; + client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr); + if (!client) + goto out; + + switch (event) { + case RPCSVC_EVENT_ACCEPT: + trans->drc_client = rpcsvc_drc_client_ref (client); + ret = 0; + break; + + case RPCSVC_EVENT_DISCONNECT: + ret = 0; + if (list_empty (&drc->clients_head)) + break; + /* should be the last unref */ + rpcsvc_drc_client_unref (drc, client); + trans->drc_client = NULL; + break; + + default: + break; + } + + out: + UNLOCK (&drc->lock); + return ret; +} + +/** + * rpcsvc_drc_init - Initialize the duplicate request cache service + * + * @param svc - pointer to rpcsvc_t structure of the rpc + * @param options - the options dictionary which configures drc + * @return 0 on success, non-zero integer on failure + */ +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options) +{ + int ret = 0; + uint32_t drc_type = 0; + uint32_t drc_size = 0; + uint32_t drc_factor = 0; + rpcsvc_drc_globals_t *drc = NULL; + static gf_boolean_t drc_inited = _gf_false; + + GF_ASSERT (svc); + GF_ASSERT (options); + + /* Already inited */ + if (drc_inited) + return 0; + + if (!svc->drc) { + drc = GF_CALLOC (1, sizeof (rpcsvc_drc_globals_t), + gf_common_mt_drc_globals_t); + if (!drc) + return -1; + + svc->drc = drc; + LOCK_INIT (&drc->lock); + } else { + drc = svc->drc; + } + + LOCK (&drc->lock); + if (drc->type != DRC_TYPE_NONE) { + ret = 0; + goto out; + } + + /* Toggle DRC on/off, when more drc types(persistent/cluster) + are added, we shouldn't treat this as boolean */ + ret = dict_get_str_boolean (options, "nfs.drc", _gf_true); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "drc user options need second look"); + ret = _gf_true; + } + drc->enable_drc = ret; + + if (ret == _gf_false) { + /* drc off */ + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is off"); + ret = 0; + goto out; + } + + /* Specify type of DRC to be used */ + ret = dict_get_uint32 (options, "nfs.drc-type", &drc_type); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc type not set." + " Continuing with default"); + drc_type = DRC_DEFAULT_TYPE; + } + + drc->type = drc_type; + + /* Set the global cache size (no. of ops to cache) */ + ret = dict_get_uint32 (options, "nfs.drc-size", &drc_size); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc size not set." + " Continuing with default size"); + drc_size = DRC_DEFAULT_CACHE_SIZE; + } + + drc->global_cache_size = drc_size; + + /* Mempool for cached ops */ + drc->mempool = mem_pool_new (drc_cached_op_t, drc->global_cache_size); + if (!drc->mempool) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get mempool for" + " DRC, drc-size: %d", drc->global_cache_size); + ret = -1; + goto out; + } + + /* What percent of cache to be evicted whenever it fills up */ + ret = dict_get_uint32 (options, "nfs.drc-lru-factor", &drc_factor); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc lru factor not set." + " Continuing with policy default"); + drc_factor = DRC_DEFAULT_LRU_FACTOR; + } + + drc->lru_factor = (drc_lru_factor_t) drc_factor; + + INIT_LIST_HEAD (&drc->clients_head); + INIT_LIST_HEAD (&drc->cache_head); + + ret = rpcsvc_register_notify (svc, rpcsvc_drc_notify, THIS); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "registration of drc_notify function failed"); + goto out; + } + + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc init successful"); + drc->status = DRC_INITIATED; + drc_inited = _gf_true; + + out: + UNLOCK (&drc->lock); + if (ret == -1) { + if (drc->mempool) { + mem_pool_destroy (drc->mempool); + drc->mempool = NULL; + } + GF_FREE (drc); + svc->drc = NULL; + } + return ret; +} + +int +rpcsvc_drc_reconfigure (rpcsvc_t *svc, dict_t *options) +{ + int ret = -1; + gf_boolean_t enable_drc = _gf_false; + rpcsvc_drc_globals_t *drc = NULL; + uint32_t drc_size = 0; + + if ((!svc) || (!options)) + return (-1); + + drc = svc->drc; + /* reconfig for drc-size */ + if (dict_get_uint32 (options, "nfs.drc-size", &drc_size)) + drc_size = DRC_DEFAULT_CACHE_SIZE; + + if (drc->global_cache_size != drc_size) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "nfs.drc-size size can not " + "be reconfigured without NFS server restart."); + return (-1); + } + + /* reconfig for nfs.drc */ + ret = dict_get_str_boolean (options, "nfs.drc", _gf_true); + if (ret < 0) { + ret = _gf_true; + } + enable_drc = ret; + + if (drc->enable_drc == enable_drc) + return 0; + + drc->enable_drc = enable_drc; + if (enable_drc) { + if (drc == NULL) + return rpcsvc_drc_init(svc, options); + } else { + if (drc == NULL) + return (0); + + LOCK (&drc->lock); + (void) rpcsvc_unregister_notify (svc, rpcsvc_drc_notify, THIS); + if (drc->mempool) { + mem_pool_destroy (drc->mempool); + drc->mempool = NULL; + } + UNLOCK (&drc->lock); + GF_FREE (drc); + svc->drc = NULL; + } + + return (0); +} diff --git a/rpc/rpc-lib/src/rpc-drc.h b/rpc/rpc-lib/src/rpc-drc.h new file mode 100644 index 000000000..7dfaef978 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-drc.h @@ -0,0 +1,104 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef RPC_DRC_H +#define RPC_DRC_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc-common.h" +#include "rpcsvc.h" +#include "locking.h" +#include "dict.h" +#include "rb.h" + +/* per-client cache structure */ +struct drc_client { + uint32_t ref; + union gf_sock_union sock_union; + /* pointers to the cache */ + struct rb_table *rbtree; + /* no. of ops currently cached */ + uint32_t op_count; + struct list_head client_list; +}; + +struct drc_cached_op { + drc_op_state_t state; + uint32_t xid; + int prognum; + int progversion; + int procnum; + rpc_transport_msg_t msg; + drc_client_t *client; + struct list_head client_list; + struct list_head global_list; + int32_t ref; +}; + +/* global drc definitions */ +enum drc_status { + DRC_UNINITIATED, + DRC_INITIATED +}; +typedef enum drc_status drc_status_t; + +struct drc_globals { + /* allocator must be the first member since + * it is used so in gf_libavl_allocator + */ + struct libavl_allocator allocator; + drc_type_t type; + /* configurable size parameter */ + uint32_t global_cache_size; + drc_lru_factor_t lru_factor; + gf_lock_t lock; + drc_status_t status; + uint32_t op_count; + uint64_t cache_hits; + uint64_t intransit_hits; + struct mem_pool *mempool; + struct list_head cache_head; + uint32_t client_count; + struct list_head clients_head; + gf_boolean_t enable_drc; +}; + +int +rpcsvc_need_drc (rpcsvc_request_t *req); + +drc_cached_op_t * +rpcsvc_drc_lookup (rpcsvc_request_t *req); + +int +rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply); + +int +rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref, + struct iovec *rpchdr, int rpchdrcount, + struct iovec *proghdr, int proghdrcount, + struct iovec *payload, int payloadcount); + +int +rpcsvc_cache_request (rpcsvc_request_t *req); + +int32_t +rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc); + +int +rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options); + +int +rpcsvc_drc_reconfigure (rpcsvc_t *svc, dict_t *options); + +#endif /* RPC_DRC_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 695261a71..c24d41084 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -69,6 +69,19 @@ out: return ret; } +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + int ret = 0; + + if (!this->ops->throttle) + return -ENOSYS; + + ret = this->ops->throttle (this, onoff); + + return ret; +} + int32_t rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen) @@ -145,6 +158,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) int8_t is_tcp = 0, is_unix = 0, is_ibsdp = 0; volume_opt_list_t *vol_opt = NULL; gf_boolean_t bind_insecure = _gf_false; + xlator_t *this = NULL; GF_VALIDATE_OR_GOTO("rpc-transport", options, fail); GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail); @@ -169,7 +183,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) gf_log ("dict", GF_LOG_DEBUG, "setting transport-type failed"); else - gf_log ("rpc-transport", GF_LOG_WARNING, + gf_log ("rpc-transport", GF_LOG_DEBUG, "missing 'option transport-type'. defaulting to " "\"socket\""); } else { @@ -281,7 +295,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) } *VOID(&(trans->reconfigure)) = dlsym (handle, "reconfigure"); - if (trans->fini == NULL) { + if (trans->reconfigure == NULL) { gf_log ("rpc-transport", GF_LOG_DEBUG, "dlsym (gf_rpc_transport_reconfigure) on %s", dlerror()); } @@ -292,26 +306,26 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) goto fail; } + this = THIS; vol_opt->given_opt = dlsym (handle, "options"); if (vol_opt->given_opt == NULL) { gf_log ("rpc-transport", GF_LOG_DEBUG, "volume option validation not specified"); } else { INIT_LIST_HEAD (&vol_opt->list); - list_add_tail (&vol_opt->list, &(THIS->volume_options)); - if (xlator_options_validate_list (THIS, options, vol_opt, + list_add_tail (&vol_opt->list, &(this->volume_options)); + if (xlator_options_validate_list (this, options, vol_opt, NULL)) { gf_log ("rpc-transport", GF_LOG_ERROR, "volume option validation failed"); goto fail; } - vol_opt = NULL; } trans->options = options; pthread_mutex_init (&trans->lock, NULL); - trans->xl = THIS; + trans->xl = this; ret = trans->init (trans); if (ret != 0) { @@ -324,8 +338,6 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) GF_FREE (name); - GF_FREE (vol_opt); - return return_trans; fail: @@ -340,7 +352,10 @@ fail: GF_FREE (name); - GF_FREE (vol_opt); + if (vol_opt && !list_empty (&vol_opt->list)) { + list_del_init (&vol_opt->list); + GF_FREE (vol_opt); + } return NULL; } @@ -475,6 +490,8 @@ rpc_transport_unref (rpc_transport_t *this) if (this->mydata) this->notify (this, this->mydata, RPC_TRANSPORT_CLEANUP, NULL); + this->mydata = NULL; + this->notify = NULL; rpc_transport_destroy (this); } @@ -518,18 +535,6 @@ out: } -inline int -rpc_transport_unregister_notify (rpc_transport_t *trans) -{ - GF_VALIDATE_OR_GOTO ("rpc-transport", trans, out); - - trans->notify = NULL; - trans->mydata = NULL; - -out: - return 0; -} - //give negative values to skip setting that value //this function asserts if both the values are negative. @@ -557,6 +562,63 @@ out: } int +rpc_transport_unix_options_build (dict_t **options, char *filepath, + int frame_timeout) +{ + dict_t *dict = NULL; + char *fpath = NULL; + int ret = -1; + + GF_ASSERT (filepath); + GF_ASSERT (options); + + dict = dict_new (); + if (!dict) + goto out; + + fpath = gf_strdup (filepath); + if (!fpath) { + ret = -1; + goto out; + } + + ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.address-family", "unix"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.socket.nodelay", "off"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport-type", "socket"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.socket.keepalive", "off"); + if (ret) + goto out; + + if (frame_timeout > 0) { + ret = dict_set_int32 (dict, "frame-timeout", frame_timeout); + if (ret) + goto out; + } + + *options = dict; +out: + if (ret) { + GF_FREE (fpath); + if (dict) + dict_unref (dict); + } + return ret; +} + +int rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port) { diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 272de9d7d..2db9072ae 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -186,16 +186,19 @@ struct rpc_transport { */ void *private; - void *xl_private; + struct _client_t *xl_private; void *xl; /* Used for THIS */ void *mydata; pthread_mutex_t lock; int32_t refcount; + int32_t outstanding_rpc_count; + glusterfs_ctx_t *ctx; dict_t *options; char *name; void *dnscache; + void *drc_client; data_t *buf; int32_t (*init) (rpc_transport_t *this); void (*fini) (rpc_transport_t *this); @@ -210,7 +213,7 @@ struct rpc_transport { struct list_head list; int bind_insecure; - void *dl_handle; /* handle of dlopen() */ + void *dl_handle; /* handle of dlopen() */ }; struct rpc_transport_ops { @@ -234,6 +237,7 @@ struct rpc_transport_ops { int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t sasize); + int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff); }; @@ -273,9 +277,6 @@ int rpc_transport_register_notify (rpc_transport_t *trans, rpc_transport_notify_t, void *mydata); -int -rpc_transport_unregister_notify (rpc_transport_t *trans); - int32_t rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen); @@ -290,6 +291,9 @@ int32_t rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen); +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff); + rpc_transport_pollin_t * rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, int count, struct iobuf *hdr_iobuf, @@ -302,5 +306,9 @@ rpc_transport_keepalive_options_set (dict_t *options, int32_t interval, int32_t time); int +rpc_transport_unix_options_build (dict_t **options, char *filepath, + int frame_timeout); + +int rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port); #endif /* __RPC_TRANSPORT_H__ */ diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c index 907ae1ec9..4cb86a758 100644 --- a/rpc/rpc-lib/src/rpcsvc-auth.c +++ b/rpc/rpc-lib/src/rpcsvc-auth.c @@ -178,6 +178,29 @@ err: } int +rpcsvc_set_addr_namelookup (rpcsvc_t *svc, dict_t *options) +{ + int ret; + static char *addrlookup_key = "rpc-auth.addr.namelookup"; + + if (!svc || !options) + return (-1); + + /* By default it's disabled */ + ret = dict_get_str_boolean (options, addrlookup_key, _gf_false); + if (ret < 0) { + svc->addr_namelookup = _gf_false; + } else { + svc->addr_namelookup = ret; + } + + if (svc->addr_namelookup) + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Addr-Name lookup enabled"); + + return (0); +} + +int rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options) { int ret = -1; @@ -206,11 +229,16 @@ rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options) int rpcsvc_set_root_squash (rpcsvc_t *svc, dict_t *options) { + int ret = -1; + GF_ASSERT (svc); GF_ASSERT (options); - if (dict_get_str_boolean (options, "root-squash", 0)) - svc->root_squash = _gf_true; + ret = dict_get_str_boolean (options, "root-squash", 0); + if (ret != -1) + svc->root_squash = ret; + else + svc->root_squash = _gf_false; if (svc->root_squash) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "root squashing enabled "); @@ -228,6 +256,7 @@ rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options) (void) rpcsvc_set_allow_insecure (svc, options); (void) rpcsvc_set_root_squash (svc, options); + (void) rpcsvc_set_addr_namelookup (svc, options); ret = rpcsvc_auth_add_initers (svc); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add initers"); @@ -244,6 +273,25 @@ out: return ret; } +int +rpcsvc_auth_reconf (rpcsvc_t *svc, dict_t *options) +{ + int ret = 0; + + if ((!svc) || (!options)) + return (-1); + + ret = rpcsvc_set_allow_insecure (svc, options); + if (ret) + return (-1); + + ret = rpcsvc_set_root_squash (svc, options); + if (ret) + return (-1); + + return rpcsvc_set_addr_namelookup (svc, options); +} + rpcsvc_auth_t * __rpcsvc_auth_get_handler (rpcsvc_request_t *req) @@ -322,6 +370,9 @@ rpcsvc_auth_request_init (rpcsvc_request_t *req) if (!auth->authops->request_init) ret = auth->authops->request_init (req, auth->authprivate); + req->auxgids = req->auxgidsmall; /* reset to auxgidlarge during + unsersialize if necessary */ + req->auxgidlarge = NULL; err: return ret; } @@ -361,14 +412,10 @@ err: int rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen) { - int count = 0; - int gen = RPCSVC_AUTH_REJECT; - int spec = RPCSVC_AUTH_REJECT; - int final = RPCSVC_AUTH_REJECT; - char *srchstr = NULL; - char *valstr = NULL; - gf_boolean_t boolval = _gf_false; - int ret = 0; + int count = 0; + int result = RPCSVC_AUTH_REJECT; + char *srchstr = NULL; + int ret = 0; struct rpcsvc_auth_list *auth = NULL; struct rpcsvc_auth_list *tmp = NULL; @@ -386,59 +433,27 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen) if (count >= arrlen) break; - gen = gf_asprintf (&srchstr, "rpc-auth.%s", auth->name); - if (gen == -1) { + result = gf_asprintf (&srchstr, "rpc-auth.%s.%s", + auth->name, volname); + if (result == -1) { count = -1; goto err; } - gen = RPCSVC_AUTH_REJECT; - if (dict_get (svc->options, srchstr)) { - ret = dict_get_str (svc->options, srchstr, &valstr); - if (ret == 0) { - ret = gf_string2boolean (valstr, &boolval); - if (ret == 0) { - if (boolval == _gf_true) - gen = RPCSVC_AUTH_ACCEPT; - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" - "d to read auth val"); - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" - "d to read auth val"); - } - + ret = dict_get_str_boolean (svc->options, srchstr, 0xC00FFEE); GF_FREE (srchstr); - spec = gf_asprintf (&srchstr, "rpc-auth.%s.%s", auth->name, - volname); - if (spec == -1) { - count = -1; - goto err; - } - - spec = RPCSVC_AUTH_DONTCARE; - if (dict_get (svc->options, srchstr)) { - ret = dict_get_str (svc->options, srchstr, &valstr); - if (ret == 0) { - ret = gf_string2boolean (valstr, &boolval); - if (ret == 0) { - if (boolval == _gf_true) - spec = RPCSVC_AUTH_ACCEPT; - else - spec = RPCSVC_AUTH_REJECT; - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" - "d to read auth val"); - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" - "d to read auth val"); - } - GF_FREE (srchstr); - final = rpcsvc_combine_gen_spec_volume_checks (gen, spec); - if (final == RPCSVC_AUTH_ACCEPT) { + switch (ret) { + case _gf_true: + result = RPCSVC_AUTH_ACCEPT; autharr[count] = auth->auth->authnum; ++count; + break; + case _gf_false: + result = RPCSVC_AUTH_REJECT; + break; + default: + result = RPCSVC_AUTH_DONTCARE; } } diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 2c6f07488..aed55e039 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -30,6 +30,8 @@ struct rpcsvc_state; typedef int (*rpcsvc_notify_t) (struct rpcsvc_state *, void *mydata, rpcsvc_event_t, void *data); +struct drc_globals; +typedef struct drc_globals rpcsvc_drc_globals_t; /* Contains global state required for all the RPC services. */ @@ -50,25 +52,75 @@ typedef struct rpcsvc_state { dict_t *options; /* Allow insecure ports. */ - int allow_insecure; + gf_boolean_t allow_insecure; gf_boolean_t register_portmap; gf_boolean_t root_squash; glusterfs_ctx_t *ctx; /* list of connections which will listen for incoming connections */ - struct list_head listeners; + struct list_head listeners; /* list of programs registered with rpcsvc */ - struct list_head programs; + struct list_head programs; /* list of notification callbacks */ - struct list_head notify; - int notify_count; + struct list_head notify; + int notify_count; void *mydata; /* This is xlator */ - rpcsvc_notify_t notifyfn; + rpcsvc_notify_t notifyfn; struct mem_pool *rxpool; + rpcsvc_drc_globals_t *drc; + + /* per-client limit of outstanding rpc requests */ + int outstanding_rpc_limit; + gf_boolean_t addr_namelookup; } rpcsvc_t; +/* DRC START */ +enum drc_op_type { + DRC_NA = 0, + DRC_IDEMPOTENT = 1, + DRC_NON_IDEMPOTENT = 2 +}; +typedef enum drc_op_type drc_op_type_t; + +enum drc_type { + DRC_TYPE_NONE = 0, + DRC_TYPE_IN_MEMORY = 1 +}; +typedef enum drc_type drc_type_t; + +enum drc_lru_factor { + DRC_LRU_5_PC = 20, + DRC_LRU_10_PC = 10, + DRC_LRU_25_PC = 4, + DRC_LRU_50_PC = 2 +}; +typedef enum drc_lru_factor drc_lru_factor_t; + +enum drc_xid_state { + DRC_XID_MONOTONOUS = 0, + DRC_XID_WRAPPED = 1 +}; +typedef enum drc_xid_state drc_xid_state_t; + +enum drc_op_state { + DRC_OP_IN_TRANSIT = 0, + DRC_OP_CACHED = 1 +}; +typedef enum drc_op_state drc_op_state_t; + +enum drc_policy { + DRC_LRU = 0 +}; +typedef enum drc_policy drc_policy_t; + +/* Default policies for DRC */ +#define DRC_DEFAULT_TYPE DRC_TYPE_IN_MEMORY +#define DRC_DEFAULT_CACHE_SIZE 0x20000 +#define DRC_DEFAULT_LRU_FACTOR DRC_LRU_25_PC + +/* DRC END */ #endif /* #ifndef _RPCSVC_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 078adc062..037c157f2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@ #include "xdr-generic.h" #include "rpc-common-xdr.h" #include "syncop.h" +#include "rpc-drc.h" #include <errno.h> #include <pthread.h> @@ -41,8 +42,7 @@ #include <stdio.h> #include "xdr-rpcclnt.h" - -#define ACL_PROGRAM 100227 +#include "glusterfs-acl.h" struct rpcsvc_program gluster_dump_prog; @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, return NULL; } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ + int ret = 0; + int old_count = 0; + int new_count = 0; + int limit = 0; + + pthread_mutex_lock (&trans->lock); + { + limit = svc->outstanding_rpc_limit; + if (!limit) + goto unlock; + + old_count = trans->outstanding_rpc_count; + trans->outstanding_rpc_count += delta; + new_count = trans->outstanding_rpc_count; + + if (old_count <= limit && new_count > limit) + ret = rpc_transport_throttle (trans, _gf_true); + + if (old_count > limit && new_count <= limit) + ret = rpc_transport_throttle (trans, _gf_false); + } +unlock: + pthread_mutex_unlock (&trans->lock); + + return ret; +} + + /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. @@ -207,7 +238,7 @@ rpcsvc_program_actor (rpcsvc_request_t *req) goto err; } - req->synctask = program->synctask; + req->synctask = program->synctask; err = SUCCESS; gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s", @@ -279,8 +310,17 @@ rpcsvc_request_destroy (rpcsvc_request_t *req) if (req->hdr_iobuf) iobuf_unref (req->hdr_iobuf); + /* This marks the "end" of an RPC request. Reply is + completely written to the socket and is on the way + to the client. It is time to decrement the + outstanding request counter by 1. + */ + rpcsvc_request_outstanding (req->svc, req->trans, -1); + rpc_transport_unref (req->trans); + GF_FREE (req->auxgidlarge); + mem_put (req); out: @@ -363,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, goto err; } + /* We just received a new request from the wire. Account for + it in the outsanding request counter to make sure we don't + ingest too many concurrent requests from the same client. + */ + ret = rpcsvc_request_outstanding (svc, trans, +1); + msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; @@ -422,6 +468,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, * since we are not handling authentication failures for now. */ req->rpc_status = MSG_ACCEPTED; + req->reply = NULL; ret = 0; err: if (ret == -1) { @@ -439,9 +486,9 @@ err: int rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque) { - rpcsvc_request_t *req = NULL; + rpcsvc_request_t *req = NULL; - req = opaque; + req = opaque; if (ret) gf_log ("rpcsvc", GF_LOG_ERROR, @@ -454,20 +501,22 @@ rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque) "failed to queue error reply"); } - return 0; + return 0; } int rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { - rpcsvc_actor_t *actor = NULL; - rpcsvc_actor actor_fn = NULL; - rpcsvc_request_t *req = NULL; - int ret = -1; - uint16_t port = 0; - gf_boolean_t is_unix = _gf_false; - gf_boolean_t unprivileged = _gf_false; + rpcsvc_actor_t *actor = NULL; + rpcsvc_actor actor_fn = NULL; + rpcsvc_request_t *req = NULL; + int ret = -1; + uint16_t port = 0; + gf_boolean_t is_unix = _gf_false; + gf_boolean_t unprivileged = _gf_false; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; if (!trans || !svc) return -1; @@ -503,7 +552,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req = rpcsvc_request_create (svc, trans, msg); if (!req) - goto err; + goto out; if (!rpcsvc_request_accepted (req)) goto err_reply; @@ -521,33 +570,65 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, return -1; } + /* DRC */ + if (rpcsvc_need_drc (req)) { + drc = req->svc->drc; + + LOCK (&drc->lock); + reply = rpcsvc_drc_lookup (req); + + /* retransmission of completed request, send cached reply */ + if (reply && reply->state == DRC_OP_CACHED) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" + " XID: 0x%x", req->xid); + ret = rpcsvc_send_cached_reply (req, reply); + drc->cache_hits++; + UNLOCK (&drc->lock); + goto out; + + } /* retransmitted request, original op in transit, drop it */ + else if (reply && reply->state == DRC_OP_IN_TRANSIT) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," + " discarding. XID: 0x%x", req->xid); + ret = 0; + drc->intransit_hits++; + rpcsvc_request_destroy (req); + UNLOCK (&drc->lock); + goto out; + + } /* fresh request, cache it as in-transit and proceed */ + else { + ret = rpcsvc_cache_request (req); + } + UNLOCK (&drc->lock); + } + if (req->rpc_err == SUCCESS) { /* Before going to xlator code, set the THIS properly */ THIS = svc->mydata; - actor_fn = actor->actor; + actor_fn = actor->actor; - if (!actor_fn) { - rpcsvc_request_seterr (req, PROC_UNAVAIL); - /* LOG TODO: print more info about procnum, - prognum etc, also print transport info */ - gf_log (GF_RPCSVC, GF_LOG_ERROR, - "No vectored handler present"); - ret = RPCSVC_ACTOR_ERROR; - goto err_reply; - } + if (!actor_fn) { + rpcsvc_request_seterr (req, PROC_UNAVAIL); + /* LOG TODO: print more info about procnum, + prognum etc, also print transport info */ + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "No vectored handler present"); + ret = RPCSVC_ACTOR_ERROR; + goto err_reply; + } - if (req->synctask) { + if (req->synctask) { if (msg->hdr_iobuf) req->hdr_iobuf = iobuf_ref (msg->hdr_iobuf); - ret = synctask_new (THIS->ctx->env, - (synctask_fn_t) actor_fn, - rpcsvc_check_and_reply_error, NULL, + ret = synctask_new (THIS->ctx->env, + (synctask_fn_t) actor_fn, + rpcsvc_check_and_reply_error, NULL, req); } else { - ret = actor_fn (req); - req->hdr_iobuf = NULL; + ret = actor_fn (req); } } @@ -558,7 +639,7 @@ err_reply: * has now been queued. */ ret = 0; -err: +out: return ret; } @@ -905,21 +986,22 @@ out: return ret; } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { + if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) { goto out; } - reply.msg.rpchdr = hdrvec; - reply.msg.rpchdrcount = hdrcount; + reply.msg.rpchdr = rpchdr; + reply.msg.rpchdrcount = rpchdrcount; reply.msg.proghdr = proghdr; reply.msg.proghdrcount = proghdrcount; reply.msg.progpayload = progpayload; @@ -1065,6 +1147,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, size_t msglen = 0; size_t hdrlen = 0; char new_iobref = 0; + rpcsvc_drc_globals_t *drc = NULL; if ((!req) || (!req->trans)) return -1; @@ -1099,20 +1182,31 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); + /* cache the request in the duplicate request cache for appropriate ops */ + if (req->reply) { + drc = req->svc->drc; + + LOCK (&drc->lock); + ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, + proghdr, hdrcount, + payload, payloadcount); + UNLOCK (&drc->lock); + } + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount, iobref, req->trans_private); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message " - "(XID: 0x%ux, Program: %s, ProgVers: %d, Proc: %d) to " + "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to " "rpc-transport (%s)", req->xid, req->prog ? req->prog->progname : "(not matched)", req->prog ? req->prog->progver : 0, req->procnum, trans->name); } else { gf_log (GF_RPCSVC, GF_LOG_TRACE, - "submitted reply for rpc-message (XID: 0x%ux, " + "submitted reply for rpc-message (XID: 0x%x, " "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport " "(%s)", req->xid, req->prog ? req->prog->progname: "-", req->prog ? req->prog->progver : 0, @@ -1155,12 +1249,13 @@ rpcsvc_error_reply (rpcsvc_request_t *req) inline int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) { - int ret = 0; + int ret = -1; /* FAIL */ if (!newprog) { goto out; } + /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */ if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, port))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" @@ -1168,16 +1263,16 @@ rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) goto out; } - ret = 0; + ret = 0; /* SUCCESS */ out: return ret; } -static inline int +inline int rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) { - int ret = 0; + int ret = -1; if (!prog) goto out; @@ -1795,12 +1890,92 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); - ret = 0; + ret = rpcsvc_set_outstanding_rpc_limit (svc, options); out: return ret; } int +rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options) +{ + xlator_t *xlator = NULL; + xlator_list_t *volentry = NULL; + char *srchkey = NULL; + char *keyval = NULL; + int ret = -1; + + if ((!svc) || (!svc->options) || (!options)) + return (-1); + + /* Fetch the xlator from svc */ + xlator = (xlator_t *) svc->mydata; + if (!xlator) + return (-1); + + /* Reconfigure the volume specific rpc-auth.addr allow part */ + volentry = xlator->children; + while (volentry) { + ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow", + volentry->xlator->name); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return (-1); + } + + /* If found the srchkey, delete old key/val pair + * and set the key with new value. + */ + if (!dict_get_str (options, srchkey, &keyval)) { + dict_del (svc->options, srchkey); + ret = dict_set_str (svc->options, srchkey, keyval); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "dict_set_str error"); + GF_FREE (srchkey); + return (-1); + } + } + + GF_FREE (srchkey); + volentry = volentry->next; + } + + /* Reconfigure the volume specific rpc-auth.addr reject part */ + volentry = xlator->children; + while (volentry) { + ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject", + volentry->xlator->name); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return (-1); + } + + /* If found the srchkey, delete old key/val pair + * and set the key with new value. + */ + if (!dict_get_str (options, srchkey, &keyval)) { + dict_del (svc->options, srchkey); + ret = dict_set_str (svc->options, srchkey, keyval); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "dict_set_str error"); + GF_FREE (srchkey); + return (-1); + } + } + + GF_FREE (srchkey); + volentry = volentry->next; + } + + ret = rpcsvc_init_options (svc, options); + if (ret) + return (-1); + + return rpcsvc_auth_reconf (svc, options); +} + +int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath) { dict_t *dict = NULL; @@ -1846,6 +2021,48 @@ out: return ret; } +/* + * Reconfigure() the rpc.outstanding-rpc-limit param. + */ +int +rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options) +{ + int ret = -1; /* FAILURE */ + int rpclim = 0; + static char *rpclimkey = "rpc.outstanding-rpc-limit"; + + if ((!svc) || (!options)) + return (-1); + + /* Reconfigure() the rpc.outstanding-rpc-limit param */ + ret = dict_get_int32 (options, rpclimkey, &rpclim); + if (ret < 0) { + /* Fall back to default for FAILURE */ + rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + } else { + /* SUCCESS: round off to multiple of 8. + * If the input value fails Boundary check, fall back to + * default i.e. RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT. + * NB: value 0 is special, means its unset i.e. unlimited. + */ + rpclim = ((rpclim + 8 - 1) >> 3) * 8; + if (rpclim < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) { + rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + } else if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) { + rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT; + } + } + + if (svc->outstanding_rpc_limit != rpclim) { + svc->outstanding_rpc_limit = rpclim; + gf_log (GF_RPCSVC, GF_LOG_INFO, + "Configured %s with value %d", + rpclimkey, rpclim); + } + + return (0); +} + /* The global RPC service initializer. */ rpcsvc_t * @@ -1906,6 +2123,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, "failed to register DUMP program"); goto free_svc; } + ret = 0; free_svc: if (ret == -1) { @@ -1918,21 +2136,16 @@ free_svc: int -rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr) +rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, + char *ip, char *hostname) { - int ret = -1; - char *addrtok = NULL; - char *addrstr = NULL; - char *dup_addrstr = NULL; - char *svptr = NULL; - char *fqdn = NULL; + int ret = -1; + char *addrtok = NULL; + char *addrstr = NULL; + char *dup_addrstr = NULL; + char *svptr = NULL; - if ((!options) || (!clstr)) - return -1; - - ret = dict_get_str (options, "fqdn", &fqdn); - - if (!dict_get (options, pattern)) + if ((!options) || (!ip)) return -1; ret = dict_get_str (options, pattern, &addrstr); @@ -1952,19 +2165,19 @@ rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr) /* CASEFOLD not present on Solaris */ #ifdef FNM_CASEFOLD - ret = fnmatch (addrtok, clstr, FNM_CASEFOLD); + ret = fnmatch (addrtok, ip, FNM_CASEFOLD); #else - ret = fnmatch (addrtok, clstr, 0); + ret = fnmatch (addrtok, ip, 0); #endif if (ret == 0) goto err; /* compare hostnames if applicable */ - if (fqdn) { + if (hostname) { #ifdef FNM_CASEFOLD - ret = fnmatch (addrtok, fqdn, FNM_CASEFOLD); + ret = fnmatch (addrtok, hostname, FNM_CASEFOLD); #else - ret = fnmatch (addrtok, fqdn, 0); + ret = fnmatch (addrtok, hostname, 0); #endif if (ret == 0) goto err; @@ -1981,66 +2194,56 @@ err: } -int -rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr) +static int +rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, + char *ip, char *hostname) { - int ret = RPCSVC_AUTH_DONTCARE; + int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; - char globalrule[] = "rpc-auth.addr.allow"; - if ((!options) || (!clstr)) + if ((!options) || (!ip) || (!volname)) return ret; - /* If volname is NULL, then we're searching for the general rule to - * determine the current address in clstr is allowed or not for all - * subvolumes. - */ - if (volname) { - ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); - ret = RPCSVC_AUTH_DONTCARE; - goto out; - } - } else - srchstr = globalrule; + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + ret = RPCSVC_AUTH_DONTCARE; + goto out; + } - ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr); - if (volname) - GF_FREE (srchstr); + ret = rpcsvc_transport_peer_check_search (options, srchstr, + ip, hostname); + GF_FREE (srchstr); if (ret == 0) ret = RPCSVC_AUTH_ACCEPT; else - ret = RPCSVC_AUTH_DONTCARE; + ret = RPCSVC_AUTH_REJECT; out: return ret; } -int -rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr) +static int +rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, + char *ip, char *hostname) { - int ret = RPCSVC_AUTH_DONTCARE; + int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; - char generalrule[] = "rpc-auth.addr.reject"; - if ((!options) || (!clstr)) + if ((!options) || (!ip) || (!volname)) return ret; - if (volname) { - ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", - volname); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); - ret = RPCSVC_AUTH_REJECT; - goto out; - } - } else - srchstr = generalrule; + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", + volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + ret = RPCSVC_AUTH_REJECT; + goto out; + } - ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr); - if (volname) - GF_FREE (srchstr); + ret = rpcsvc_transport_peer_check_search (options, srchstr, + ip, hostname); + GF_FREE (srchstr); if (ret == 0) ret = RPCSVC_AUTH_REJECT; @@ -2051,309 +2254,113 @@ out: } -/* This function tests the results of the allow rule and the reject rule to - * combine them into a single result that can be used to determine if the - * connection should be allowed to proceed. - * Heres the test matrix we need to follow in this function. - * - * A - Allow, the result of the allow test. Never returns R. - * R - Reject, result of the reject test. Never returns A. - * Both can return D or dont care if no rule was given. - * - * | @allow | @reject | Result | - * | A | R | R | - * | D | D | D | - * | A | D | A | - * | D | R | R | +/* Combines rpc auth's allow and reject options. + * Order of checks is important. + * First, REJECT if either rejects. + * If neither rejects, ACCEPT if either accepts. + * If neither accepts, DONTCARE */ int rpcsvc_combine_allow_reject_volume_check (int allow, int reject) { - int final = RPCSVC_AUTH_REJECT; - - /* If allowed rule allows but reject rule rejects, we stay cautious - * and reject. */ - if ((allow == RPCSVC_AUTH_ACCEPT) && (reject == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - /* if both are dont care, that is user did not specify for either allow - * or reject, we leave it up to the general rule to apply, in the hope - * that there is one. - */ - else if ((allow == RPCSVC_AUTH_DONTCARE) && - (reject == RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_DONTCARE; - /* If one is dont care, the other one applies. */ - else if ((allow == RPCSVC_AUTH_ACCEPT) && - (reject == RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_ACCEPT; - else if ((allow == RPCSVC_AUTH_DONTCARE) && - (reject == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - - return final; -} - - -/* Combines the result of the general rule test against, the specific rule - * to determine final permission for the client's address. - * - * | @gen | @spec | Result | - * | A | A | A | - * | A | R | R | - * | A | D | A | - * | D | A | A | - * | D | R | R | - * | D | D | D | - * | R | A | A | - * | R | D | R | - * | R | R | R | - */ -int -rpcsvc_combine_gen_spec_addr_checks (int gen, int spec) -{ - int final = RPCSVC_AUTH_REJECT; - - if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_DONTCARE; - else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_REJECT; - else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - - return final; -} - + if (allow == RPCSVC_AUTH_REJECT || + reject == RPCSVC_AUTH_REJECT) + return RPCSVC_AUTH_REJECT; + if (allow == RPCSVC_AUTH_ACCEPT || + reject == RPCSVC_AUTH_ACCEPT) + return RPCSVC_AUTH_ACCEPT; -/* Combines the result of the general rule test against, the specific rule - * to determine final test for the connection coming in for a given volume. - * - * | @gen | @spec | Result | - * | A | A | A | - * | A | R | R | - * | A | D | A | - * | D | A | A | - * | D | R | R | - * | D | D | R |, special case, we intentionally disallow this. - * | R | A | A | - * | R | D | R | - * | R | R | R | - */ -int -rpcsvc_combine_gen_spec_volume_checks (int gen, int spec) -{ - int final = RPCSVC_AUTH_REJECT; - - if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - /* On no rule, we reject. */ - else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_REJECT; - else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT)) - final = RPCSVC_AUTH_ACCEPT; - else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE)) - final = RPCSVC_AUTH_REJECT; - else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT)) - final = RPCSVC_AUTH_REJECT; - - return final; + return RPCSVC_AUTH_DONTCARE; } - int -rpcsvc_transport_peer_check_name (dict_t *options, char *volname, - rpc_transport_t *trans) +rpcsvc_auth_check (rpcsvc_t *svc, char *volname, + rpc_transport_t *trans) { - int ret = RPCSVC_AUTH_REJECT; - int aret = RPCSVC_AUTH_REJECT; - int rjret = RPCSVC_AUTH_REJECT; - char clstr[RPCSVC_PEER_STRLEN]; - char *hostname = NULL; - - if (!trans) + int ret = RPCSVC_AUTH_REJECT; + int accept = RPCSVC_AUTH_REJECT; + int reject = RPCSVC_AUTH_REJECT; + char *hostname = NULL; + char *ip = NULL; + char client_ip[RPCSVC_PEER_STRLEN] = {0}; + char *allow_str = NULL; + char *reject_str = NULL; + char *srchstr = NULL; + dict_t *options = NULL; + + if (!svc || !volname || !trans) return ret; - ret = rpcsvc_transport_peername (trans, clstr, RPCSVC_PEER_STRLEN); - if (ret != 0) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " - "%s", gai_strerror (ret)); - ret = RPCSVC_AUTH_REJECT; - goto err; - } - - ret = gf_get_hostname_from_ip (clstr, &hostname); - if (!ret) - ret = dict_set_dynstr (options, "fqdn", - hostname); - - aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); - rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr); - - ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); - -err: - return ret; -} - - -int -rpcsvc_transport_peer_check_addr (dict_t *options, char *volname, - rpc_transport_t *trans) -{ - int ret = RPCSVC_AUTH_REJECT; - int aret = RPCSVC_AUTH_DONTCARE; - int rjret = RPCSVC_AUTH_REJECT; - char clstr[RPCSVC_PEER_STRLEN]; - char *tmp = NULL; - union gf_sock_union sock_union; - - if (!trans) + /* Fetch the options from svc struct and validate */ + options = svc->options; + if (!options) return ret; - ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN, - &sock_union.storage, - sizeof (sock_union.storage)); + ret = rpcsvc_transport_peername (trans, client_ip, RPCSVC_PEER_STRLEN); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " "%s", gai_strerror (ret)); - ret = RPCSVC_AUTH_REJECT; - goto err; - } - - switch (sock_union.sa.sa_family) { - - case AF_INET: - case AF_INET6: - tmp = strrchr (clstr, ':'); - if (tmp) - *tmp = '\0'; - break; - } - - aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); - rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr); - - ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); -err: - return ret; -} - - -int -rpcsvc_transport_check_volume_specific (dict_t *options, char *volname, - rpc_transport_t *trans) -{ - int namechk = RPCSVC_AUTH_REJECT; - int addrchk = RPCSVC_AUTH_REJECT; - gf_boolean_t namelookup = _gf_false; - char *namestr = NULL; - int ret = 0; - - if ((!options) || (!volname) || (!trans)) return RPCSVC_AUTH_REJECT; - - /* Disabled by default */ - if ((dict_get (options, "rpc-auth.addr.namelookup"))) { - ret = dict_get_str (options, "rpc-auth.addr.namelookup" - , &namestr); - if (ret == 0) - ret = gf_string2boolean (namestr, &namelookup); } - /* We need two separate checks because the rules with addresses in them - * can be network addresses which can be general and names can be - * specific which will over-ride the network address rules. + /* Accept if its the default case: Allow all, Reject none + * The default volfile always contains a 'allow *' rule + * for each volume. If allow rule is missing (which implies + * there is some bad volfile generating code doing this), we + * assume no one is allowed mounts, and thus, we reject mounts. */ - if (namelookup) - namechk = rpcsvc_transport_peer_check_name (options, volname, - trans); - addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans); - - if (namelookup) - ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, - namechk); - else - ret = addrchk; - - return ret; -} - - -int -rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans) -{ - int addrchk = RPCSVC_AUTH_REJECT; - int namechk = RPCSVC_AUTH_REJECT; - gf_boolean_t namelookup = _gf_false; - char *namestr = NULL; - int ret = 0; - - if ((!options) || (!trans)) + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); return RPCSVC_AUTH_REJECT; - - /* Disabled by default */ - if ((dict_get (options, "rpc-auth.addr.namelookup"))) { - ret = dict_get_str (options, "rpc-auth.addr.namelookup" - , &namestr); - if (ret == 0) - ret = gf_string2boolean (namestr, &namelookup); } - /* We need two separate checks because the rules with addresses in them - * can be network addresses which can be general and names can be - * specific which will over-ride the network address rules. - */ - if (namelookup) - namechk = rpcsvc_transport_peer_check_name (options, NULL, trans); - addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans); - - if (namelookup) - ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); - else - ret = addrchk; + ret = dict_get_str (options, srchstr, &allow_str); + GF_FREE (srchstr); + if (ret < 0) + return RPCSVC_AUTH_REJECT; - return ret; -} + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return RPCSVC_AUTH_REJECT; + } -int -rpcsvc_transport_peer_check (dict_t *options, char *volname, - rpc_transport_t *trans) -{ - int general_chk = RPCSVC_AUTH_REJECT; - int specific_chk = RPCSVC_AUTH_REJECT; + ret = dict_get_str (options, srchstr, &reject_str); + GF_FREE (srchstr); + if (reject_str == NULL && !strcmp ("*", allow_str)) + return RPCSVC_AUTH_ACCEPT; + + /* Non-default rule, authenticate */ + if (!get_host_name (client_ip, &ip)) + ip = client_ip; + + /* addr-namelookup check */ + if (svc->addr_namelookup == _gf_true) { + ret = gf_get_hostname_from_ip (ip, &hostname); + if (ret) { + if (hostname) + GF_FREE (hostname); + /* failed to get hostname, but hostname auth + * is enabled, so authentication will not be + * 100% correct. reject mounts + */ + return RPCSVC_AUTH_REJECT; + } + } - if ((!options) || (!volname) || (!trans)) - return RPCSVC_AUTH_REJECT; + accept = rpcsvc_transport_peer_check_allow (options, volname, + ip, hostname); - general_chk = rpcsvc_transport_check_volume_general (options, trans); - specific_chk = rpcsvc_transport_check_volume_specific (options, volname, - trans); + reject = rpcsvc_transport_peer_check_reject (options, volname, + ip, hostname); - return rpcsvc_combine_gen_spec_volume_checks (general_chk, - specific_chk); + if (hostname) + GF_FREE (hostname); + return rpcsvc_combine_allow_reject_volume_check (accept, reject); } - int rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans) @@ -2363,8 +2370,6 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, socklen_t sinsize = sizeof (&sock_union.sin); char *srchstr = NULL; char *valstr = NULL; - int globalinsecure = RPCSVC_AUTH_REJECT; - int exportinsecure = RPCSVC_AUTH_DONTCARE; uint16_t port = 0; gf_boolean_t insecure = _gf_false; @@ -2393,23 +2398,6 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, } /* Disabled by default */ - if ((dict_get (svc->options, "rpc-auth.ports.insecure"))) { - ret = dict_get_str (svc->options, "rpc-auth.ports.insecure" - , &srchstr); - if (ret == 0) { - ret = gf_string2boolean (srchstr, &insecure); - if (ret == 0) { - if (insecure == _gf_true) - globalinsecure = RPCSVC_AUTH_ACCEPT; - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" - " read rpc-auth.ports.insecure value"); - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" - " read rpc-auth.ports.insecure value"); - } - - /* Disabled by default */ ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); @@ -2417,25 +2405,22 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, goto err; } - if (dict_get (svc->options, srchstr)) { - ret = dict_get_str (svc->options, srchstr, &valstr); - if (ret == 0) { - ret = gf_string2boolean (valstr, &insecure); - if (ret == 0) { - if (insecure == _gf_true) - exportinsecure = RPCSVC_AUTH_ACCEPT; - else - exportinsecure = RPCSVC_AUTH_REJECT; - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" - " read rpc-auth.ports.insecure value"); - } else - gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" - " read rpc-auth.ports.insecure value"); - } - - ret = rpcsvc_combine_gen_spec_volume_checks (globalinsecure, - exportinsecure); + ret = dict_get_str (svc->options, srchstr, &valstr); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" + " read rpc-auth.ports.insecure value"); + goto err; + } + + ret = gf_string2boolean (valstr, &insecure); + if (ret) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" + " convert rpc-auth.ports.insecure value"); + goto err; + } + + ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT; + if (ret == RPCSVC_AUTH_ACCEPT) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed"); else @@ -2443,7 +2428,8 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, " allowed"); err: - GF_FREE (srchstr); + if (srchstr) + GF_FREE (srchstr); return ret; } @@ -2479,9 +2465,9 @@ out: rpcsvc_actor_t gluster_dump_actors[] = { - [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, - [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, - [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, + [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, + [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, + [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA}, }; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 1323c8b7a..cbc1f4226 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -38,6 +38,10 @@ #define MAX_IOVEC 16 #endif +#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 +#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536 +#define RPCSVC_MIN_OUTSTANDING_RPC_LIMIT 0 /* No limit i.e. Unlimited */ + #define GF_RPCSVC "rpc-service" #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) @@ -140,6 +144,9 @@ typedef struct rpcsvc_auth_data { #define rpcsvc_auth_flavour(au) ((au).flavour) +typedef struct drc_client drc_client_t; +typedef struct drc_cached_op drc_cached_op_t; + /* The container for the RPC call handed up to an actor. * Dynamically allocated. Lives till the call reply is completely * transmitted. @@ -178,7 +185,9 @@ struct rpcsvc_request { /* Might want to move this to AUTH_UNIX specific state since this array * is not available for every authentication scheme. */ - gid_t auxgids[GF_MAX_AUX_GROUPS]; + gid_t *auxgids; + gid_t auxgidsmall[SMALL_GROUP_COUNT]; + gid_t *auxgidlarge; int auxgidcount; @@ -241,6 +250,9 @@ struct rpcsvc_request { /* we need to ref the 'iobuf' in case of 'synctasking' it */ struct iobuf *hdr_iobuf; + + /* pointer to cached reply for use in DRC */ + drc_cached_op_t *reply; }; #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) @@ -314,7 +326,6 @@ typedef void *(*rpcsvc_encode_reply) (void *msg); */ typedef void (*rpcsvc_deallocate_reply) (void *msg); - #define RPCSVC_NAME_MAX 32 /* The descriptor for each procedure/actor that runs * over the RPC service. @@ -336,6 +347,7 @@ typedef struct rpcsvc_actor_desc { /* Can actor be ran on behalf an unprivileged requestor? */ gf_boolean_t unprivileged; + drc_op_type_t op_type; } rpcsvc_actor_t; /* Describes a program and its version along with the function pointers @@ -429,6 +441,9 @@ extern int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port); extern int +rpcsvc_program_unregister_portmap (rpcsvc_program_t *newprog); + +extern int rpcsvc_register_portmap_enabled (rpcsvc_t *svc); /* Inits the global RPC service data structures. @@ -438,6 +453,9 @@ extern rpcsvc_t * rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, uint32_t poolcount); +extern int +rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options); + int rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); @@ -448,6 +466,13 @@ int rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv); + +int rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref); @@ -473,12 +498,12 @@ rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, struct sockaddr_storage *returnsa, socklen_t sasize); extern int -rpcsvc_transport_peer_check (dict_t *options, char *volname, - rpc_transport_t *trans); +rpcsvc_auth_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans); extern int rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans); + #define rpcsvc_request_seterr(req, err) (req)->rpc_err = err #define rpcsvc_request_set_autherr(req, err) (req)->auth_err = err @@ -534,6 +559,9 @@ extern int rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options); extern int +rpcsvc_auth_reconf (rpcsvc_t *svc, dict_t *options); + +extern int rpcsvc_auth_transport_init (rpc_transport_t *xprt); extern int @@ -550,9 +578,6 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen); extern gid_t * rpcsvc_auth_unix_auxgids (rpcsvc_request_t *req, int *arrlen); -extern int -rpcsvc_combine_gen_spec_volume_checks (int gen, int spec); - extern char * rpcsvc_volume_allowed (dict_t *options, char *volname); @@ -560,18 +585,22 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, struct iovec *proghdr, int proghdrcount); +rpcsvc_actor_t * +rpcsvc_program_actor (rpcsvc_request_t *req); + int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath); int rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options); int +rpcsvc_set_addr_namelookup (rpcsvc_t *svc, dict_t *options); +int rpcsvc_set_root_squash (rpcsvc_t *svc, dict_t *options); int +rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options); +int rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen); -char * -rpcsvc_volume_allowed (dict_t *options, char *volname); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, uint32_t progver, uint32_t procnum); - #endif diff --git a/rpc/rpc-lib/src/xdr-rpc.c b/rpc/rpc-lib/src/xdr-rpc.c index ef52764c3..adb48a531 100644 --- a/rpc/rpc-lib/src/xdr-rpc.c +++ b/rpc/rpc-lib/src/xdr-rpc.c @@ -34,7 +34,7 @@ xdr_to_rpc_call (char *msgbuf, size_t len, struct rpc_msg *call, struct iovec *payload, char *credbytes, char *verfbytes) { XDR xdr; - char opaquebytes[MAX_AUTH_BYTES]; + char opaquebytes[GF_MAX_AUTH_BYTES]; struct opaque_auth *oa = NULL; int ret = -1; |
