summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r--rpc/rpc-lib/src/Makefile.am8
-rw-r--r--rpc/rpc-lib/src/auth-glusterfs.c32
-rw-r--r--rpc/rpc-lib/src/auth-unix.c1
-rw-r--r--rpc/rpc-lib/src/protocol-common.h36
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c145
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h7
-rw-r--r--rpc/rpc-lib/src/rpc-drc.c872
-rw-r--r--rpc/rpc-lib/src/rpc-drc.h104
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c104
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h18
-rw-r--r--rpc/rpc-lib/src/rpcsvc-auth.c125
-rw-r--r--rpc/rpc-lib/src/rpcsvc-common.h64
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c822
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h49
-rw-r--r--rpc/rpc-lib/src/xdr-rpc.c2
15 files changed, 1770 insertions, 619 deletions
diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am
index ca62a27f9..f19c3c8a4 100644
--- a/rpc/rpc-lib/src/Makefile.am
+++ b/rpc/rpc-lib/src/Makefile.am
@@ -1,16 +1,18 @@
lib_LTLIBRARIES = libgfrpc.la
libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \
- rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c
+ rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c \
+ rpc-drc.c
libgfrpc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
noinst_HEADERS = rpcsvc.h rpc-transport.h xdr-common.h xdr-rpc.h xdr-rpcclnt.h \
- rpc-clnt.h rpcsvc-common.h protocol-common.h
+ rpc-clnt.h rpcsvc-common.h protocol-common.h rpc-drc.h
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
-I$(top_srcdir)/rpc/xdr/src \
- -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\"
+ -DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \
+ -I$(top_srcdir)/contrib/rbtree
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c
index 9c6f8385b..db488434c 100644
--- a/rpc/rpc-lib/src/auth-glusterfs.c
+++ b/rpc/rpc-lib/src/auth-glusterfs.c
@@ -96,6 +96,22 @@ int auth_glusterfs_authenticate (rpcsvc_request_t *req, void *priv)
goto err;
}
+ if (req->auxgidcount > SMALL_GROUP_COUNT) {
+ req->auxgidlarge = GF_CALLOC(req->auxgidcount,
+ sizeof(req->auxgids[0]),
+ gf_common_mt_auxgids);
+ req->auxgids = req->auxgidlarge;
+ } else {
+ req->auxgids = req->auxgidsmall;
+ }
+
+ if (!req->auxgids) {
+ gf_log ("auth-glusterfs", GF_LOG_WARNING,
+ "cannot allocate gid list");
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
for (gidcount = 0; gidcount < au.ngrps; ++gidcount)
req->auxgids[gidcount] = au.groups[gidcount];
@@ -203,6 +219,22 @@ int auth_glusterfs_v2_authenticate (rpcsvc_request_t *req, void *priv)
goto err;
}
+ if (req->auxgidcount > SMALL_GROUP_COUNT) {
+ req->auxgidlarge = GF_CALLOC(req->auxgidcount,
+ sizeof(req->auxgids[0]),
+ gf_common_mt_auxgids);
+ req->auxgids = req->auxgidlarge;
+ } else {
+ req->auxgids = req->auxgidsmall;
+ }
+
+ if (!req->auxgids) {
+ gf_log ("auth-glusterfs-v2", GF_LOG_WARNING,
+ "cannot allocate gid list");
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
for (i = 0; i < req->auxgidcount; ++i)
req->auxgids[i] = au.groups.groups_val[i];
diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c
index 6251d60a8..fa5f0576e 100644
--- a/rpc/rpc-lib/src/auth-unix.c
+++ b/rpc/rpc-lib/src/auth-unix.c
@@ -42,6 +42,7 @@ int auth_unix_authenticate (rpcsvc_request_t *req, void *priv)
if (!req)
return ret;
+ req->auxgids = req->auxgidsmall;
ret = xdr_to_auth_unix_cred (req->cred.authdata, req->cred.datalen,
&aup, machname, req->auxgids);
if (ret == -1) {
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h
index 97017e5fe..8bef906cc 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -56,6 +56,9 @@ enum gf_fop_procnum {
GFS3_OP_RELEASE,
GFS3_OP_RELEASEDIR,
GFS3_OP_FREMOVEXATTR,
+ GFS3_OP_FALLOCATE,
+ GFS3_OP_DISCARD,
+ GFS3_OP_ZEROFILL,
GFS3_OP_MAXVALUE,
} ;
@@ -154,7 +157,10 @@ enum gluster_cli_procnum {
GLUSTER_CLI_LIST_VOLUME,
GLUSTER_CLI_CLRLOCKS_VOLUME,
GLUSTER_CLI_UUID_RESET,
- GLUSTER_CLI_BD_OP,
+ GLUSTER_CLI_UUID_GET,
+ GLUSTER_CLI_COPY_FILE,
+ GLUSTER_CLI_SYS_EXEC,
+ GLUSTER_CLI_SNAP,
GLUSTER_CLI_MAXVALUE,
};
@@ -186,7 +192,7 @@ enum glusterd_brick_procnum {
GLUSTERD_BRICK_XLATOR_DEFRAG,
GLUSTERD_NODE_PROFILE,
GLUSTERD_NODE_STATUS,
- GLUSTERD_BRICK_BD_OP,
+ GLUSTERD_VOLUME_BARRIER_OP,
GLUSTERD_BRICK_MAXVALUE,
};
@@ -204,16 +210,22 @@ typedef enum {
GF_AFR_OP_INDEX_SUMMARY,
GF_AFR_OP_HEALED_FILES,
GF_AFR_OP_HEAL_FAILED_FILES,
- GF_AFR_OP_SPLIT_BRAIN_FILES
+ GF_AFR_OP_SPLIT_BRAIN_FILES,
+ GF_AFR_OP_STATISTICS,
+ GF_AFR_OP_STATISTICS_HEAL_COUNT,
+ GF_AFR_OP_STATISTICS_HEAL_COUNT_PER_REPLICA,
} gf_xl_afr_op_t ;
-typedef enum {
- GF_BD_OP_INVALID,
- GF_BD_OP_NEW_BD,
- GF_BD_OP_DELETE_BD,
- GF_BD_OP_CLONE_BD,
- GF_BD_OP_SNAPSHOT_BD,
-} gf_xl_bd_op_t ;
+enum glusterd_mgmt_v3_procnum {
+ GLUSTERD_MGMT_V3_NULL, /* 0 */
+ GLUSTERD_MGMT_V3_LOCK,
+ GLUSTERD_MGMT_V3_PRE_VALIDATE,
+ GLUSTERD_MGMT_V3_BRICK_OP,
+ GLUSTERD_MGMT_V3_COMMIT,
+ GLUSTERD_MGMT_V3_POST_VALIDATE,
+ GLUSTERD_MGMT_V3_UNLOCK,
+ GLUSTERD_MGMT_V3_MAXVALUE,
+};
#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */
#define GLUSTER_HNDSK_VERSION 2 /* 0.0.2 */
@@ -241,6 +253,10 @@ typedef enum {
#define GD_BRICK_PROGRAM 4867634 /*Completely random*/
#define GD_BRICK_VERSION 2
+/* Third version */
+#define GD_MGMT_V3_PROGRAM 2210013 /* Completely random */
+#define GD_MGMT_V3_VERSION 3
+
/* OP-VERSION handshake */
#define GD_MGMT_HNDSK_PROGRAM 1239873 /* Completely random */
#define GD_MGMT_HNDSK_VERSION 1
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index e6c681df8..ac98a5c91 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -145,7 +145,7 @@ call_bail (void *data)
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
char frame_sent[256] = {0,};
- struct timeval timeout = {0,};
+ struct timespec timeout = {0,};
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -163,7 +163,7 @@ call_bail (void *data)
call-once timer */
if (conn->timer) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
gf_timer_call_cancel (clnt->ctx, conn->timer);
conn->timer = gf_timer_call_after (clnt->ctx,
@@ -173,7 +173,8 @@ call_bail (void *data)
if (conn->timer == NULL) {
gf_log (conn->trans->name, GF_LOG_WARNING,
- "Cannot create bailout timer");
+ "Cannot create bailout timer for %s",
+ conn->trans->peerinfo.identifier);
}
}
@@ -197,14 +198,14 @@ call_bail (void *data)
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log (conn->trans->name, GF_LOG_ERROR,
- "bailing out frame type(%s) op(%s(%d)) xid = 0x%ux "
- "sent = %s. timeout = %d",
+ "bailing out frame type(%s) op(%s(%d)) xid = 0x%x "
+ "sent = %s. timeout = %d for %s",
trav->rpcreq->prog->progname,
(trav->rpcreq->prog->procnames) ?
trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
"--",
trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent,
- conn->frame_timeout);
+ conn->frame_timeout, conn->trans->peerinfo.identifier);
clnt = rpc_clnt_ref (clnt);
trav->rpcreq->rpc_status = -1;
@@ -226,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
- struct timeval timeout = {0, };
+ struct timespec timeout = {0, };
struct saved_frame *saved_frame = NULL;
conn = &rpc_clnt->conn;
@@ -240,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
/* TODO: make timeout configurable */
if (conn->timer == NULL) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
conn->timer = gf_timer_call_after (rpc_clnt->ctx,
timeout,
call_bail,
@@ -359,7 +360,7 @@ saved_frames_unwind (struct saved_frames *saved_frames)
gf_log_callingfn (trav->rpcreq->conn->trans->name,
GF_LOG_ERROR,
"forced unwinding frame type(%s) op(%s(%d)) "
- "called at %s (xid=0x%ux)",
+ "called at %s (xid=0x%x)",
trav->rpcreq->prog->progname,
((trav->rpcreq->prog->procnames) ?
trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
@@ -397,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr)
{
rpc_transport_t *trans = NULL;
rpc_clnt_connection_t *conn = NULL;
- struct timeval tv = {0, 0};
+ struct timespec ts = {0, 0};
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
@@ -416,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr)
conn->reconnect = 0;
if (conn->connected == 0) {
- tv.tv_sec = 3;
+ ts.tv_sec = 3;
+ ts.tv_nsec = 0;
gf_log (trans->name, GF_LOG_TRACE,
"attempting reconnect");
ret = rpc_transport_connect (trans,
conn->config.remote_port);
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
trans);
} else {
@@ -661,7 +663,7 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
}
gf_log (conn->trans->name, GF_LOG_TRACE,
- "received rpc message (RPC XID: 0x%ux"
+ "received rpc message (RPC XID: 0x%x"
" Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
saved_frame->rpcreq->xid,
saved_frame->rpcreq->prog->progname,
@@ -819,6 +821,9 @@ out:
return;
}
+static void
+rpc_clnt_destroy (struct rpc_clnt *rpc);
+
int
rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...)
@@ -828,7 +833,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
int ret = -1;
rpc_request_info_t *req_info = NULL;
rpc_transport_pollin_t *pollin = NULL;
- struct timeval tv = {0, };
+ struct timespec ts = {0, };
conn = mydata;
if (conn == NULL) {
@@ -847,10 +852,11 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
{
if (!conn->rpc_clnt->disabled
&& (conn->reconnect == NULL)) {
- tv.tv_sec = 10;
+ ts.tv_sec = 10;
+ ts.tv_nsec = 0;
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn->trans);
}
@@ -864,9 +870,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
}
case RPC_TRANSPORT_CLEANUP:
- /* this event should not be received on a client for, a
- * transport is only disconnected, but never destroyed.
- */
+ rpc_clnt_destroy (clnt);
ret = 0;
break;
@@ -1481,7 +1485,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if (ret == -1) {
gf_log (conn->trans->name, GF_LOG_WARNING,
"failed to submit rpc-request "
- "(XID: 0x%ux Program: %s, ProgVers: %d, "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
"Proc: %d) to rpc-transport (%s)", rpcreq->xid,
rpcreq->prog->progname, rpcreq->prog->progver,
rpcreq->procnum, rpc->conn.trans->name);
@@ -1492,7 +1496,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
__save_frame (rpc, frame, rpcreq);
gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request "
- "(XID: 0x%ux Program: %s, ProgVers: %d, "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
"Proc: %d) to rpc-transport (%s)", rpcreq->xid,
rpcreq->prog->progname, rpcreq->prog->progver,
rpcreq->procnum, rpc->conn.trans->name);
@@ -1541,18 +1545,21 @@ rpc_clnt_ref (struct rpc_clnt *rpc)
static void
-rpc_clnt_destroy (struct rpc_clnt *rpc)
+rpc_clnt_trigger_destroy (struct rpc_clnt *rpc)
{
if (!rpc)
return;
- if (rpc->conn.trans) {
- rpc_transport_unregister_notify (rpc->conn.trans);
- rpc_transport_disconnect (rpc->conn.trans);
- rpc_transport_unref (rpc->conn.trans);
- }
+ rpc_clnt_disable (rpc);
+ rpc_transport_unref (rpc->conn.trans);
+}
+
+static void
+rpc_clnt_destroy (struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return;
- rpc_clnt_reconnect_cleanup (&rpc->conn);
saved_frames_destroy (rpc->conn.saved_frames);
pthread_mutex_destroy (&rpc->lock);
pthread_mutex_destroy (&rpc->conn.lock);
@@ -1579,13 +1586,36 @@ rpc_clnt_unref (struct rpc_clnt *rpc)
}
pthread_mutex_unlock (&rpc->lock);
if (!count) {
- rpc_clnt_destroy (rpc);
+ rpc_clnt_trigger_destroy (rpc);
return NULL;
}
return rpc;
}
+char
+rpc_clnt_is_disabled (struct rpc_clnt *rpc)
+{
+
+ rpc_clnt_connection_t *conn = NULL;
+ char disabled = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ disabled = rpc->disabled;
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+out:
+ return disabled;
+}
+
void
rpc_clnt_disable (struct rpc_clnt *rpc)
{
@@ -1668,60 +1698,3 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
rpc->conn.config.remote_host = gf_strdup (config->remote_host);
}
}
-
-int
-rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath,
- int frame_timeout)
-{
- dict_t *dict = NULL;
- char *fpath = NULL;
- int ret = -1;
-
- GF_ASSERT (filepath);
- GF_ASSERT (options);
-
- dict = dict_new ();
- if (!dict)
- goto out;
-
- fpath = gf_strdup (filepath);
- if (!fpath) {
- ret = -1;
- goto out;
- }
-
- ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath);
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport.address-family", "unix");
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport.socket.nodelay", "off");
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport-type", "socket");
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport.socket.keepalive", "off");
- if (ret)
- goto out;
-
- if (frame_timeout > 0) {
- ret = dict_set_int32 (dict, "frame-timeout", frame_timeout);
- if (ret)
- goto out;
- }
-
- *options = dict;
-out:
- if (ret) {
- GF_FREE (fpath);
- if (dict)
- dict_unref (dict);
- }
- return ret;
-}
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 0da165559..584963ad0 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -237,11 +237,10 @@ void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config);
int rpcclnt_cbk_program_register (struct rpc_clnt *svc,
rpcclnt_cb_program_t *program, void *mydata);
-int
-rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath,
- int frame_timeout);
-
void
rpc_clnt_disable (struct rpc_clnt *rpc);
+char
+rpc_clnt_is_disabled (struct rpc_clnt *rpc);
+
#endif /* !_RPC_CLNT_H */
diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c
new file mode 100644
index 000000000..8181e6aee
--- /dev/null
+++ b/rpc/rpc-lib/src/rpc-drc.c
@@ -0,0 +1,872 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "rpcsvc.h"
+#ifndef RPC_DRC_H
+#include "rpc-drc.h"
+#endif
+#include "locking.h"
+#include "hashfn.h"
+#include "common-utils.h"
+#include "statedump.h"
+#include "mem-pool.h"
+
+#include <netinet/in.h>
+#include <unistd.h>
+
+/**
+ * rpcsvc_drc_op_destroy - Destroys the cached reply
+ *
+ * @param drc - the main drc structure
+ * @param reply - the cached reply to destroy
+ * @return NULL if reply is destroyed, reply otherwise
+ */
+static drc_cached_op_t *
+rpcsvc_drc_op_destroy (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply)
+{
+ GF_ASSERT (drc);
+ GF_ASSERT (reply);
+
+ if (reply->state == DRC_OP_IN_TRANSIT)
+ return reply;
+
+ iobref_unref (reply->msg.iobref);
+ if (reply->msg.rpchdr)
+ GF_FREE (reply->msg.rpchdr);
+ if (reply->msg.proghdr)
+ GF_FREE (reply->msg.proghdr);
+ if (reply->msg.progpayload)
+ GF_FREE (reply->msg.progpayload);
+
+ list_del (&reply->global_list);
+ reply->client->op_count--;
+ drc->op_count--;
+ mem_put (reply);
+ reply = NULL;
+
+ return reply;
+}
+
+/**
+ * rpcsvc_drc_op_rb_unref - This function is used in rb tree cleanup only
+ *
+ * @param reply - the cached reply to unref
+ * @param drc - the main drc structure
+ * @return void
+ */
+static void
+rpcsvc_drc_rb_op_destroy (void *reply, void *drc)
+{
+ rpcsvc_drc_op_destroy (drc, (drc_cached_op_t *)reply);
+}
+
+/**
+ * rpcsvc_remove_drc_client - Cleanup the drc client
+ *
+ * @param client - the drc client to be removed
+ * @return void
+ */
+static void
+rpcsvc_remove_drc_client (drc_client_t *client)
+{
+ rb_destroy (client->rbtree, rpcsvc_drc_rb_op_destroy);
+ list_del (&client->client_list);
+ GF_FREE (client);
+}
+
+/**
+ * rpcsvc_client_lookup - Given a sockaddr_storage, find the client if it exists
+ *
+ * @param drc - the main drc structure
+ * @param sockaddr - the network address of the client to be looked up
+ * @return drc client if it exists, NULL otherwise
+ */
+static drc_client_t *
+rpcsvc_client_lookup (rpcsvc_drc_globals_t *drc,
+ struct sockaddr_storage *sockaddr)
+{
+ drc_client_t *client = NULL;
+
+ GF_ASSERT (drc);
+ GF_ASSERT (sockaddr);
+
+ if (list_empty (&drc->clients_head))
+ return NULL;
+
+ list_for_each_entry (client, &drc->clients_head, client_list) {
+ if (gf_sock_union_equal_addr (&client->sock_union,
+ (union gf_sock_union *)sockaddr))
+ return client;
+ }
+
+ return NULL;
+}
+
+/**
+ * drc_compare_reqs - Used by rbtree to determine if incoming req matches with
+ * an existing node(cached reply) in rbtree
+ *
+ * @param item - pointer to the incoming req
+ * @param rb_node_data - pointer to an rbtree node (cached reply)
+ * @param param - drc pointer - unused here, but used in *op_destroy
+ * @return 0 if req matches reply, else (req->xid - reply->xid)
+ */
+int
+drc_compare_reqs (const void *item, const void *rb_node_data, void *param)
+{
+ int ret = -1;
+ rpcsvc_request_t *req = NULL;
+ drc_cached_op_t *reply = NULL;
+
+ GF_ASSERT (item);
+ GF_ASSERT (rb_node_data);
+ GF_ASSERT (param);
+
+ req = (rpcsvc_request_t *)item;
+ reply = (drc_cached_op_t *)rb_node_data;
+
+ ret = req->xid - reply->xid;
+ if (ret != 0)
+ return ret;
+
+ if (req->prognum == reply->prognum &&
+ req->procnum == reply->procnum &&
+ req->progver == reply->progversion)
+ return 0;
+
+ return 1;
+}
+
+/**
+ * drc_rb_calloc - used by rbtree api to allocate memory for nodes
+ *
+ * @param allocator - the libavl_allocator structure used by rbtree
+ * @param size - not needed by this function
+ * @return pointer to new cached reply (node in rbtree)
+ */
+static void *
+drc_rb_calloc (struct libavl_allocator *allocator, size_t size)
+{
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ /* get the drc pointer by simple typecast, since allocator
+ * is the first member of rpcsvc_drc_globals_t
+ */
+ drc = (rpcsvc_drc_globals_t *)allocator;
+
+ return mem_get (drc->mempool);
+}
+
+/**
+ * drc_rb_free - used by rbtree api to free a node
+ *
+ * @param a - the libavl_allocator structure used by rbtree api
+ * @param block - node that needs to be freed
+ * @return void
+ */
+static void
+drc_rb_free (struct libavl_allocator *a, void *block)
+{
+ mem_put (block);
+}
+
+/**
+ * drc_init_client_cache - initialize a drc client and its rb tree
+ *
+ * @param drc - the main drc structure
+ * @param client - the drc client to be initialized
+ * @return 0 on success, -1 on failure
+ */
+static int
+drc_init_client_cache (rpcsvc_drc_globals_t *drc, drc_client_t *client)
+{
+ GF_ASSERT (drc);
+ GF_ASSERT (client);
+
+ drc->allocator.libavl_malloc = drc_rb_calloc;
+ drc->allocator.libavl_free = drc_rb_free;
+
+ client->rbtree = rb_create (drc_compare_reqs, drc,
+ (struct libavl_allocator *)drc);
+ if (!client->rbtree) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed");
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * rpcsvc_get_drc_client - find the drc client with given sockaddr, else
+ * allocate and initialize a new drc client
+ *
+ * @param drc - the main drc structure
+ * @param sockaddr - network address of client
+ * @return drc client on success, NULL on failure
+ */
+static drc_client_t *
+rpcsvc_get_drc_client (rpcsvc_drc_globals_t *drc,
+ struct sockaddr_storage *sockaddr)
+{
+ drc_client_t *client = NULL;
+
+ GF_ASSERT (drc);
+ GF_ASSERT (sockaddr);
+
+ client = rpcsvc_client_lookup (drc, sockaddr);
+ if (client)
+ goto out;
+
+ /* if lookup fails, allocate cache for the new client */
+ client = GF_CALLOC (1, sizeof (drc_client_t),
+ gf_common_mt_drc_client_t);
+ if (!client)
+ goto out;
+
+ client->ref = 0;
+ client->sock_union = (union gf_sock_union)*sockaddr;
+ client->op_count = 0;
+
+ if (drc_init_client_cache (drc, client)) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "initialization of drc client failed");
+ GF_FREE (client);
+ client = NULL;
+ goto out;
+ }
+ drc->client_count++;
+
+ list_add (&client->client_list, &drc->clients_head);
+
+ out:
+ return client;
+}
+
+/**
+ * rpcsvc_need_drc - Determine if a request needs DRC service
+ *
+ * @param req - incoming request
+ * @return 1 if DRC is needed for req, 0 otherwise
+ */
+int
+rpcsvc_need_drc (rpcsvc_request_t *req)
+{
+ rpcsvc_actor_t *actor = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ GF_ASSERT (req);
+ GF_ASSERT (req->svc);
+
+ drc = req->svc->drc;
+
+ if (!drc || drc->status == DRC_UNINITIATED)
+ return 0;
+
+ actor = rpcsvc_program_actor (req);
+ if (!actor)
+ return 0;
+
+ return (actor->op_type == DRC_NON_IDEMPOTENT
+ && drc->type != DRC_TYPE_NONE);
+}
+
+/**
+ * rpcsvc_drc_client_ref - ref the drc client
+ *
+ * @param client - the drc client to ref
+ * @return client
+ */
+static drc_client_t *
+rpcsvc_drc_client_ref (drc_client_t *client)
+{
+ GF_ASSERT (client);
+ client->ref++;
+ return client;
+}
+
+/**
+ * rpcsvc_drc_client_unref - unref the drc client, and destroy
+ * the client on last unref
+ *
+ * @param drc - the main drc structure
+ * @param client - the drc client to unref
+ * @return NULL if it is the last unref, client otherwise
+ */
+static drc_client_t *
+rpcsvc_drc_client_unref (rpcsvc_drc_globals_t *drc, drc_client_t *client)
+{
+ GF_ASSERT (drc);
+ GF_ASSERT (client->ref);
+
+ client->ref--;
+ if (!client->ref) {
+ drc->client_count--;
+ rpcsvc_remove_drc_client (client);
+ client = NULL;
+ }
+
+ return client;
+}
+
+/**
+ * rpcsvc_drc_lookup - lookup a request to see if it is already cached
+ *
+ * @param req - incoming request
+ * @return cached reply of req if found, NULL otherwise
+ */
+drc_cached_op_t *
+rpcsvc_drc_lookup (rpcsvc_request_t *req)
+{
+ drc_client_t *client = NULL;
+ drc_cached_op_t *reply = NULL;
+
+ GF_ASSERT (req);
+
+ if (!req->trans->drc_client) {
+ client = rpcsvc_get_drc_client (req->svc->drc,
+ &req->trans->peerinfo.sockaddr);
+ if (!client)
+ goto out;
+ req->trans->drc_client = client;
+ }
+
+ client = rpcsvc_drc_client_ref (req->trans->drc_client);
+
+ if (client->op_count == 0)
+ goto out;
+
+ reply = rb_find (client->rbtree, req);
+
+ out:
+ if (client)
+ rpcsvc_drc_client_unref (req->svc->drc, client);
+
+ return reply;
+}
+
+/**
+ * rpcsvc_send_cached_reply - send the cached reply for the incoming request
+ *
+ * @param req - incoming request (which is a duplicate in this case)
+ * @param reply - the cached reply for req
+ * @return 0 on successful reply submission, -1 or other non-zero value otherwise
+ */
+int
+rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply)
+{
+ int ret = 0;
+
+ GF_ASSERT (req);
+ GF_ASSERT (reply);
+
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "sending cached reply: xid: %d, "
+ "client: %s", req->xid, req->trans->peerinfo.identifier);
+
+ rpcsvc_drc_client_ref (reply->client);
+ ret = rpcsvc_transport_submit (req->trans,
+ reply->msg.rpchdr, reply->msg.rpchdrcount,
+ reply->msg.proghdr, reply->msg.proghdrcount,
+ reply->msg.progpayload, reply->msg.progpayloadcount,
+ reply->msg.iobref, req->trans_private);
+ rpcsvc_drc_client_unref (req->svc->drc, reply->client);
+
+ return ret;
+}
+
+/**
+ * rpcsvc_cache_reply - cache the reply for the processed request 'req'
+ *
+ * @param req - processed request
+ * @param iobref - iobref structure of the reply
+ * @param rpchdr - rpc header of the reply
+ * @param rpchdrcount - size of rpchdr
+ * @param proghdr - program header of the reply
+ * @param proghdrcount - size of proghdr
+ * @param payload - payload of the reply if any
+ * @param payloadcount - size of payload
+ * @return 0 on success, -1 on failure
+ */
+int
+rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref,
+ struct iovec *rpchdr, int rpchdrcount,
+ struct iovec *proghdr, int proghdrcount,
+ struct iovec *payload, int payloadcount)
+{
+ int ret = -1;
+ drc_cached_op_t *reply = NULL;
+
+ GF_ASSERT (req);
+ GF_ASSERT (req->reply);
+
+ reply = req->reply;
+
+ reply->state = DRC_OP_CACHED;
+
+ reply->msg.iobref = iobref_ref (iobref);
+
+ reply->msg.rpchdrcount = rpchdrcount;
+ reply->msg.rpchdr = iov_dup (rpchdr, rpchdrcount);
+
+ reply->msg.proghdrcount = proghdrcount;
+ reply->msg.proghdr = iov_dup (proghdr, proghdrcount);
+
+ reply->msg.progpayloadcount = payloadcount;
+ if (payloadcount)
+ reply->msg.progpayload = iov_dup (payload, payloadcount);
+
+ // rpcsvc_drc_client_unref (req->svc->drc, req->trans->drc_client);
+ // rpcsvc_drc_op_unref (req->svc->drc, reply);
+ ret = 0;
+
+ return ret;
+}
+
+/**
+ * rpcsvc_vacate_drc_entries - free up some percentage of drc cache
+ * based on the lru factor
+ *
+ * @param drc - the main drc structure
+ * @return void
+ */
+static void
+rpcsvc_vacate_drc_entries (rpcsvc_drc_globals_t *drc)
+{
+ uint32_t i = 0;
+ uint32_t n = 0;
+ drc_cached_op_t *reply = NULL;
+ drc_cached_op_t *tmp = NULL;
+ drc_client_t *client = NULL;
+
+ GF_ASSERT (drc);
+
+ n = drc->global_cache_size / drc->lru_factor;
+
+ list_for_each_entry_safe_reverse (reply, tmp, &drc->cache_head, global_list) {
+ /* Don't delete ops that are in transit */
+ if (reply->state == DRC_OP_IN_TRANSIT)
+ continue;
+
+ client = reply->client;
+
+ (void *)rb_delete (client->rbtree, reply);
+
+ rpcsvc_drc_op_destroy (drc, reply);
+ rpcsvc_drc_client_unref (drc, client);
+ i++;
+ if (i >= n)
+ break;
+ }
+}
+
+/**
+ * rpcsvc_add_op_to_cache - insert the cached op into the client rbtree and drc list
+ *
+ * @param drc - the main drc structure
+ * @param reply - the op to be inserted
+ * @return 0 on success, -1 on failure
+ */
+static int
+rpcsvc_add_op_to_cache (rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply)
+{
+ drc_client_t *client = NULL;
+ drc_cached_op_t **tmp_reply = NULL;
+
+ GF_ASSERT (drc);
+ GF_ASSERT (reply);
+
+ client = reply->client;
+
+ /* cache is full, free up some space */
+ if (drc->op_count >= drc->global_cache_size)
+ rpcsvc_vacate_drc_entries (drc);
+
+ tmp_reply = (drc_cached_op_t **)rb_probe (client->rbtree, reply);
+ if (*tmp_reply != reply) {
+ /* should never happen */
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "DRC failed to detect duplicates");
+ return -1;
+ } else if (*tmp_reply == NULL) {
+ /* mem alloc failed */
+ return -1;
+ }
+
+ client->op_count++;
+ list_add (&reply->global_list, &drc->cache_head);
+ drc->op_count++;
+
+ return 0;
+}
+
+/**
+ * rpcsvc_cache_request - cache the in-transition incoming request
+ *
+ * @param req - incoming request
+ * @return 0 on success, -1 on failure
+ */
+int
+rpcsvc_cache_request (rpcsvc_request_t *req)
+{
+ int ret = -1;
+ drc_client_t *client = NULL;
+ drc_cached_op_t *reply = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ GF_ASSERT (req);
+
+ drc = req->svc->drc;
+
+ client = req->trans->drc_client;
+ if (!client) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc client is NULL");
+ goto out;
+ }
+
+ reply = mem_get (drc->mempool);
+ if (!reply)
+ goto out;
+
+ reply->client = rpcsvc_drc_client_ref (client);
+ reply->xid = req->xid;
+ reply->prognum = req->prognum;
+ reply->progversion = req->progver;
+ reply->procnum = req->procnum;
+ reply->state = DRC_OP_IN_TRANSIT;
+ req->reply = reply;
+
+ ret = rpcsvc_add_op_to_cache (drc, reply);
+ if (ret) {
+ req->reply = NULL;
+ rpcsvc_drc_op_destroy (drc, reply);
+ rpcsvc_drc_client_unref (drc, client);
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Failed to add op to drc cache");
+ }
+
+ out:
+ return ret;
+}
+
+/**
+ *
+ * rpcsvc_drc_priv - function which dumps the drc state
+ *
+ * @param drc - the main drc structure
+ * @return 0 on success, -1 on failure
+ */
+int32_t
+rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc)
+{
+ int i = 0;
+ char key[GF_DUMP_MAX_BUF_LEN] = {0};
+ drc_client_t *client = NULL;
+ char ip[INET6_ADDRSTRLEN] = {0};
+
+ if (!drc || drc->status == DRC_UNINITIATED) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is "
+ "uninitialized, not dumping its state");
+ return 0;
+ }
+
+ gf_proc_dump_add_section("rpc.drc");
+
+ if (TRY_LOCK (&drc->lock))
+ return -1;
+
+ gf_proc_dump_build_key (key, "drc", "type");
+ gf_proc_dump_write (key, "%d", drc->type);
+
+ gf_proc_dump_build_key (key, "drc", "client_count");
+ gf_proc_dump_write (key, "%d", drc->client_count);
+
+ gf_proc_dump_build_key (key, "drc", "current_cache_size");
+ gf_proc_dump_write (key, "%d", drc->op_count);
+
+ gf_proc_dump_build_key (key, "drc", "max_cache_size");
+ gf_proc_dump_write (key, "%d", drc->global_cache_size);
+
+ gf_proc_dump_build_key (key, "drc", "lru_factor");
+ gf_proc_dump_write (key, "%d", drc->lru_factor);
+
+ gf_proc_dump_build_key (key, "drc", "duplicate_request_count");
+ gf_proc_dump_write (key, "%d", drc->cache_hits);
+
+ gf_proc_dump_build_key (key, "drc", "in_transit_duplicate_requests");
+ gf_proc_dump_write (key, "%d", drc->intransit_hits);
+
+ list_for_each_entry (client, &drc->clients_head, client_list) {
+ gf_proc_dump_build_key (key, "client", "%d.ip-address", i);
+ memset (ip, 0, INET6_ADDRSTRLEN);
+ switch (client->sock_union.storage.ss_family) {
+ case AF_INET:
+ gf_proc_dump_write (key, "%s", inet_ntop (AF_INET,
+ &client->sock_union.sin.sin_addr.s_addr,
+ ip, INET_ADDRSTRLEN));
+ break;
+ case AF_INET6:
+ gf_proc_dump_write (key, "%s", inet_ntop (AF_INET6,
+ &client->sock_union.sin6.sin6_addr,
+ ip, INET6_ADDRSTRLEN));
+ break;
+ default:
+ gf_proc_dump_write (key, "%s", "N/A");
+ }
+
+ gf_proc_dump_build_key (key, "client", "%d.ref_count", i);
+ gf_proc_dump_write (key, "%d", client->ref);
+ gf_proc_dump_build_key (key, "client", "%d.op_count", i);
+ gf_proc_dump_write (key, "%d", client->op_count);
+ i++;
+ }
+
+ UNLOCK (&drc->lock);
+ return 0;
+}
+
+/**
+ * rpcsvc_drc_notify - function which is notified of RPC transport events
+ *
+ * @param svc - pointer to rpcsvc_t structure of the rpc
+ * @param xl - pointer to the xlator
+ * @param event - the event which triggered this notify
+ * @param data - the transport structure
+ * @return 0 on success, -1 on failure
+ */
+int
+rpcsvc_drc_notify (rpcsvc_t *svc, void *xl,
+ rpcsvc_event_t event, void *data)
+{
+ int ret = -1;
+ rpc_transport_t *trans = NULL;
+ drc_client_t *client = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ GF_ASSERT (svc);
+ GF_ASSERT (svc->drc);
+ GF_ASSERT (data);
+
+ drc = svc->drc;
+
+ if (drc->status == DRC_UNINITIATED ||
+ drc->type == DRC_TYPE_NONE)
+ return 0;
+
+ LOCK (&drc->lock);
+
+ trans = (rpc_transport_t *)data;
+ client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr);
+ if (!client)
+ goto out;
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ trans->drc_client = rpcsvc_drc_client_ref (client);
+ ret = 0;
+ break;
+
+ case RPCSVC_EVENT_DISCONNECT:
+ ret = 0;
+ if (list_empty (&drc->clients_head))
+ break;
+ /* should be the last unref */
+ rpcsvc_drc_client_unref (drc, client);
+ trans->drc_client = NULL;
+ break;
+
+ default:
+ break;
+ }
+
+ out:
+ UNLOCK (&drc->lock);
+ return ret;
+}
+
+/**
+ * rpcsvc_drc_init - Initialize the duplicate request cache service
+ *
+ * @param svc - pointer to rpcsvc_t structure of the rpc
+ * @param options - the options dictionary which configures drc
+ * @return 0 on success, non-zero integer on failure
+ */
+int
+rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options)
+{
+ int ret = 0;
+ uint32_t drc_type = 0;
+ uint32_t drc_size = 0;
+ uint32_t drc_factor = 0;
+ rpcsvc_drc_globals_t *drc = NULL;
+ static gf_boolean_t drc_inited = _gf_false;
+
+ GF_ASSERT (svc);
+ GF_ASSERT (options);
+
+ /* Already inited */
+ if (drc_inited)
+ return 0;
+
+ if (!svc->drc) {
+ drc = GF_CALLOC (1, sizeof (rpcsvc_drc_globals_t),
+ gf_common_mt_drc_globals_t);
+ if (!drc)
+ return -1;
+
+ svc->drc = drc;
+ LOCK_INIT (&drc->lock);
+ } else {
+ drc = svc->drc;
+ }
+
+ LOCK (&drc->lock);
+ if (drc->type != DRC_TYPE_NONE) {
+ ret = 0;
+ goto out;
+ }
+
+ /* Toggle DRC on/off, when more drc types(persistent/cluster)
+ are added, we shouldn't treat this as boolean */
+ ret = dict_get_str_boolean (options, "nfs.drc", _gf_true);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "drc user options need second look");
+ ret = _gf_true;
+ }
+ drc->enable_drc = ret;
+
+ if (ret == _gf_false) {
+ /* drc off */
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "DRC is off");
+ ret = 0;
+ goto out;
+ }
+
+ /* Specify type of DRC to be used */
+ ret = dict_get_uint32 (options, "nfs.drc-type", &drc_type);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc type not set."
+ " Continuing with default");
+ drc_type = DRC_DEFAULT_TYPE;
+ }
+
+ drc->type = drc_type;
+
+ /* Set the global cache size (no. of ops to cache) */
+ ret = dict_get_uint32 (options, "nfs.drc-size", &drc_size);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc size not set."
+ " Continuing with default size");
+ drc_size = DRC_DEFAULT_CACHE_SIZE;
+ }
+
+ drc->global_cache_size = drc_size;
+
+ /* Mempool for cached ops */
+ drc->mempool = mem_pool_new (drc_cached_op_t, drc->global_cache_size);
+ if (!drc->mempool) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get mempool for"
+ " DRC, drc-size: %d", drc->global_cache_size);
+ ret = -1;
+ goto out;
+ }
+
+ /* What percent of cache to be evicted whenever it fills up */
+ ret = dict_get_uint32 (options, "nfs.drc-lru-factor", &drc_factor);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc lru factor not set."
+ " Continuing with policy default");
+ drc_factor = DRC_DEFAULT_LRU_FACTOR;
+ }
+
+ drc->lru_factor = (drc_lru_factor_t) drc_factor;
+
+ INIT_LIST_HEAD (&drc->clients_head);
+ INIT_LIST_HEAD (&drc->cache_head);
+
+ ret = rpcsvc_register_notify (svc, rpcsvc_drc_notify, THIS);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "registration of drc_notify function failed");
+ goto out;
+ }
+
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "drc init successful");
+ drc->status = DRC_INITIATED;
+ drc_inited = _gf_true;
+
+ out:
+ UNLOCK (&drc->lock);
+ if (ret == -1) {
+ if (drc->mempool) {
+ mem_pool_destroy (drc->mempool);
+ drc->mempool = NULL;
+ }
+ GF_FREE (drc);
+ svc->drc = NULL;
+ }
+ return ret;
+}
+
+int
+rpcsvc_drc_reconfigure (rpcsvc_t *svc, dict_t *options)
+{
+ int ret = -1;
+ gf_boolean_t enable_drc = _gf_false;
+ rpcsvc_drc_globals_t *drc = NULL;
+ uint32_t drc_size = 0;
+
+ if ((!svc) || (!options))
+ return (-1);
+
+ drc = svc->drc;
+ /* reconfig for drc-size */
+ if (dict_get_uint32 (options, "nfs.drc-size", &drc_size))
+ drc_size = DRC_DEFAULT_CACHE_SIZE;
+
+ if (drc->global_cache_size != drc_size) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "nfs.drc-size size can not "
+ "be reconfigured without NFS server restart.");
+ return (-1);
+ }
+
+ /* reconfig for nfs.drc */
+ ret = dict_get_str_boolean (options, "nfs.drc", _gf_true);
+ if (ret < 0) {
+ ret = _gf_true;
+ }
+ enable_drc = ret;
+
+ if (drc->enable_drc == enable_drc)
+ return 0;
+
+ drc->enable_drc = enable_drc;
+ if (enable_drc) {
+ if (drc == NULL)
+ return rpcsvc_drc_init(svc, options);
+ } else {
+ if (drc == NULL)
+ return (0);
+
+ LOCK (&drc->lock);
+ (void) rpcsvc_unregister_notify (svc, rpcsvc_drc_notify, THIS);
+ if (drc->mempool) {
+ mem_pool_destroy (drc->mempool);
+ drc->mempool = NULL;
+ }
+ UNLOCK (&drc->lock);
+ GF_FREE (drc);
+ svc->drc = NULL;
+ }
+
+ return (0);
+}
diff --git a/rpc/rpc-lib/src/rpc-drc.h b/rpc/rpc-lib/src/rpc-drc.h
new file mode 100644
index 000000000..7dfaef978
--- /dev/null
+++ b/rpc/rpc-lib/src/rpc-drc.h
@@ -0,0 +1,104 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef RPC_DRC_H
+#define RPC_DRC_H
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "rpcsvc-common.h"
+#include "rpcsvc.h"
+#include "locking.h"
+#include "dict.h"
+#include "rb.h"
+
+/* per-client cache structure */
+struct drc_client {
+ uint32_t ref;
+ union gf_sock_union sock_union;
+ /* pointers to the cache */
+ struct rb_table *rbtree;
+ /* no. of ops currently cached */
+ uint32_t op_count;
+ struct list_head client_list;
+};
+
+struct drc_cached_op {
+ drc_op_state_t state;
+ uint32_t xid;
+ int prognum;
+ int progversion;
+ int procnum;
+ rpc_transport_msg_t msg;
+ drc_client_t *client;
+ struct list_head client_list;
+ struct list_head global_list;
+ int32_t ref;
+};
+
+/* global drc definitions */
+enum drc_status {
+ DRC_UNINITIATED,
+ DRC_INITIATED
+};
+typedef enum drc_status drc_status_t;
+
+struct drc_globals {
+ /* allocator must be the first member since
+ * it is used so in gf_libavl_allocator
+ */
+ struct libavl_allocator allocator;
+ drc_type_t type;
+ /* configurable size parameter */
+ uint32_t global_cache_size;
+ drc_lru_factor_t lru_factor;
+ gf_lock_t lock;
+ drc_status_t status;
+ uint32_t op_count;
+ uint64_t cache_hits;
+ uint64_t intransit_hits;
+ struct mem_pool *mempool;
+ struct list_head cache_head;
+ uint32_t client_count;
+ struct list_head clients_head;
+ gf_boolean_t enable_drc;
+};
+
+int
+rpcsvc_need_drc (rpcsvc_request_t *req);
+
+drc_cached_op_t *
+rpcsvc_drc_lookup (rpcsvc_request_t *req);
+
+int
+rpcsvc_send_cached_reply (rpcsvc_request_t *req, drc_cached_op_t *reply);
+
+int
+rpcsvc_cache_reply (rpcsvc_request_t *req, struct iobref *iobref,
+ struct iovec *rpchdr, int rpchdrcount,
+ struct iovec *proghdr, int proghdrcount,
+ struct iovec *payload, int payloadcount);
+
+int
+rpcsvc_cache_request (rpcsvc_request_t *req);
+
+int32_t
+rpcsvc_drc_priv (rpcsvc_drc_globals_t *drc);
+
+int
+rpcsvc_drc_init (rpcsvc_t *svc, dict_t *options);
+
+int
+rpcsvc_drc_reconfigure (rpcsvc_t *svc, dict_t *options);
+
+#endif /* RPC_DRC_H */
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index 695261a71..c24d41084 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -69,6 +69,19 @@ out:
return ret;
}
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ int ret = 0;
+
+ if (!this->ops->throttle)
+ return -ENOSYS;
+
+ ret = this->ops->throttle (this, onoff);
+
+ return ret;
+}
+
int32_t
rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen)
@@ -145,6 +158,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
int8_t is_tcp = 0, is_unix = 0, is_ibsdp = 0;
volume_opt_list_t *vol_opt = NULL;
gf_boolean_t bind_insecure = _gf_false;
+ xlator_t *this = NULL;
GF_VALIDATE_OR_GOTO("rpc-transport", options, fail);
GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail);
@@ -169,7 +183,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
gf_log ("dict", GF_LOG_DEBUG,
"setting transport-type failed");
else
- gf_log ("rpc-transport", GF_LOG_WARNING,
+ gf_log ("rpc-transport", GF_LOG_DEBUG,
"missing 'option transport-type'. defaulting to "
"\"socket\"");
} else {
@@ -281,7 +295,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
}
*VOID(&(trans->reconfigure)) = dlsym (handle, "reconfigure");
- if (trans->fini == NULL) {
+ if (trans->reconfigure == NULL) {
gf_log ("rpc-transport", GF_LOG_DEBUG,
"dlsym (gf_rpc_transport_reconfigure) on %s", dlerror());
}
@@ -292,26 +306,26 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
goto fail;
}
+ this = THIS;
vol_opt->given_opt = dlsym (handle, "options");
if (vol_opt->given_opt == NULL) {
gf_log ("rpc-transport", GF_LOG_DEBUG,
"volume option validation not specified");
} else {
INIT_LIST_HEAD (&vol_opt->list);
- list_add_tail (&vol_opt->list, &(THIS->volume_options));
- if (xlator_options_validate_list (THIS, options, vol_opt,
+ list_add_tail (&vol_opt->list, &(this->volume_options));
+ if (xlator_options_validate_list (this, options, vol_opt,
NULL)) {
gf_log ("rpc-transport", GF_LOG_ERROR,
"volume option validation failed");
goto fail;
}
- vol_opt = NULL;
}
trans->options = options;
pthread_mutex_init (&trans->lock, NULL);
- trans->xl = THIS;
+ trans->xl = this;
ret = trans->init (trans);
if (ret != 0) {
@@ -324,8 +338,6 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
GF_FREE (name);
- GF_FREE (vol_opt);
-
return return_trans;
fail:
@@ -340,7 +352,10 @@ fail:
GF_FREE (name);
- GF_FREE (vol_opt);
+ if (vol_opt && !list_empty (&vol_opt->list)) {
+ list_del_init (&vol_opt->list);
+ GF_FREE (vol_opt);
+ }
return NULL;
}
@@ -475,6 +490,8 @@ rpc_transport_unref (rpc_transport_t *this)
if (this->mydata)
this->notify (this, this->mydata, RPC_TRANSPORT_CLEANUP,
NULL);
+ this->mydata = NULL;
+ this->notify = NULL;
rpc_transport_destroy (this);
}
@@ -518,18 +535,6 @@ out:
}
-inline int
-rpc_transport_unregister_notify (rpc_transport_t *trans)
-{
- GF_VALIDATE_OR_GOTO ("rpc-transport", trans, out);
-
- trans->notify = NULL;
- trans->mydata = NULL;
-
-out:
- return 0;
-}
-
//give negative values to skip setting that value
//this function asserts if both the values are negative.
@@ -557,6 +562,63 @@ out:
}
int
+rpc_transport_unix_options_build (dict_t **options, char *filepath,
+ int frame_timeout)
+{
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT (filepath);
+ GF_ASSERT (options);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ fpath = gf_strdup (filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.keepalive", "off");
+ if (ret)
+ goto out;
+
+ if (frame_timeout > 0) {
+ ret = dict_set_int32 (dict, "frame-timeout", frame_timeout);
+ if (ret)
+ goto out;
+ }
+
+ *options = dict;
+out:
+ if (ret) {
+ GF_FREE (fpath);
+ if (dict)
+ dict_unref (dict);
+ }
+ return ret;
+}
+
+int
rpc_transport_inet_options_build (dict_t **options, const char *hostname,
int port)
{
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 272de9d7d..2db9072ae 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -186,16 +186,19 @@ struct rpc_transport {
*/
void *private;
- void *xl_private;
+ struct _client_t *xl_private;
void *xl; /* Used for THIS */
void *mydata;
pthread_mutex_t lock;
int32_t refcount;
+ int32_t outstanding_rpc_count;
+
glusterfs_ctx_t *ctx;
dict_t *options;
char *name;
void *dnscache;
+ void *drc_client;
data_t *buf;
int32_t (*init) (rpc_transport_t *this);
void (*fini) (rpc_transport_t *this);
@@ -210,7 +213,7 @@ struct rpc_transport {
struct list_head list;
int bind_insecure;
- void *dl_handle; /* handle of dlopen() */
+ void *dl_handle; /* handle of dlopen() */
};
struct rpc_transport_ops {
@@ -234,6 +237,7 @@ struct rpc_transport_ops {
int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr,
int addrlen, struct sockaddr_storage *sa,
socklen_t sasize);
+ int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff);
};
@@ -273,9 +277,6 @@ int
rpc_transport_register_notify (rpc_transport_t *trans, rpc_transport_notify_t,
void *mydata);
-int
-rpc_transport_unregister_notify (rpc_transport_t *trans);
-
int32_t
rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen);
@@ -290,6 +291,9 @@ int32_t
rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen);
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff);
+
rpc_transport_pollin_t *
rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
int count, struct iobuf *hdr_iobuf,
@@ -302,5 +306,9 @@ rpc_transport_keepalive_options_set (dict_t *options, int32_t interval,
int32_t time);
int
+rpc_transport_unix_options_build (dict_t **options, char *filepath,
+ int frame_timeout);
+
+int
rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port);
#endif /* __RPC_TRANSPORT_H__ */
diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c
index 907ae1ec9..4cb86a758 100644
--- a/rpc/rpc-lib/src/rpcsvc-auth.c
+++ b/rpc/rpc-lib/src/rpcsvc-auth.c
@@ -178,6 +178,29 @@ err:
}
int
+rpcsvc_set_addr_namelookup (rpcsvc_t *svc, dict_t *options)
+{
+ int ret;
+ static char *addrlookup_key = "rpc-auth.addr.namelookup";
+
+ if (!svc || !options)
+ return (-1);
+
+ /* By default it's disabled */
+ ret = dict_get_str_boolean (options, addrlookup_key, _gf_false);
+ if (ret < 0) {
+ svc->addr_namelookup = _gf_false;
+ } else {
+ svc->addr_namelookup = ret;
+ }
+
+ if (svc->addr_namelookup)
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Addr-Name lookup enabled");
+
+ return (0);
+}
+
+int
rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options)
{
int ret = -1;
@@ -206,11 +229,16 @@ rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options)
int
rpcsvc_set_root_squash (rpcsvc_t *svc, dict_t *options)
{
+ int ret = -1;
+
GF_ASSERT (svc);
GF_ASSERT (options);
- if (dict_get_str_boolean (options, "root-squash", 0))
- svc->root_squash = _gf_true;
+ ret = dict_get_str_boolean (options, "root-squash", 0);
+ if (ret != -1)
+ svc->root_squash = ret;
+ else
+ svc->root_squash = _gf_false;
if (svc->root_squash)
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "root squashing enabled ");
@@ -228,6 +256,7 @@ rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options)
(void) rpcsvc_set_allow_insecure (svc, options);
(void) rpcsvc_set_root_squash (svc, options);
+ (void) rpcsvc_set_addr_namelookup (svc, options);
ret = rpcsvc_auth_add_initers (svc);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add initers");
@@ -244,6 +273,25 @@ out:
return ret;
}
+int
+rpcsvc_auth_reconf (rpcsvc_t *svc, dict_t *options)
+{
+ int ret = 0;
+
+ if ((!svc) || (!options))
+ return (-1);
+
+ ret = rpcsvc_set_allow_insecure (svc, options);
+ if (ret)
+ return (-1);
+
+ ret = rpcsvc_set_root_squash (svc, options);
+ if (ret)
+ return (-1);
+
+ return rpcsvc_set_addr_namelookup (svc, options);
+}
+
rpcsvc_auth_t *
__rpcsvc_auth_get_handler (rpcsvc_request_t *req)
@@ -322,6 +370,9 @@ rpcsvc_auth_request_init (rpcsvc_request_t *req)
if (!auth->authops->request_init)
ret = auth->authops->request_init (req, auth->authprivate);
+ req->auxgids = req->auxgidsmall; /* reset to auxgidlarge during
+ unsersialize if necessary */
+ req->auxgidlarge = NULL;
err:
return ret;
}
@@ -361,14 +412,10 @@ err:
int
rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen)
{
- int count = 0;
- int gen = RPCSVC_AUTH_REJECT;
- int spec = RPCSVC_AUTH_REJECT;
- int final = RPCSVC_AUTH_REJECT;
- char *srchstr = NULL;
- char *valstr = NULL;
- gf_boolean_t boolval = _gf_false;
- int ret = 0;
+ int count = 0;
+ int result = RPCSVC_AUTH_REJECT;
+ char *srchstr = NULL;
+ int ret = 0;
struct rpcsvc_auth_list *auth = NULL;
struct rpcsvc_auth_list *tmp = NULL;
@@ -386,59 +433,27 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen)
if (count >= arrlen)
break;
- gen = gf_asprintf (&srchstr, "rpc-auth.%s", auth->name);
- if (gen == -1) {
+ result = gf_asprintf (&srchstr, "rpc-auth.%s.%s",
+ auth->name, volname);
+ if (result == -1) {
count = -1;
goto err;
}
- gen = RPCSVC_AUTH_REJECT;
- if (dict_get (svc->options, srchstr)) {
- ret = dict_get_str (svc->options, srchstr, &valstr);
- if (ret == 0) {
- ret = gf_string2boolean (valstr, &boolval);
- if (ret == 0) {
- if (boolval == _gf_true)
- gen = RPCSVC_AUTH_ACCEPT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile"
- "d to read auth val");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile"
- "d to read auth val");
- }
-
+ ret = dict_get_str_boolean (svc->options, srchstr, 0xC00FFEE);
GF_FREE (srchstr);
- spec = gf_asprintf (&srchstr, "rpc-auth.%s.%s", auth->name,
- volname);
- if (spec == -1) {
- count = -1;
- goto err;
- }
-
- spec = RPCSVC_AUTH_DONTCARE;
- if (dict_get (svc->options, srchstr)) {
- ret = dict_get_str (svc->options, srchstr, &valstr);
- if (ret == 0) {
- ret = gf_string2boolean (valstr, &boolval);
- if (ret == 0) {
- if (boolval == _gf_true)
- spec = RPCSVC_AUTH_ACCEPT;
- else
- spec = RPCSVC_AUTH_REJECT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile"
- "d to read auth val");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile"
- "d to read auth val");
- }
- GF_FREE (srchstr);
- final = rpcsvc_combine_gen_spec_volume_checks (gen, spec);
- if (final == RPCSVC_AUTH_ACCEPT) {
+ switch (ret) {
+ case _gf_true:
+ result = RPCSVC_AUTH_ACCEPT;
autharr[count] = auth->auth->authnum;
++count;
+ break;
+ case _gf_false:
+ result = RPCSVC_AUTH_REJECT;
+ break;
+ default:
+ result = RPCSVC_AUTH_DONTCARE;
}
}
diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h
index 2c6f07488..aed55e039 100644
--- a/rpc/rpc-lib/src/rpcsvc-common.h
+++ b/rpc/rpc-lib/src/rpcsvc-common.h
@@ -30,6 +30,8 @@ struct rpcsvc_state;
typedef int (*rpcsvc_notify_t) (struct rpcsvc_state *, void *mydata,
rpcsvc_event_t, void *data);
+struct drc_globals;
+typedef struct drc_globals rpcsvc_drc_globals_t;
/* Contains global state required for all the RPC services.
*/
@@ -50,25 +52,75 @@ typedef struct rpcsvc_state {
dict_t *options;
/* Allow insecure ports. */
- int allow_insecure;
+ gf_boolean_t allow_insecure;
gf_boolean_t register_portmap;
gf_boolean_t root_squash;
glusterfs_ctx_t *ctx;
/* list of connections which will listen for incoming connections */
- struct list_head listeners;
+ struct list_head listeners;
/* list of programs registered with rpcsvc */
- struct list_head programs;
+ struct list_head programs;
/* list of notification callbacks */
- struct list_head notify;
- int notify_count;
+ struct list_head notify;
+ int notify_count;
void *mydata; /* This is xlator */
- rpcsvc_notify_t notifyfn;
+ rpcsvc_notify_t notifyfn;
struct mem_pool *rxpool;
+ rpcsvc_drc_globals_t *drc;
+
+ /* per-client limit of outstanding rpc requests */
+ int outstanding_rpc_limit;
+ gf_boolean_t addr_namelookup;
} rpcsvc_t;
+/* DRC START */
+enum drc_op_type {
+ DRC_NA = 0,
+ DRC_IDEMPOTENT = 1,
+ DRC_NON_IDEMPOTENT = 2
+};
+typedef enum drc_op_type drc_op_type_t;
+
+enum drc_type {
+ DRC_TYPE_NONE = 0,
+ DRC_TYPE_IN_MEMORY = 1
+};
+typedef enum drc_type drc_type_t;
+
+enum drc_lru_factor {
+ DRC_LRU_5_PC = 20,
+ DRC_LRU_10_PC = 10,
+ DRC_LRU_25_PC = 4,
+ DRC_LRU_50_PC = 2
+};
+typedef enum drc_lru_factor drc_lru_factor_t;
+
+enum drc_xid_state {
+ DRC_XID_MONOTONOUS = 0,
+ DRC_XID_WRAPPED = 1
+};
+typedef enum drc_xid_state drc_xid_state_t;
+
+enum drc_op_state {
+ DRC_OP_IN_TRANSIT = 0,
+ DRC_OP_CACHED = 1
+};
+typedef enum drc_op_state drc_op_state_t;
+
+enum drc_policy {
+ DRC_LRU = 0
+};
+typedef enum drc_policy drc_policy_t;
+
+/* Default policies for DRC */
+#define DRC_DEFAULT_TYPE DRC_TYPE_IN_MEMORY
+#define DRC_DEFAULT_CACHE_SIZE 0x20000
+#define DRC_DEFAULT_LRU_FACTOR DRC_LRU_25_PC
+
+/* DRC END */
#endif /* #ifndef _RPCSVC_COMMON_H */
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 078adc062..037c157f2 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -28,6 +28,7 @@
#include "xdr-generic.h"
#include "rpc-common-xdr.h"
#include "syncop.h"
+#include "rpc-drc.h"
#include <errno.h>
#include <pthread.h>
@@ -41,8 +42,7 @@
#include <stdio.h>
#include "xdr-rpcclnt.h"
-
-#define ACL_PROGRAM 100227
+#include "glusterfs-acl.h"
struct rpcsvc_program gluster_dump_prog;
@@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
return NULL;
}
+int
+rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta)
+{
+ int ret = 0;
+ int old_count = 0;
+ int new_count = 0;
+ int limit = 0;
+
+ pthread_mutex_lock (&trans->lock);
+ {
+ limit = svc->outstanding_rpc_limit;
+ if (!limit)
+ goto unlock;
+
+ old_count = trans->outstanding_rpc_count;
+ trans->outstanding_rpc_count += delta;
+ new_count = trans->outstanding_rpc_count;
+
+ if (old_count <= limit && new_count > limit)
+ ret = rpc_transport_throttle (trans, _gf_true);
+
+ if (old_count > limit && new_count <= limit)
+ ret = rpc_transport_throttle (trans, _gf_false);
+ }
+unlock:
+ pthread_mutex_unlock (&trans->lock);
+
+ return ret;
+}
+
+
/* This needs to change to returning errors, since
* we need to return RPC specific error messages when some
* of the pointers below are NULL.
@@ -207,7 +238,7 @@ rpcsvc_program_actor (rpcsvc_request_t *req)
goto err;
}
- req->synctask = program->synctask;
+ req->synctask = program->synctask;
err = SUCCESS;
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s",
@@ -279,8 +310,17 @@ rpcsvc_request_destroy (rpcsvc_request_t *req)
if (req->hdr_iobuf)
iobuf_unref (req->hdr_iobuf);
+ /* This marks the "end" of an RPC request. Reply is
+ completely written to the socket and is on the way
+ to the client. It is time to decrement the
+ outstanding request counter by 1.
+ */
+ rpcsvc_request_outstanding (req->svc, req->trans, -1);
+
rpc_transport_unref (req->trans);
+ GF_FREE (req->auxgidlarge);
+
mem_put (req);
out:
@@ -363,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
goto err;
}
+ /* We just received a new request from the wire. Account for
+ it in the outsanding request counter to make sure we don't
+ ingest too many concurrent requests from the same client.
+ */
+ ret = rpcsvc_request_outstanding (svc, trans, +1);
+
msgbuf = msg->vector[0].iov_base;
msglen = msg->vector[0].iov_len;
@@ -422,6 +468,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
* since we are not handling authentication failures for now.
*/
req->rpc_status = MSG_ACCEPTED;
+ req->reply = NULL;
ret = 0;
err:
if (ret == -1) {
@@ -439,9 +486,9 @@ err:
int
rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
{
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_t *req = NULL;
- req = opaque;
+ req = opaque;
if (ret)
gf_log ("rpcsvc", GF_LOG_ERROR,
@@ -454,20 +501,22 @@ rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
"failed to queue error reply");
}
- return 0;
+ return 0;
}
int
rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
{
- rpcsvc_actor_t *actor = NULL;
- rpcsvc_actor actor_fn = NULL;
- rpcsvc_request_t *req = NULL;
- int ret = -1;
- uint16_t port = 0;
- gf_boolean_t is_unix = _gf_false;
- gf_boolean_t unprivileged = _gf_false;
+ rpcsvc_actor_t *actor = NULL;
+ rpcsvc_actor actor_fn = NULL;
+ rpcsvc_request_t *req = NULL;
+ int ret = -1;
+ uint16_t port = 0;
+ gf_boolean_t is_unix = _gf_false;
+ gf_boolean_t unprivileged = _gf_false;
+ drc_cached_op_t *reply = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
if (!trans || !svc)
return -1;
@@ -503,7 +552,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
req = rpcsvc_request_create (svc, trans, msg);
if (!req)
- goto err;
+ goto out;
if (!rpcsvc_request_accepted (req))
goto err_reply;
@@ -521,33 +570,65 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
return -1;
}
+ /* DRC */
+ if (rpcsvc_need_drc (req)) {
+ drc = req->svc->drc;
+
+ LOCK (&drc->lock);
+ reply = rpcsvc_drc_lookup (req);
+
+ /* retransmission of completed request, send cached reply */
+ if (reply && reply->state == DRC_OP_CACHED) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:"
+ " XID: 0x%x", req->xid);
+ ret = rpcsvc_send_cached_reply (req, reply);
+ drc->cache_hits++;
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* retransmitted request, original op in transit, drop it */
+ else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit,"
+ " discarding. XID: 0x%x", req->xid);
+ ret = 0;
+ drc->intransit_hits++;
+ rpcsvc_request_destroy (req);
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* fresh request, cache it as in-transit and proceed */
+ else {
+ ret = rpcsvc_cache_request (req);
+ }
+ UNLOCK (&drc->lock);
+ }
+
if (req->rpc_err == SUCCESS) {
/* Before going to xlator code, set the THIS properly */
THIS = svc->mydata;
- actor_fn = actor->actor;
+ actor_fn = actor->actor;
- if (!actor_fn) {
- rpcsvc_request_seterr (req, PROC_UNAVAIL);
- /* LOG TODO: print more info about procnum,
- prognum etc, also print transport info */
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "No vectored handler present");
- ret = RPCSVC_ACTOR_ERROR;
- goto err_reply;
- }
+ if (!actor_fn) {
+ rpcsvc_request_seterr (req, PROC_UNAVAIL);
+ /* LOG TODO: print more info about procnum,
+ prognum etc, also print transport info */
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "No vectored handler present");
+ ret = RPCSVC_ACTOR_ERROR;
+ goto err_reply;
+ }
- if (req->synctask) {
+ if (req->synctask) {
if (msg->hdr_iobuf)
req->hdr_iobuf = iobuf_ref (msg->hdr_iobuf);
- ret = synctask_new (THIS->ctx->env,
- (synctask_fn_t) actor_fn,
- rpcsvc_check_and_reply_error, NULL,
+ ret = synctask_new (THIS->ctx->env,
+ (synctask_fn_t) actor_fn,
+ rpcsvc_check_and_reply_error, NULL,
req);
} else {
- ret = actor_fn (req);
- req->hdr_iobuf = NULL;
+ ret = actor_fn (req);
}
}
@@ -558,7 +639,7 @@ err_reply:
* has now been queued. */
ret = 0;
-err:
+out:
return ret;
}
@@ -905,21 +986,22 @@ out:
return ret;
}
-static inline int
-rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec,
- int hdrcount, struct iovec *proghdr, int proghdrcount,
- struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *priv)
+int
+rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr,
+ int rpchdrcount, struct iovec *proghdr,
+ int proghdrcount, struct iovec *progpayload,
+ int progpayloadcount, struct iobref *iobref,
+ void *priv)
{
int ret = -1;
rpc_transport_reply_t reply = {{0, }};
- if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) {
+ if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {
goto out;
}
- reply.msg.rpchdr = hdrvec;
- reply.msg.rpchdrcount = hdrcount;
+ reply.msg.rpchdr = rpchdr;
+ reply.msg.rpchdrcount = rpchdrcount;
reply.msg.proghdr = proghdr;
reply.msg.proghdrcount = proghdrcount;
reply.msg.progpayload = progpayload;
@@ -1065,6 +1147,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
size_t msglen = 0;
size_t hdrlen = 0;
char new_iobref = 0;
+ rpcsvc_drc_globals_t *drc = NULL;
if ((!req) || (!req->trans))
return -1;
@@ -1099,20 +1182,31 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
iobref_add (iobref, replyiob);
+ /* cache the request in the duplicate request cache for appropriate ops */
+ if (req->reply) {
+ drc = req->svc->drc;
+
+ LOCK (&drc->lock);
+ ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1,
+ proghdr, hdrcount,
+ payload, payloadcount);
+ UNLOCK (&drc->lock);
+ }
+
ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,
payload, payloadcount, iobref,
req->trans_private);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message "
- "(XID: 0x%ux, Program: %s, ProgVers: %d, Proc: %d) to "
+ "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to "
"rpc-transport (%s)", req->xid,
req->prog ? req->prog->progname : "(not matched)",
req->prog ? req->prog->progver : 0,
req->procnum, trans->name);
} else {
gf_log (GF_RPCSVC, GF_LOG_TRACE,
- "submitted reply for rpc-message (XID: 0x%ux, "
+ "submitted reply for rpc-message (XID: 0x%x, "
"Program: %s, ProgVers: %d, Proc: %d) to rpc-transport "
"(%s)", req->xid, req->prog ? req->prog->progname: "-",
req->prog ? req->prog->progver : 0,
@@ -1155,12 +1249,13 @@ rpcsvc_error_reply (rpcsvc_request_t *req)
inline int
rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
{
- int ret = 0;
+ int ret = -1; /* FAIL */
if (!newprog) {
goto out;
}
+ /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */
if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP,
port))) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with"
@@ -1168,16 +1263,16 @@ rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
goto out;
}
- ret = 0;
+ ret = 0; /* SUCCESS */
out:
return ret;
}
-static inline int
+inline int
rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
{
- int ret = 0;
+ int ret = -1;
if (!prog)
goto out;
@@ -1795,12 +1890,92 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "
"disabled");
- ret = 0;
+ ret = rpcsvc_set_outstanding_rpc_limit (svc, options);
out:
return ret;
}
int
+rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options)
+{
+ xlator_t *xlator = NULL;
+ xlator_list_t *volentry = NULL;
+ char *srchkey = NULL;
+ char *keyval = NULL;
+ int ret = -1;
+
+ if ((!svc) || (!svc->options) || (!options))
+ return (-1);
+
+ /* Fetch the xlator from svc */
+ xlator = (xlator_t *) svc->mydata;
+ if (!xlator)
+ return (-1);
+
+ /* Reconfigure the volume specific rpc-auth.addr allow part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* If found the srchkey, delete old key/val pair
+ * and set the key with new value.
+ */
+ if (!dict_get_str (options, srchkey, &keyval)) {
+ dict_del (svc->options, srchkey);
+ ret = dict_set_str (svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "dict_set_str error");
+ GF_FREE (srchkey);
+ return (-1);
+ }
+ }
+
+ GF_FREE (srchkey);
+ volentry = volentry->next;
+ }
+
+ /* Reconfigure the volume specific rpc-auth.addr reject part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* If found the srchkey, delete old key/val pair
+ * and set the key with new value.
+ */
+ if (!dict_get_str (options, srchkey, &keyval)) {
+ dict_del (svc->options, srchkey);
+ ret = dict_set_str (svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "dict_set_str error");
+ GF_FREE (srchkey);
+ return (-1);
+ }
+ }
+
+ GF_FREE (srchkey);
+ volentry = volentry->next;
+ }
+
+ ret = rpcsvc_init_options (svc, options);
+ if (ret)
+ return (-1);
+
+ return rpcsvc_auth_reconf (svc, options);
+}
+
+int
rpcsvc_transport_unix_options_build (dict_t **options, char *filepath)
{
dict_t *dict = NULL;
@@ -1846,6 +2021,48 @@ out:
return ret;
}
+/*
+ * Reconfigure() the rpc.outstanding-rpc-limit param.
+ */
+int
+rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options)
+{
+ int ret = -1; /* FAILURE */
+ int rpclim = 0;
+ static char *rpclimkey = "rpc.outstanding-rpc-limit";
+
+ if ((!svc) || (!options))
+ return (-1);
+
+ /* Reconfigure() the rpc.outstanding-rpc-limit param */
+ ret = dict_get_int32 (options, rpclimkey, &rpclim);
+ if (ret < 0) {
+ /* Fall back to default for FAILURE */
+ rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+ } else {
+ /* SUCCESS: round off to multiple of 8.
+ * If the input value fails Boundary check, fall back to
+ * default i.e. RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT.
+ * NB: value 0 is special, means its unset i.e. unlimited.
+ */
+ rpclim = ((rpclim + 8 - 1) >> 3) * 8;
+ if (rpclim < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+ } else if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT;
+ }
+ }
+
+ if (svc->outstanding_rpc_limit != rpclim) {
+ svc->outstanding_rpc_limit = rpclim;
+ gf_log (GF_RPCSVC, GF_LOG_INFO,
+ "Configured %s with value %d",
+ rpclimkey, rpclim);
+ }
+
+ return (0);
+}
+
/* The global RPC service initializer.
*/
rpcsvc_t *
@@ -1906,6 +2123,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,
"failed to register DUMP program");
goto free_svc;
}
+
ret = 0;
free_svc:
if (ret == -1) {
@@ -1918,21 +2136,16 @@ free_svc:
int
-rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr)
+rpcsvc_transport_peer_check_search (dict_t *options, char *pattern,
+ char *ip, char *hostname)
{
- int ret = -1;
- char *addrtok = NULL;
- char *addrstr = NULL;
- char *dup_addrstr = NULL;
- char *svptr = NULL;
- char *fqdn = NULL;
+ int ret = -1;
+ char *addrtok = NULL;
+ char *addrstr = NULL;
+ char *dup_addrstr = NULL;
+ char *svptr = NULL;
- if ((!options) || (!clstr))
- return -1;
-
- ret = dict_get_str (options, "fqdn", &fqdn);
-
- if (!dict_get (options, pattern))
+ if ((!options) || (!ip))
return -1;
ret = dict_get_str (options, pattern, &addrstr);
@@ -1952,19 +2165,19 @@ rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr)
/* CASEFOLD not present on Solaris */
#ifdef FNM_CASEFOLD
- ret = fnmatch (addrtok, clstr, FNM_CASEFOLD);
+ ret = fnmatch (addrtok, ip, FNM_CASEFOLD);
#else
- ret = fnmatch (addrtok, clstr, 0);
+ ret = fnmatch (addrtok, ip, 0);
#endif
if (ret == 0)
goto err;
/* compare hostnames if applicable */
- if (fqdn) {
+ if (hostname) {
#ifdef FNM_CASEFOLD
- ret = fnmatch (addrtok, fqdn, FNM_CASEFOLD);
+ ret = fnmatch (addrtok, hostname, FNM_CASEFOLD);
#else
- ret = fnmatch (addrtok, fqdn, 0);
+ ret = fnmatch (addrtok, hostname, 0);
#endif
if (ret == 0)
goto err;
@@ -1981,66 +2194,56 @@ err:
}
-int
-rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr)
+static int
+rpcsvc_transport_peer_check_allow (dict_t *options, char *volname,
+ char *ip, char *hostname)
{
- int ret = RPCSVC_AUTH_DONTCARE;
+ int ret = RPCSVC_AUTH_DONTCARE;
char *srchstr = NULL;
- char globalrule[] = "rpc-auth.addr.allow";
- if ((!options) || (!clstr))
+ if ((!options) || (!ip) || (!volname))
return ret;
- /* If volname is NULL, then we're searching for the general rule to
- * determine the current address in clstr is allowed or not for all
- * subvolumes.
- */
- if (volname) {
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_DONTCARE;
- goto out;
- }
- } else
- srchstr = globalrule;
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_DONTCARE;
+ goto out;
+ }
- ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);
- if (volname)
- GF_FREE (srchstr);
+ ret = rpcsvc_transport_peer_check_search (options, srchstr,
+ ip, hostname);
+ GF_FREE (srchstr);
if (ret == 0)
ret = RPCSVC_AUTH_ACCEPT;
else
- ret = RPCSVC_AUTH_DONTCARE;
+ ret = RPCSVC_AUTH_REJECT;
out:
return ret;
}
-int
-rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr)
+static int
+rpcsvc_transport_peer_check_reject (dict_t *options, char *volname,
+ char *ip, char *hostname)
{
- int ret = RPCSVC_AUTH_DONTCARE;
+ int ret = RPCSVC_AUTH_DONTCARE;
char *srchstr = NULL;
- char generalrule[] = "rpc-auth.addr.reject";
- if ((!options) || (!clstr))
+ if ((!options) || (!ip) || (!volname))
return ret;
- if (volname) {
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject",
- volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto out;
- }
- } else
- srchstr = generalrule;
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject",
+ volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto out;
+ }
- ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);
- if (volname)
- GF_FREE (srchstr);
+ ret = rpcsvc_transport_peer_check_search (options, srchstr,
+ ip, hostname);
+ GF_FREE (srchstr);
if (ret == 0)
ret = RPCSVC_AUTH_REJECT;
@@ -2051,309 +2254,113 @@ out:
}
-/* This function tests the results of the allow rule and the reject rule to
- * combine them into a single result that can be used to determine if the
- * connection should be allowed to proceed.
- * Heres the test matrix we need to follow in this function.
- *
- * A - Allow, the result of the allow test. Never returns R.
- * R - Reject, result of the reject test. Never returns A.
- * Both can return D or dont care if no rule was given.
- *
- * | @allow | @reject | Result |
- * | A | R | R |
- * | D | D | D |
- * | A | D | A |
- * | D | R | R |
+/* Combines rpc auth's allow and reject options.
+ * Order of checks is important.
+ * First, REJECT if either rejects.
+ * If neither rejects, ACCEPT if either accepts.
+ * If neither accepts, DONTCARE
*/
int
rpcsvc_combine_allow_reject_volume_check (int allow, int reject)
{
- int final = RPCSVC_AUTH_REJECT;
-
- /* If allowed rule allows but reject rule rejects, we stay cautious
- * and reject. */
- if ((allow == RPCSVC_AUTH_ACCEPT) && (reject == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- /* if both are dont care, that is user did not specify for either allow
- * or reject, we leave it up to the general rule to apply, in the hope
- * that there is one.
- */
- else if ((allow == RPCSVC_AUTH_DONTCARE) &&
- (reject == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_DONTCARE;
- /* If one is dont care, the other one applies. */
- else if ((allow == RPCSVC_AUTH_ACCEPT) &&
- (reject == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((allow == RPCSVC_AUTH_DONTCARE) &&
- (reject == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-/* Combines the result of the general rule test against, the specific rule
- * to determine final permission for the client's address.
- *
- * | @gen | @spec | Result |
- * | A | A | A |
- * | A | R | R |
- * | A | D | A |
- * | D | A | A |
- * | D | R | R |
- * | D | D | D |
- * | R | A | A |
- * | R | D | R |
- * | R | R | R |
- */
-int
-rpcsvc_combine_gen_spec_addr_checks (int gen, int spec)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_DONTCARE;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
+ if (allow == RPCSVC_AUTH_REJECT ||
+ reject == RPCSVC_AUTH_REJECT)
+ return RPCSVC_AUTH_REJECT;
+ if (allow == RPCSVC_AUTH_ACCEPT ||
+ reject == RPCSVC_AUTH_ACCEPT)
+ return RPCSVC_AUTH_ACCEPT;
-/* Combines the result of the general rule test against, the specific rule
- * to determine final test for the connection coming in for a given volume.
- *
- * | @gen | @spec | Result |
- * | A | A | A |
- * | A | R | R |
- * | A | D | A |
- * | D | A | A |
- * | D | R | R |
- * | D | D | R |, special case, we intentionally disallow this.
- * | R | A | A |
- * | R | D | R |
- * | R | R | R |
- */
-int
-rpcsvc_combine_gen_spec_volume_checks (int gen, int spec)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- /* On no rule, we reject. */
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
+ return RPCSVC_AUTH_DONTCARE;
}
-
int
-rpcsvc_transport_peer_check_name (dict_t *options, char *volname,
- rpc_transport_t *trans)
+rpcsvc_auth_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans)
{
- int ret = RPCSVC_AUTH_REJECT;
- int aret = RPCSVC_AUTH_REJECT;
- int rjret = RPCSVC_AUTH_REJECT;
- char clstr[RPCSVC_PEER_STRLEN];
- char *hostname = NULL;
-
- if (!trans)
+ int ret = RPCSVC_AUTH_REJECT;
+ int accept = RPCSVC_AUTH_REJECT;
+ int reject = RPCSVC_AUTH_REJECT;
+ char *hostname = NULL;
+ char *ip = NULL;
+ char client_ip[RPCSVC_PEER_STRLEN] = {0};
+ char *allow_str = NULL;
+ char *reject_str = NULL;
+ char *srchstr = NULL;
+ dict_t *options = NULL;
+
+ if (!svc || !volname || !trans)
return ret;
- ret = rpcsvc_transport_peername (trans, clstr, RPCSVC_PEER_STRLEN);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
- "%s", gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- ret = gf_get_hostname_from_ip (clstr, &hostname);
- if (!ret)
- ret = dict_set_dynstr (options, "fqdn",
- hostname);
-
- aret = rpcsvc_transport_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);
-
- ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
-
-err:
- return ret;
-}
-
-
-int
-rpcsvc_transport_peer_check_addr (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int ret = RPCSVC_AUTH_REJECT;
- int aret = RPCSVC_AUTH_DONTCARE;
- int rjret = RPCSVC_AUTH_REJECT;
- char clstr[RPCSVC_PEER_STRLEN];
- char *tmp = NULL;
- union gf_sock_union sock_union;
-
- if (!trans)
+ /* Fetch the options from svc struct and validate */
+ options = svc->options;
+ if (!options)
return ret;
- ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN,
- &sock_union.storage,
- sizeof (sock_union.storage));
+ ret = rpcsvc_transport_peername (trans, client_ip, RPCSVC_PEER_STRLEN);
if (ret != 0) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
"%s", gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- switch (sock_union.sa.sa_family) {
-
- case AF_INET:
- case AF_INET6:
- tmp = strrchr (clstr, ':');
- if (tmp)
- *tmp = '\0';
- break;
- }
-
- aret = rpcsvc_transport_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);
-
- ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
-err:
- return ret;
-}
-
-
-int
-rpcsvc_transport_check_volume_specific (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int namechk = RPCSVC_AUTH_REJECT;
- int addrchk = RPCSVC_AUTH_REJECT;
- gf_boolean_t namelookup = _gf_false;
- char *namestr = NULL;
- int ret = 0;
-
- if ((!options) || (!volname) || (!trans))
return RPCSVC_AUTH_REJECT;
-
- /* Disabled by default */
- if ((dict_get (options, "rpc-auth.addr.namelookup"))) {
- ret = dict_get_str (options, "rpc-auth.addr.namelookup"
- , &namestr);
- if (ret == 0)
- ret = gf_string2boolean (namestr, &namelookup);
}
- /* We need two separate checks because the rules with addresses in them
- * can be network addresses which can be general and names can be
- * specific which will over-ride the network address rules.
+ /* Accept if its the default case: Allow all, Reject none
+ * The default volfile always contains a 'allow *' rule
+ * for each volume. If allow rule is missing (which implies
+ * there is some bad volfile generating code doing this), we
+ * assume no one is allowed mounts, and thus, we reject mounts.
*/
- if (namelookup)
- namechk = rpcsvc_transport_peer_check_name (options, volname,
- trans);
- addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans);
-
- if (namelookup)
- ret = rpcsvc_combine_gen_spec_addr_checks (addrchk,
- namechk);
- else
- ret = addrchk;
-
- return ret;
-}
-
-
-int
-rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans)
-{
- int addrchk = RPCSVC_AUTH_REJECT;
- int namechk = RPCSVC_AUTH_REJECT;
- gf_boolean_t namelookup = _gf_false;
- char *namestr = NULL;
- int ret = 0;
-
- if ((!options) || (!trans))
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
return RPCSVC_AUTH_REJECT;
-
- /* Disabled by default */
- if ((dict_get (options, "rpc-auth.addr.namelookup"))) {
- ret = dict_get_str (options, "rpc-auth.addr.namelookup"
- , &namestr);
- if (ret == 0)
- ret = gf_string2boolean (namestr, &namelookup);
}
- /* We need two separate checks because the rules with addresses in them
- * can be network addresses which can be general and names can be
- * specific which will over-ride the network address rules.
- */
- if (namelookup)
- namechk = rpcsvc_transport_peer_check_name (options, NULL, trans);
- addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans);
-
- if (namelookup)
- ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
- else
- ret = addrchk;
+ ret = dict_get_str (options, srchstr, &allow_str);
+ GF_FREE (srchstr);
+ if (ret < 0)
+ return RPCSVC_AUTH_REJECT;
- return ret;
-}
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
-int
-rpcsvc_transport_peer_check (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int general_chk = RPCSVC_AUTH_REJECT;
- int specific_chk = RPCSVC_AUTH_REJECT;
+ ret = dict_get_str (options, srchstr, &reject_str);
+ GF_FREE (srchstr);
+ if (reject_str == NULL && !strcmp ("*", allow_str))
+ return RPCSVC_AUTH_ACCEPT;
+
+ /* Non-default rule, authenticate */
+ if (!get_host_name (client_ip, &ip))
+ ip = client_ip;
+
+ /* addr-namelookup check */
+ if (svc->addr_namelookup == _gf_true) {
+ ret = gf_get_hostname_from_ip (ip, &hostname);
+ if (ret) {
+ if (hostname)
+ GF_FREE (hostname);
+ /* failed to get hostname, but hostname auth
+ * is enabled, so authentication will not be
+ * 100% correct. reject mounts
+ */
+ return RPCSVC_AUTH_REJECT;
+ }
+ }
- if ((!options) || (!volname) || (!trans))
- return RPCSVC_AUTH_REJECT;
+ accept = rpcsvc_transport_peer_check_allow (options, volname,
+ ip, hostname);
- general_chk = rpcsvc_transport_check_volume_general (options, trans);
- specific_chk = rpcsvc_transport_check_volume_specific (options, volname,
- trans);
+ reject = rpcsvc_transport_peer_check_reject (options, volname,
+ ip, hostname);
- return rpcsvc_combine_gen_spec_volume_checks (general_chk,
- specific_chk);
+ if (hostname)
+ GF_FREE (hostname);
+ return rpcsvc_combine_allow_reject_volume_check (accept, reject);
}
-
int
rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
rpc_transport_t *trans)
@@ -2363,8 +2370,6 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
socklen_t sinsize = sizeof (&sock_union.sin);
char *srchstr = NULL;
char *valstr = NULL;
- int globalinsecure = RPCSVC_AUTH_REJECT;
- int exportinsecure = RPCSVC_AUTH_DONTCARE;
uint16_t port = 0;
gf_boolean_t insecure = _gf_false;
@@ -2393,23 +2398,6 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
}
/* Disabled by default */
- if ((dict_get (svc->options, "rpc-auth.ports.insecure"))) {
- ret = dict_get_str (svc->options, "rpc-auth.ports.insecure"
- , &srchstr);
- if (ret == 0) {
- ret = gf_string2boolean (srchstr, &insecure);
- if (ret == 0) {
- if (insecure == _gf_true)
- globalinsecure = RPCSVC_AUTH_ACCEPT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- }
-
- /* Disabled by default */
ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
@@ -2417,25 +2405,22 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
goto err;
}
- if (dict_get (svc->options, srchstr)) {
- ret = dict_get_str (svc->options, srchstr, &valstr);
- if (ret == 0) {
- ret = gf_string2boolean (valstr, &insecure);
- if (ret == 0) {
- if (insecure == _gf_true)
- exportinsecure = RPCSVC_AUTH_ACCEPT;
- else
- exportinsecure = RPCSVC_AUTH_REJECT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- }
-
- ret = rpcsvc_combine_gen_spec_volume_checks (globalinsecure,
- exportinsecure);
+ ret = dict_get_str (svc->options, srchstr, &valstr);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
+ " read rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = gf_string2boolean (valstr, &insecure);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
+ " convert rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT;
+
if (ret == RPCSVC_AUTH_ACCEPT)
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
else
@@ -2443,7 +2428,8 @@ rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
" allowed");
err:
- GF_FREE (srchstr);
+ if (srchstr)
+ GF_FREE (srchstr);
return ret;
}
@@ -2479,9 +2465,9 @@ out:
rpcsvc_actor_t gluster_dump_actors[] = {
- [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0},
- [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0},
- [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0},
+ [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
+ [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
+ [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA},
};
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 1323c8b7a..cbc1f4226 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -38,6 +38,10 @@
#define MAX_IOVEC 16
#endif
+#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64
+#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536
+#define RPCSVC_MIN_OUTSTANDING_RPC_LIMIT 0 /* No limit i.e. Unlimited */
+
#define GF_RPCSVC "rpc-service"
#define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB))
@@ -140,6 +144,9 @@ typedef struct rpcsvc_auth_data {
#define rpcsvc_auth_flavour(au) ((au).flavour)
+typedef struct drc_client drc_client_t;
+typedef struct drc_cached_op drc_cached_op_t;
+
/* The container for the RPC call handed up to an actor.
* Dynamically allocated. Lives till the call reply is completely
* transmitted.
@@ -178,7 +185,9 @@ struct rpcsvc_request {
/* Might want to move this to AUTH_UNIX specific state since this array
* is not available for every authentication scheme.
*/
- gid_t auxgids[GF_MAX_AUX_GROUPS];
+ gid_t *auxgids;
+ gid_t auxgidsmall[SMALL_GROUP_COUNT];
+ gid_t *auxgidlarge;
int auxgidcount;
@@ -241,6 +250,9 @@ struct rpcsvc_request {
/* we need to ref the 'iobuf' in case of 'synctasking' it */
struct iobuf *hdr_iobuf;
+
+ /* pointer to cached reply for use in DRC */
+ drc_cached_op_t *reply;
};
#define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog))
@@ -314,7 +326,6 @@ typedef void *(*rpcsvc_encode_reply) (void *msg);
*/
typedef void (*rpcsvc_deallocate_reply) (void *msg);
-
#define RPCSVC_NAME_MAX 32
/* The descriptor for each procedure/actor that runs
* over the RPC service.
@@ -336,6 +347,7 @@ typedef struct rpcsvc_actor_desc {
/* Can actor be ran on behalf an unprivileged requestor? */
gf_boolean_t unprivileged;
+ drc_op_type_t op_type;
} rpcsvc_actor_t;
/* Describes a program and its version along with the function pointers
@@ -429,6 +441,9 @@ extern int
rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port);
extern int
+rpcsvc_program_unregister_portmap (rpcsvc_program_t *newprog);
+
+extern int
rpcsvc_register_portmap_enabled (rpcsvc_t *svc);
/* Inits the global RPC service data structures.
@@ -438,6 +453,9 @@ extern rpcsvc_t *
rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,
uint32_t poolcount);
+extern int
+rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options);
+
int
rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata);
@@ -448,6 +466,13 @@ int
rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata);
int
+rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr,
+ int rpchdrcount, struct iovec *proghdr,
+ int proghdrcount, struct iovec *progpayload,
+ int progpayloadcount, struct iobref *iobref,
+ void *priv);
+
+int
rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
int hdrcount, struct iovec *payload, int payloadcount,
struct iobref *iobref);
@@ -473,12 +498,12 @@ rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen,
struct sockaddr_storage *returnsa, socklen_t sasize);
extern int
-rpcsvc_transport_peer_check (dict_t *options, char *volname,
- rpc_transport_t *trans);
+rpcsvc_auth_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans);
extern int
rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
rpc_transport_t *trans);
+
#define rpcsvc_request_seterr(req, err) (req)->rpc_err = err
#define rpcsvc_request_set_autherr(req, err) (req)->auth_err = err
@@ -534,6 +559,9 @@ extern int
rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options);
extern int
+rpcsvc_auth_reconf (rpcsvc_t *svc, dict_t *options);
+
+extern int
rpcsvc_auth_transport_init (rpc_transport_t *xprt);
extern int
@@ -550,9 +578,6 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
extern gid_t *
rpcsvc_auth_unix_auxgids (rpcsvc_request_t *req, int *arrlen);
-extern int
-rpcsvc_combine_gen_spec_volume_checks (int gen, int spec);
-
extern char *
rpcsvc_volume_allowed (dict_t *options, char *volname);
@@ -560,18 +585,22 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
rpcsvc_cbk_program_t *prog, int procnum,
struct iovec *proghdr, int proghdrcount);
+rpcsvc_actor_t *
+rpcsvc_program_actor (rpcsvc_request_t *req);
+
int
rpcsvc_transport_unix_options_build (dict_t **options, char *filepath);
int
rpcsvc_set_allow_insecure (rpcsvc_t *svc, dict_t *options);
int
+rpcsvc_set_addr_namelookup (rpcsvc_t *svc, dict_t *options);
+int
rpcsvc_set_root_squash (rpcsvc_t *svc, dict_t *options);
int
+rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options);
+int
rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
-char *
-rpcsvc_volume_allowed (dict_t *options, char *volname);
rpcsvc_vector_sizer
rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
uint32_t progver, uint32_t procnum);
-
#endif
diff --git a/rpc/rpc-lib/src/xdr-rpc.c b/rpc/rpc-lib/src/xdr-rpc.c
index ef52764c3..adb48a531 100644
--- a/rpc/rpc-lib/src/xdr-rpc.c
+++ b/rpc/rpc-lib/src/xdr-rpc.c
@@ -34,7 +34,7 @@ xdr_to_rpc_call (char *msgbuf, size_t len, struct rpc_msg *call,
struct iovec *payload, char *credbytes, char *verfbytes)
{
XDR xdr;
- char opaquebytes[MAX_AUTH_BYTES];
+ char opaquebytes[GF_MAX_AUTH_BYTES];
struct opaque_auth *oa = NULL;
int ret = -1;