summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpcsvc.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c1935
1 files changed, 1259 insertions, 676 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 88675205c..037c157f2 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
#ifndef _CONFIG_H
@@ -34,6 +25,10 @@
#include "iobuf.h"
#include "globals.h"
#include "xdr-common.h"
+#include "xdr-generic.h"
+#include "rpc-common-xdr.h"
+#include "syncop.h"
+#include "rpc-drc.h"
#include <errno.h>
#include <pthread.h>
@@ -46,6 +41,9 @@
#include <stdarg.h>
#include <stdio.h>
+#include "xdr-rpcclnt.h"
+#include "glusterfs-acl.h"
+
struct rpcsvc_program gluster_dump_prog;
#define rpcsvc_alloc_request(svc, request) \
@@ -58,450 +56,6 @@ rpcsvc_listener_t *
rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
-rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr)
-{
- int ret = -1;
- char *addrtok = NULL;
- char *addrstr = NULL;
- char *svptr = NULL;
-
- if ((!options) || (!clstr))
- return -1;
-
- if (!dict_get (options, pattern))
- return -1;
-
- ret = dict_get_str (options, pattern, &addrstr);
- if (ret < 0) {
- ret = -1;
- goto err;
- }
-
- if (!addrstr) {
- ret = -1;
- goto err;
- }
-
- addrtok = strtok_r (addrstr, ",", &svptr);
- while (addrtok) {
-
-#ifdef FNM_CASEFOLD
- ret = fnmatch (addrtok, clstr, FNM_CASEFOLD);
-#else
- ret = fnmatch (addrtok, clstr, 0);
-#endif
- if (ret == 0)
- goto err;
-
- addrtok = strtok_r (NULL, ",", &svptr);
- }
-
- ret = -1;
-err:
-
- return ret;
-}
-
-
-int
-rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr)
-{
- int ret = RPCSVC_AUTH_DONTCARE;
- char *srchstr = NULL;
- char globalrule[] = "rpc-auth.addr.allow";
-
- if ((!options) || (!clstr))
- return ret;
-
- /* If volname is NULL, then we're searching for the general rule to
- * determine the current address in clstr is allowed or not for all
- * subvolumes.
- */
- if (volname) {
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_DONTCARE;
- goto out;
- }
- } else
- srchstr = globalrule;
-
- ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);
- if (volname)
- GF_FREE (srchstr);
-
- if (ret == 0)
- ret = RPCSVC_AUTH_ACCEPT;
- else
- ret = RPCSVC_AUTH_DONTCARE;
-out:
- return ret;
-}
-
-int
-rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr)
-{
- int ret = RPCSVC_AUTH_DONTCARE;
- char *srchstr = NULL;
- char generalrule[] = "rpc-auth.addr.reject";
-
- if ((!options) || (!clstr))
- return ret;
-
- if (volname) {
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto out;
- }
- } else
- srchstr = generalrule;
-
- ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);
- if (volname)
- GF_FREE (srchstr);
-
- if (ret == 0)
- ret = RPCSVC_AUTH_REJECT;
- else
- ret = RPCSVC_AUTH_DONTCARE;
-out:
- return ret;
-}
-
-
-/* This function tests the results of the allow rule and the reject rule to
- * combine them into a single result that can be used to determine if the
- * connection should be allowed to proceed.
- * Heres the test matrix we need to follow in this function.
- *
- * A - Allow, the result of the allow test. Never returns R.
- * R - Reject, result of the reject test. Never returns A.
- * Both can return D or dont care if no rule was given.
- *
- * | @allow | @reject | Result |
- * | A | R | R |
- * | D | D | D |
- * | A | D | A |
- * | D | R | R |
- */
-int
-rpcsvc_combine_allow_reject_volume_check (int allow, int reject)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- /* If allowed rule allows but reject rule rejects, we stay cautious
- * and reject. */
- if ((allow == RPCSVC_AUTH_ACCEPT) && (reject == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- /* if both are dont care, that is user did not specify for either allow
- * or reject, we leave it up to the general rule to apply, in the hope
- * that there is one.
- */
- else if ((allow == RPCSVC_AUTH_DONTCARE) &&
- (reject == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_DONTCARE;
- /* If one is dont care, the other one applies. */
- else if ((allow == RPCSVC_AUTH_ACCEPT) &&
- (reject == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((allow == RPCSVC_AUTH_DONTCARE) &&
- (reject == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-/* Combines the result of the general rule test against, the specific rule
- * to determine final permission for the client's address.
- *
- * | @gen | @spec | Result |
- * | A | A | A |
- * | A | R | R |
- * | A | D | A |
- * | D | A | A |
- * | D | R | R |
- * | D | D | D |
- * | R | A | A |
- * | R | D | R |
- * | R | R | R |
- */
-int
-rpcsvc_combine_gen_spec_addr_checks (int gen, int spec)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_DONTCARE;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-
-/* Combines the result of the general rule test against, the specific rule
- * to determine final test for the connection coming in for a given volume.
- *
- * | @gen | @spec | Result |
- * | A | A | A |
- * | A | R | R |
- * | A | D | A |
- * | D | A | A |
- * | D | R | R |
- * | D | D | R |, special case, we intentionally disallow this.
- * | R | A | A |
- * | R | D | R |
- * | R | R | R |
- */
-int
-rpcsvc_combine_gen_spec_volume_checks (int gen, int spec)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- /* On no rule, we reject. */
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-int
-rpcsvc_transport_peer_check_name (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int ret = RPCSVC_AUTH_REJECT;
- int aret = RPCSVC_AUTH_REJECT;
- int rjret = RPCSVC_AUTH_REJECT;
- char clstr[RPCSVC_PEER_STRLEN];
-
- if (!trans)
- return ret;
-
- ret = rpc_transport_get_peername (trans, clstr, RPCSVC_PEER_STRLEN);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
- "%s", gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- aret = rpcsvc_transport_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);
-
- ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
-
-err:
- return ret;
-}
-
-
-int
-rpcsvc_transport_peer_check_addr (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int ret = RPCSVC_AUTH_REJECT;
- int aret = RPCSVC_AUTH_DONTCARE;
- int rjret = RPCSVC_AUTH_REJECT;
- char clstr[RPCSVC_PEER_STRLEN];
-
- if (!trans)
- return ret;
-
- ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN, NULL,
- 0);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
- "%s", gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- aret = rpcsvc_transport_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);
-
- ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
-err:
- return ret;
-}
-
-
-int
-rpcsvc_transport_check_volume_specific (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int namechk = RPCSVC_AUTH_REJECT;
- int addrchk = RPCSVC_AUTH_REJECT;
- gf_boolean_t namelookup = _gf_true;
- char *namestr = NULL;
- int ret = 0;
-
- if ((!options) || (!volname) || (!trans))
- return RPCSVC_AUTH_REJECT;
-
- /* Enabled by default */
- if ((dict_get (options, "rpc-auth.addr.namelookup"))) {
- ret = dict_get_str (options, "rpc-auth.addr.namelookup"
- , &namestr);
- if (ret == 0) {
- ret = gf_string2boolean (namestr, &namelookup);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "wrong option %s given for "
- "'namelookup'", namestr);
- }
- }
-
- /* We need two separate checks because the rules with addresses in them
- * can be network addresses which can be general and names can be
- * specific which will over-ride the network address rules.
- */
- if (namelookup)
- namechk = rpcsvc_transport_peer_check_name (options, volname,
- trans);
- addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans);
-
- if (namelookup)
- ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
- else
- ret = addrchk;
-
- return ret;
-}
-
-
-int
-rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans)
-{
- int addrchk = RPCSVC_AUTH_REJECT;
- int namechk = RPCSVC_AUTH_REJECT;
- gf_boolean_t namelookup = _gf_true;
- char *namestr = NULL;
- int ret = 0;
-
- if ((!options) || (!trans))
- return RPCSVC_AUTH_REJECT;
-
- /* Enabled by default */
- if ((dict_get (options, "rpc-auth.addr.namelookup"))) {
- ret = dict_get_str (options, "rpc-auth.addr.namelookup"
- , &namestr);
- if (!ret) {
- ret = gf_string2boolean (namestr, &namelookup);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "wrong option %s given for "
- "'namelookup'", namestr);
- }
- }
-
- /* We need two separate checks because the rules with addresses in them
- * can be network addresses which can be general and names can be
- * specific which will over-ride the network address rules.
- */
- if (namelookup)
- namechk = rpcsvc_transport_peer_check_name (options, NULL,
- trans);
-
- addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans);
-
- if (namelookup)
- ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
- else
- ret = addrchk;
-
- return ret;
-}
-
-int
-rpcsvc_transport_peer_check (dict_t *options, char *volname,
- rpc_transport_t *trans)
-{
- int general_chk = RPCSVC_AUTH_REJECT;
- int specific_chk = RPCSVC_AUTH_REJECT;
-
- if ((!options) || (!volname) || (!trans))
- return RPCSVC_AUTH_REJECT;
-
- general_chk = rpcsvc_transport_check_volume_general (options, trans);
- specific_chk = rpcsvc_transport_check_volume_specific (options, volname,
- trans);
-
- return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk);
-}
-
-
-char *
-rpcsvc_volume_allowed (dict_t *options, char *volname)
-{
- char globalrule[] = "rpc-auth.addr.allow";
- char *srchstr = NULL;
- char *addrstr = NULL;
- int ret = -1;
-
- if ((!options) || (!volname))
- return NULL;
-
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- goto out;
- }
-
- if (!dict_get (options, srchstr)) {
- GF_FREE (srchstr);
- srchstr = globalrule;
- ret = dict_get_str (options, srchstr, &addrstr);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "failed to get the string %s", srchstr);
- } else {
- ret = dict_get_str (options, srchstr, &addrstr);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "failed to get the string %s", srchstr);
- }
-out:
- return addrstr;
-}
-
-
-int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...);
@@ -512,7 +66,6 @@ rpcsvc_notify_wrapper_alloc (void)
wrapper = GF_CALLOC (1, sizeof (*wrapper), gf_common_mt_rpcsvc_wrapper_t);
if (!wrapper) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed");
goto out;
}
@@ -548,99 +101,61 @@ out:
return;
}
-
-int
-rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname,
- rpc_transport_t *trans)
+rpcsvc_vector_sizer
+rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
+ uint32_t progver, uint32_t procnum)
{
- struct sockaddr_storage sa = {0, };
- int ret = RPCSVC_AUTH_REJECT;
- socklen_t sasize = sizeof (sa);
- char *srchstr = NULL;
- char *valstr = NULL;
- int globalinsecure = RPCSVC_AUTH_REJECT;
- int exportinsecure = RPCSVC_AUTH_DONTCARE;
- uint16_t port = 0;
- gf_boolean_t insecure = _gf_false;
+ rpcsvc_program_t *program = NULL;
+ char found = 0;
- if ((!svc) || (!volname) || (!trans))
- return ret;
+ if (!svc)
+ return NULL;
- ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sa,
- sasize);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s",
- gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_for_each_entry (program, &svc->programs, program) {
+ if ((program->prognum == prognum)
+ && (program->progver == progver)) {
+ found = 1;
+ break;
+ }
+ }
}
+ pthread_mutex_unlock (&svc->rpclock);
- if (sa.ss_family == AF_INET) {
- port = ((struct sockaddr_in *)&sa)->sin_port;
- } else {
- port = ((struct sockaddr_in6 *)&sa)->sin6_port;
- }
+ if (found)
+ return program->actors[procnum].vector_sizer;
+ else
+ return NULL;
+}
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
- /* If the port is already a privileged one, dont bother with checking
- * options.
- */
- if (port <= 1024) {
- ret = RPCSVC_AUTH_ACCEPT;
- goto err;
- }
+int
+rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta)
+{
+ int ret = 0;
+ int old_count = 0;
+ int new_count = 0;
+ int limit = 0;
- /* Disabled by default */
- if ((dict_get (svc->options, "rpc-auth.ports.insecure"))) {
- ret = dict_get_str (svc->options, "rpc-auth.ports.insecure"
- , &srchstr);
- if (ret == 0) {
- ret = gf_string2boolean (srchstr, &insecure);
- if (ret == 0) {
- if (insecure == _gf_true)
- globalinsecure = RPCSVC_AUTH_ACCEPT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- }
+ pthread_mutex_lock (&trans->lock);
+ {
+ limit = svc->outstanding_rpc_limit;
+ if (!limit)
+ goto unlock;
- /* Disabled by default */
- ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
+ old_count = trans->outstanding_rpc_count;
+ trans->outstanding_rpc_count += delta;
+ new_count = trans->outstanding_rpc_count;
- if (dict_get (svc->options, srchstr)) {
- ret = dict_get_str (svc->options, srchstr, &valstr);
- if (ret == 0) {
- ret = gf_string2boolean (srchstr, &insecure);
- if (ret == 0) {
- if (insecure == _gf_true)
- exportinsecure = RPCSVC_AUTH_ACCEPT;
- else
- exportinsecure = RPCSVC_AUTH_REJECT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- }
-
- ret = rpcsvc_combine_gen_spec_volume_checks (globalinsecure,
- exportinsecure);
- if (ret == RPCSVC_AUTH_ACCEPT)
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
- else
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not"
- " allowed");
+ if (old_count <= limit && new_count > limit)
+ ret = rpc_transport_throttle (trans, _gf_true);
+
+ if (old_count > limit && new_count <= limit)
+ ret = rpc_transport_throttle (trans, _gf_false);
+ }
+unlock:
+ pthread_mutex_unlock (&trans->lock);
-err:
return ret;
}
@@ -680,26 +195,35 @@ rpcsvc_program_actor (rpcsvc_request_t *req)
if (!found) {
if (err != PROG_MISMATCH) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "RPC program not available");
+ /* log in DEBUG when nfs clients try to see if
+ * ACL requests are accepted by nfs server
+ */
+ gf_log (GF_RPCSVC, (req->prognum == ACL_PROGRAM) ?
+ GF_LOG_DEBUG : GF_LOG_WARNING,
+ "RPC program not available (req %u %u)",
+ req->prognum, req->progver);
err = PROG_UNAVAIL;
goto err;
}
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC program version not"
- " available");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING,
+ "RPC program version not available (req %u %u)",
+ req->prognum, req->progver);
goto err;
}
req->prog = program;
if (!program->actors) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC System error");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING,
+ "RPC Actor not found for program %s %d",
+ program->progname, program->prognum);
err = SYSTEM_ERR;
goto err;
}
if ((req->procnum < 0) || (req->procnum >= program->numactors)) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not"
- " available");
+ " available for procedure %d in %s", req->procnum,
+ program->progname);
err = PROC_UNAVAIL;
goto err;
}
@@ -707,12 +231,15 @@ rpcsvc_program_actor (rpcsvc_request_t *req)
actor = &program->actors[req->procnum];
if (!actor->actor) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not"
- " available");
+ " available for procedure %d in %s", req->procnum,
+ program->progname);
err = PROC_UNAVAIL;
actor = NULL;
goto err;
}
+ req->synctask = program->synctask;
+
err = SUCCESS;
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s",
program->progname, actor->procname);
@@ -725,9 +252,9 @@ err:
/* this procedure can only pass 4 arguments to registered notifyfn. To send more
- * arguements call wrapper->notify directly.
+ * arguments call wrapper->notify directly.
*/
-inline void
+static inline void
rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event,
void *data)
{
@@ -750,7 +277,7 @@ out:
}
-inline int
+static inline int
rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans,
rpc_transport_t *new_trans)
{
@@ -780,9 +307,21 @@ rpcsvc_request_destroy (rpcsvc_request_t *req)
iobref_unref (req->iobref);
}
+ if (req->hdr_iobuf)
+ iobuf_unref (req->hdr_iobuf);
+
+ /* This marks the "end" of an RPC request. Reply is
+ completely written to the socket and is on the way
+ to the client. It is time to decrement the
+ outstanding request counter by 1.
+ */
+ rpcsvc_request_outstanding (req->svc, req->trans, -1);
+
rpc_transport_unref (req->trans);
- mem_put (req->svc->rxpool, req);
+ GF_FREE (req->auxgidlarge);
+
+ mem_put (req);
out:
return;
@@ -795,6 +334,8 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
struct iovec progmsg, rpc_transport_pollin_t *msg,
rpcsvc_request_t *req)
{
+ int i = 0;
+
if ((!trans) || (!callmsg)|| (!req) || (!msg))
return NULL;
@@ -809,7 +350,11 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
req->msg[0] = progmsg;
req->iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->msg[1] = msg->vector[1];
+ /* msg->vector[2] is defined in structure. prevent a
+ out of bound access */
+ for (i = 1; i < min (msg->count, 2); i++) {
+ req->msg[i] = msg->vector[i];
+ }
}
req->svc = svc;
@@ -855,10 +400,15 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
*/
rpcsvc_alloc_request (svc, req);
if (!req) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request");
goto err;
}
+ /* We just received a new request from the wire. Account for
+ it in the outsanding request counter to make sure we don't
+ ingest too many concurrent requests from the same client.
+ */
+ ret = rpcsvc_request_outstanding (svc, trans, +1);
+
msgbuf = msg->vector[0].iov_base;
msglen = msg->vector[0].iov_len;
@@ -866,21 +416,32 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
req->cred.authdata,req->verf.authdata);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC call decoding failed");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "RPC call decoding failed");
rpcsvc_request_seterr (req, GARBAGE_ARGS);
+ req->trans = rpc_transport_ref (trans);
+ req->svc = svc;
goto err;
}
ret = -1;
rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req);
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld,"
- " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg),
+ gf_log (GF_RPCSVC, GF_LOG_TRACE, "received rpc-message (XID: 0x%lx, "
+ "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) from"
+ " rpc-transport (%s)", rpc_call_xid (&rpcmsg),
rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
- rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg));
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ trans->name);
if (rpc_call_rpcvers (&rpcmsg) != 2) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported");
+ /* LOG- TODO: print rpc version, also print the peerinfo
+ from transport */
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported "
+ "(XID: 0x%lx, Ver: %ld, Prog: %ld, ProgVers: %ld, "
+ "Proc: %ld) from trans (%s)", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ trans->name);
rpcsvc_request_seterr (req, RPC_MISMATCH);
goto err;
}
@@ -892,7 +453,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
* error happened.
*/
rpcsvc_request_seterr (req, AUTH_ERROR);
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed authentication");
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "auth failed on request. "
+ "(XID: 0x%lx, Ver: %ld, Prog: %ld, ProgVers: %ld, "
+ "Proc: %ld) from trans (%s)", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ trans->name);
ret = -1;
goto err;
}
@@ -902,12 +468,13 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
* since we are not handling authentication failures for now.
*/
req->rpc_status = MSG_ACCEPTED;
+ req->reply = NULL;
ret = 0;
err:
if (ret == -1) {
ret = rpcsvc_error_reply (req);
if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
+ gf_log ("rpcsvc", GF_LOG_WARNING,
"failed to queue error reply");
req = NULL;
}
@@ -917,19 +484,75 @@ err:
int
+rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
+{
+ rpcsvc_request_t *req = NULL;
+
+ req = opaque;
+
+ if (ret)
+ gf_log ("rpcsvc", GF_LOG_ERROR,
+ "rpc actor failed to complete successfully");
+
+ if (ret == RPCSVC_ACTOR_ERROR) {
+ ret = rpcsvc_error_reply (req);
+ if (ret)
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "failed to queue error reply");
+ }
+
+ return 0;
+}
+
+int
rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
{
- rpcsvc_actor_t *actor = NULL;
- rpcsvc_request_t *req = NULL;
- int ret = -1;
+ rpcsvc_actor_t *actor = NULL;
+ rpcsvc_actor actor_fn = NULL;
+ rpcsvc_request_t *req = NULL;
+ int ret = -1;
+ uint16_t port = 0;
+ gf_boolean_t is_unix = _gf_false;
+ gf_boolean_t unprivileged = _gf_false;
+ drc_cached_op_t *reply = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
if (!trans || !svc)
return -1;
+ switch (trans->peerinfo.sockaddr.ss_family) {
+ case AF_INET:
+ port = ((struct sockaddr_in *)&trans->peerinfo.sockaddr)->sin_port;
+ break;
+
+ case AF_INET6:
+ port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)->sin6_port;
+ break;
+ case AF_UNIX:
+ is_unix = _gf_true;
+ break;
+ default:
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "invalid address family (%d)",
+ trans->peerinfo.sockaddr.ss_family);
+ return -1;
+ }
+
+
+
+ if (is_unix == _gf_false) {
+ port = ntohs (port);
+
+ gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port);
+
+ if (port > 1024)
+ unprivileged = _gf_true;
+ }
+
req = rpcsvc_request_create (svc, trans, msg);
if (!req)
- goto err;
+ goto out;
if (!rpcsvc_request_accepted (req))
goto err_reply;
@@ -938,38 +561,85 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
if (!actor)
goto err_reply;
- if (actor && (req->rpc_err == SUCCESS)) {
+ if (0 == svc->allow_insecure && unprivileged && !actor->unprivileged) {
+ /* Non-privileged user, fail request */
+ gf_log ("glusterd", GF_LOG_ERROR,
+ "Request received from non-"
+ "privileged port. Failing request");
+ rpcsvc_request_destroy (req);
+ return -1;
+ }
+
+ /* DRC */
+ if (rpcsvc_need_drc (req)) {
+ drc = req->svc->drc;
+
+ LOCK (&drc->lock);
+ reply = rpcsvc_drc_lookup (req);
+
+ /* retransmission of completed request, send cached reply */
+ if (reply && reply->state == DRC_OP_CACHED) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:"
+ " XID: 0x%x", req->xid);
+ ret = rpcsvc_send_cached_reply (req, reply);
+ drc->cache_hits++;
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* retransmitted request, original op in transit, drop it */
+ else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit,"
+ " discarding. XID: 0x%x", req->xid);
+ ret = 0;
+ drc->intransit_hits++;
+ rpcsvc_request_destroy (req);
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* fresh request, cache it as in-transit and proceed */
+ else {
+ ret = rpcsvc_cache_request (req);
+ }
+ UNLOCK (&drc->lock);
+ }
+
+ if (req->rpc_err == SUCCESS) {
/* Before going to xlator code, set the THIS properly */
THIS = svc->mydata;
- if (req->count == 2) {
- if (actor->vector_actor) {
- ret = actor->vector_actor (req, &req->msg[1], 1,
- req->iobref);
- } else {
- rpcsvc_request_seterr (req, PROC_UNAVAIL);
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "No vectored handler present");
- ret = RPCSVC_ACTOR_ERROR;
- }
- } else if (actor->actor) {
- ret = actor->actor (req);
+ actor_fn = actor->actor;
+
+ if (!actor_fn) {
+ rpcsvc_request_seterr (req, PROC_UNAVAIL);
+ /* LOG TODO: print more info about procnum,
+ prognum etc, also print transport info */
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "No vectored handler present");
+ ret = RPCSVC_ACTOR_ERROR;
+ goto err_reply;
}
- }
-err_reply:
- if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) {
- ret = rpcsvc_error_reply (req);
+ if (req->synctask) {
+ if (msg->hdr_iobuf)
+ req->hdr_iobuf = iobuf_ref (msg->hdr_iobuf);
+
+ ret = synctask_new (THIS->ctx->env,
+ (synctask_fn_t) actor_fn,
+ rpcsvc_check_and_reply_error, NULL,
+ req);
+ } else {
+ ret = actor_fn (req);
+ }
}
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG, "failed to queue error reply");
+err_reply:
+ ret = rpcsvc_check_and_reply_error (ret, NULL, req);
/* No need to propagate error beyond this function since the reply
* has now been queued. */
ret = 0;
-err:
+out:
return ret;
}
@@ -981,46 +651,49 @@ rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans)
rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper;
int32_t ret = -1, i = 0, wrapper_count = 0;
rpcsvc_listener_t *listener = NULL;
-
+
event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD
: RPCSVC_EVENT_DISCONNECT;
-
+
pthread_mutex_lock (&svc->rpclock);
{
+ if (!svc->notify_count)
+ goto unlock;
+
wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper),
gf_common_mt_rpcsvc_wrapper_t);
if (!wrappers) {
goto unlock;
}
-
+
list_for_each_entry (wrapper, &svc->notify, list) {
if (wrapper->notify) {
wrappers[i++] = *wrapper;
}
}
-
+
wrapper_count = i;
}
unlock:
pthread_mutex_unlock (&svc->rpclock);
-
+
if (wrappers) {
for (i = 0; i < wrapper_count; i++) {
wrappers[i].notify (svc, wrappers[i].data,
event, trans);
}
-
+
GF_FREE (wrappers);
}
-
+
if (event == RPCSVC_EVENT_LISTENER_DEAD) {
listener = rpcsvc_get_listener (svc, -1, trans->listener);
rpcsvc_listener_destroy (listener);
}
-
+
return ret;
}
-
+
int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
@@ -1030,6 +703,7 @@ rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_pollin_t *msg = NULL;
rpc_transport_t *new_trans = NULL;
rpcsvc_t *svc = NULL;
+ rpcsvc_listener_t *listener = NULL;
svc = mydata;
if (svc == NULL) {
@@ -1059,13 +733,20 @@ rpcsvc_notify (rpc_transport_t *trans, void *mydata,
/* do nothing, no need for rpcsvc to handle this, client should
* handle this event
*/
+ /* print info about transport too : LOG TODO */
gf_log ("rpcsvc", GF_LOG_CRITICAL,
"got CONNECT event, which should have not come");
ret = 0;
break;
case RPC_TRANSPORT_CLEANUP:
- /* FIXME: think about this later */
+ listener = rpcsvc_get_listener (svc, -1, trans->listener);
+ if (listener == NULL) {
+ goto out;
+ }
+
+ rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY,
+ trans);
ret = 0;
break;
@@ -1099,7 +780,7 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
*/
ret = rpc_reply_to_xdr (&reply, recordstart, rlen, &replyhdr);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to create RPC reply");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "Failed to create RPC reply");
goto err;
}
@@ -1119,22 +800,208 @@ err:
return txrecord;
}
+static inline int
+rpcsvc_get_callid (rpcsvc_t *rpc)
+{
+ return GF_UNIVERSAL_ANSWER;
+}
-inline int
-rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec,
- int hdrcount, struct iovec *proghdr, int proghdrcount,
- struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *priv)
+int
+rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload,
+ uint64_t xid, struct rpc_msg *request)
+{
+ int ret = -1;
+
+ if (!request) {
+ goto out;
+ }
+
+ memset (request, 0, sizeof (*request));
+
+ request->rm_xid = xid;
+ request->rm_direction = CALL;
+
+ request->rm_call.cb_rpcvers = 2;
+ request->rm_call.cb_prog = prognum;
+ request->rm_call.cb_vers = progver;
+ request->rm_call.cb_proc = procnum;
+
+ request->rm_call.cb_cred.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
+
+ request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_verf.oa_base = NULL;
+ request->rm_call.cb_verf.oa_length = 0;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+struct iovec
+rpcsvc_callback_build_header (char *recordstart, size_t rlen,
+ struct rpc_msg *request, size_t payload)
+{
+ struct iovec requesthdr = {0, };
+ struct iovec txrecord = {0, 0};
+ int ret = -1;
+ size_t fraglen = 0;
+
+ ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "Failed to create RPC request");
+ goto out;
+ }
+
+ fraglen = payload + requesthdr.iov_len;
+ gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
+ "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);
+
+ txrecord.iov_base = recordstart;
+
+ /* Remember, this is only the vec for the RPC header and does not
+ * include the payload above. We needed the payload only to calculate
+ * the size of the full fragment. This size is sent in the fragment
+ * header.
+ */
+ txrecord.iov_len = requesthdr.iov_len;
+
+out:
+ return txrecord;
+}
+
+struct iobuf *
+rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver,
+ int procnum, size_t payload, uint64_t xid,
+ struct iovec *recbuf)
+{
+ struct rpc_msg request = {0, };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {0, };
+ size_t pagesize = 0;
+ size_t xdr_size = 0;
+ int ret = -1;
+
+ if ((!rpc) || (!recbuf)) {
+ goto out;
+ }
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid,
+ &request);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build a rpc-request "
+ "xid (%"PRIu64")", xid);
+ goto out;
+ }
+
+ /* First, try to get a pointer into the buffer which the RPC
+ * layer can use.
+ */
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
+
+ request_iob = iobuf_get2 (rpc->ctx->iobuf_pool, (xdr_size + payload));
+ if (!request_iob) {
+ goto out;
+ }
+
+ pagesize = iobuf_pagesize (request_iob);
+
+ record = iobuf_ptr (request_iob); /* Now we have it. */
+
+ recordhdr = rpcsvc_callback_build_header (record, pagesize, &request,
+ payload);
+
+ if (!recordhdr.iov_base) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
+ " header");
+ iobuf_unref (request_iob);
+ request_iob = NULL;
+ recbuf->iov_base = NULL;
+ goto out;
+ }
+
+ recbuf->iov_base = recordhdr.iov_base;
+ recbuf->iov_len = recordhdr.iov_len;
+
+out:
+ return request_iob;
+}
+
+int
+rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
+ rpcsvc_cbk_program_t *prog, int procnum,
+ struct iovec *proghdr, int proghdrcount)
+{
+ struct iobuf *request_iob = NULL;
+ struct iovec rpchdr = {0,};
+ rpc_transport_req_t req;
+ int ret = -1;
+ int proglen = 0;
+ uint64_t callid = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ memset (&req, 0, sizeof (req));
+
+ callid = rpcsvc_get_callid (rpc);
+
+ if (proghdr) {
+ proglen += iov_length (proghdr, proghdrcount);
+ }
+
+ request_iob = rpcsvc_callback_build_record (rpc, prog->prognum,
+ prog->progver, procnum,
+ proglen, callid,
+ &rpchdr);
+ if (!request_iob) {
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "cannot build rpc-record");
+ goto out;
+ }
+
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+
+ ret = rpc_transport_submit_request (trans, &req);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "transmission of rpc-request failed");
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ iobuf_unref (request_iob);
+
+ return ret;
+}
+
+int
+rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr,
+ int rpchdrcount, struct iovec *proghdr,
+ int proghdrcount, struct iovec *progpayload,
+ int progpayloadcount, struct iobref *iobref,
+ void *priv)
{
int ret = -1;
rpc_transport_reply_t reply = {{0, }};
- if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) {
+ if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {
goto out;
}
- reply.msg.rpchdr = hdrvec;
- reply.msg.rpchdrcount = hdrcount;
+ reply.msg.rpchdr = rpchdr;
+ reply.msg.rpchdrcount = rpchdrcount;
reply.msg.proghdr = proghdr;
reply.msg.proghdrcount = proghdrcount;
reply.msg.progpayload = progpayload;
@@ -1152,25 +1019,31 @@ out:
int
rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)
{
+ int ret = -1;
rpcsvc_program_t *prog = NULL;
if ((!req) || (!reply))
- return -1;
-
- prog = rpcsvc_request_program (req);
+ goto out;
+ ret = 0;
rpc_fill_empty_reply (reply, req->xid);
-
- if (req->rpc_status == MSG_DENIED)
+ if (req->rpc_status == MSG_DENIED) {
rpc_fill_denied_reply (reply, req->rpc_err, req->auth_err);
- else if (req->rpc_status == MSG_ACCEPTED)
- rpc_fill_accepted_reply (reply, req->rpc_err, prog->proglowvers,
- prog->proghighvers, req->verf.flavour,
- req->verf.datalen,
+ goto out;
+ }
+
+ prog = rpcsvc_request_program (req);
+
+ if (req->rpc_status == MSG_ACCEPTED)
+ rpc_fill_accepted_reply (reply, req->rpc_err,
+ (prog) ? prog->proglowvers : 0,
+ (prog) ? prog->proghighvers: 0,
+ req->verf.flavour, req->verf.datalen,
req->verf.authdata);
else
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid rpc_status value");
- return 0;
+out:
+ return ret;
}
@@ -1184,30 +1057,40 @@ rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)
*/
struct iobuf *
rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload,
- struct iovec *recbuf)
+ size_t hdrlen, struct iovec *recbuf)
{
struct rpc_msg reply;
struct iobuf *replyiob = NULL;
char *record = NULL;
struct iovec recordhdr = {0, };
size_t pagesize = 0;
+ size_t xdr_size = 0;
rpcsvc_t *svc = NULL;
+ int ret = -1;
if ((!req) || (!req->trans) || (!req->svc) || (!recbuf))
return NULL;
svc = req->svc;
- replyiob = iobuf_get (svc->ctx->iobuf_pool);
- pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool);
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_reply (req, &reply);
+ if (ret)
+ goto err_exit;
+
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_replymsg, &reply);
+
+ /* Payload would include 'readv' size etc too, where as
+ that comes as another payload iobuf */
+ replyiob = iobuf_get2 (svc->ctx->iobuf_pool, (xdr_size + hdrlen));
if (!replyiob) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get iobuf");
goto err_exit;
}
+ pagesize = iobuf_pagesize (replyiob);
+
record = iobuf_ptr (replyiob); /* Now we have it. */
- /* Fill the rpc structure and XDR it into the buffer got above. */
- rpcsvc_fill_reply (req, &reply);
recordhdr = rpcsvc_record_build_header (record, pagesize, reply,
payload);
if (!recordhdr.iov_base) {
@@ -1262,7 +1145,9 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
struct iovec recordhdr = {0, };
rpc_transport_t *trans = NULL;
size_t msglen = 0;
+ size_t hdrlen = 0;
char new_iobref = 0;
+ rpcsvc_drc_globals_t *drc = NULL;
if ((!req) || (!req->trans))
return -1;
@@ -1280,7 +1165,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen);
/* Build the buffer containing the encoded RPC reply. */
- replyiob = rpcsvc_record_build_record (req, msglen, &recordhdr);
+ replyiob = rpcsvc_record_build_record (req, msglen, hdrlen, &recordhdr);
if (!replyiob) {
gf_log (GF_RPCSVC, GF_LOG_ERROR,"Reply record creation failed");
goto disconnect_exit;
@@ -1289,8 +1174,6 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
if (!iobref) {
iobref = iobref_new ();
if (!iobref) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation "
- "failed");
goto disconnect_exit;
}
@@ -1299,12 +1182,35 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
iobref_add (iobref, replyiob);
+ /* cache the request in the duplicate request cache for appropriate ops */
+ if (req->reply) {
+ drc = req->svc->drc;
+
+ LOCK (&drc->lock);
+ ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1,
+ proghdr, hdrcount,
+ payload, payloadcount);
+ UNLOCK (&drc->lock);
+ }
+
ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,
payload, payloadcount, iobref,
req->trans_private);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message");
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message "
+ "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to "
+ "rpc-transport (%s)", req->xid,
+ req->prog ? req->prog->progname : "(not matched)",
+ req->prog ? req->prog->progver : 0,
+ req->procnum, trans->name);
+ } else {
+ gf_log (GF_RPCSVC, GF_LOG_TRACE,
+ "submitted reply for rpc-message (XID: 0x%x, "
+ "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport "
+ "(%s)", req->xid, req->prog ? req->prog->progname: "-",
+ req->prog ? req->prog->progver : 0,
+ req->procnum, trans->name);
}
disconnect_exit:
@@ -1330,6 +1236,8 @@ rpcsvc_error_reply (rpcsvc_request_t *req)
if (!req)
return -1;
+ gf_log_callingfn ("", GF_LOG_DEBUG, "sending a RPC error reply");
+
/* At this point the req should already have been filled with the
* appropriate RPC error numbers.
*/
@@ -1341,12 +1249,13 @@ rpcsvc_error_reply (rpcsvc_request_t *req)
inline int
rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
{
- int ret = 0;
+ int ret = -1; /* FAIL */
if (!newprog) {
goto out;
}
+ /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */
if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP,
port))) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with"
@@ -1354,7 +1263,7 @@ rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
goto out;
}
- ret = 0;
+ ret = 0; /* SUCCESS */
out:
return ret;
}
@@ -1363,18 +1272,27 @@ out:
inline int
rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
{
+ int ret = -1;
+
if (!prog)
- return -1;
+ goto out;
if (!(pmap_unset(prog->prognum, prog->progver))) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister with"
" portmap");
- return -1;
+ goto out;
}
- return 0;
+ ret = 0;
+out:
+ return ret;
}
+int
+rpcsvc_register_portmap_enabled (rpcsvc_t *svc)
+{
+ return svc->register_portmap;
+}
int32_t
rpcsvc_get_listener_port (rpcsvc_listener_t *listener)
@@ -1387,11 +1305,11 @@ rpcsvc_get_listener_port (rpcsvc_listener_t *listener)
switch (listener->trans->myinfo.sockaddr.ss_family) {
case AF_INET:
- listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
+ listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
break;
case AF_INET6:
- listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
+ listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
break;
default:
@@ -1413,7 +1331,7 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
{
rpcsvc_listener_t *listener = NULL;
char found = 0;
- uint32_t listener_port = 0;
+ uint32_t listener_port = 0;
if (!svc) {
goto out;
@@ -1433,7 +1351,7 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
listener_port = rpcsvc_get_listener_port (listener);
if (listener_port == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
"invalid port for listener %s",
listener->trans->name);
continue;
@@ -1476,28 +1394,44 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
int
-rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *prog)
+rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program)
{
int ret = -1;
-
- if (!svc || !prog) {
+ rpcsvc_program_t *prog = NULL;
+ if (!svc || !program) {
goto out;
}
- ret = rpcsvc_program_unregister_portmap (prog);
+ ret = rpcsvc_program_unregister_portmap (program);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of"
" program failed");
goto out;
}
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_for_each_entry (prog, &svc->programs, program) {
+ if ((prog->prognum == program->prognum)
+ && (prog->progver == program->progver)) {
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&svc->rpclock);
+
+ if (prog == NULL) {
+ ret = -1;
+ goto out;
+ }
+
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d,"
" Ver: %d, Port: %d", prog->progname, prog->prognum,
prog->progver, prog->progport);
pthread_mutex_lock (&svc->rpclock);
{
- list_del (&prog->program);
+ list_del_init (&prog->program);
}
pthread_mutex_unlock (&svc->rpclock);
@@ -1505,8 +1439,8 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *prog)
out:
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program unregistration failed"
- ": %s, Num: %d, Ver: %d, Port: %d", prog->progname,
- prog->prognum, prog->progver, prog->progport);
+ ": %s, Num: %d, Ver: %d, Port: %d", program->progname,
+ program->prognum, program->progver, program->progport);
}
return ret;
@@ -1545,21 +1479,21 @@ rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name)
trans = rpc_transport_load (svc->ctx, options, name);
if (!trans) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot create listener, "
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "cannot create listener, "
"initing the transport failed");
goto out;
}
ret = rpc_transport_listen (trans);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ gf_log (GF_RPCSVC, GF_LOG_WARNING,
"listening on transport failed");
goto out;
}
ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "registering notify failed");
goto out;
}
@@ -1581,7 +1515,6 @@ rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans)
listener = GF_CALLOC (1, sizeof (*listener),
gf_common_mt_rpcsvc_listener_t);
if (!listener) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed");
goto out;
}
@@ -1600,11 +1533,12 @@ out:
}
-rpcsvc_listener_t *
+int32_t
rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
{
rpc_transport_t *trans = NULL;
rpcsvc_listener_t *listener = NULL;
+ int32_t ret = -1;
if (!svc || !options) {
goto out;
@@ -1612,6 +1546,7 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
trans = rpcsvc_transport_create (svc, options, name);
if (!trans) {
+ /* LOG TODO */
goto out;
}
@@ -1620,12 +1555,101 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
+ ret = 0;
out:
if (!listener && trans) {
rpc_transport_disconnect (trans);
}
- return listener;
+ return ret;
+}
+
+
+int32_t
+rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name)
+{
+ int32_t ret = -1, count = 0;
+ data_t *data = NULL;
+ char *str = NULL, *ptr = NULL, *transport_name = NULL;
+ char *transport_type = NULL, *saveptr = NULL, *tmp = NULL;
+
+ if ((svc == NULL) || (options == NULL) || (name == NULL)) {
+ goto out;
+ }
+
+ data = dict_get (options, "transport-type");
+ if (data == NULL) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "option transport-type not set");
+ goto out;
+ }
+
+ transport_type = data_to_str (data);
+ if (transport_type == NULL) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "option transport-type not set");
+ goto out;
+ }
+
+ /* duplicate transport_type, since following dict_set will free it */
+ transport_type = gf_strdup (transport_type);
+ if (transport_type == NULL) {
+ goto out;
+ }
+
+ str = gf_strdup (transport_type);
+ if (str == NULL) {
+ goto out;
+ }
+
+ ptr = strtok_r (str, ",", &saveptr);
+
+ while (ptr != NULL) {
+ tmp = gf_strdup (ptr);
+ if (tmp == NULL) {
+ goto out;
+ }
+
+ ret = gf_asprintf (&transport_name, "%s.%s", tmp, name);
+ if (ret == -1) {
+ goto out;
+ }
+
+ ret = dict_set_dynstr (options, "transport-type", tmp);
+ if (ret == -1) {
+ goto out;
+ }
+
+ tmp = NULL;
+ ptr = strtok_r (NULL, ",", &saveptr);
+
+ ret = rpcsvc_create_listener (svc, options, transport_name);
+ if (ret != 0) {
+ goto out;
+ }
+
+ GF_FREE (transport_name);
+ transport_name = NULL;
+ count++;
+ }
+
+ ret = dict_set_dynstr (options, "transport-type", transport_type);
+ if (ret == -1) {
+ goto out;
+ }
+
+ transport_type = NULL;
+
+out:
+ GF_FREE (str);
+
+ GF_FREE (transport_type);
+
+ GF_FREE (tmp);
+
+ GF_FREE (transport_name);
+
+ return count;
}
@@ -1686,7 +1710,9 @@ out:
inline int
rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
{
- int ret = -1;
+ int ret = -1;
+ rpcsvc_program_t *newprog = NULL;
+ char already_registered = 0;
if (!svc) {
goto out;
@@ -1696,18 +1722,42 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
goto out;
}
- INIT_LIST_HEAD (&program->program);
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_for_each_entry (newprog, &svc->programs, program) {
+ if ((newprog->prognum == program->prognum)
+ && (newprog->progver == program->progver)) {
+ already_registered = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&svc->rpclock);
+
+ if (already_registered) {
+ ret = 0;
+ goto out;
+ }
+
+ newprog = GF_CALLOC (1, sizeof(*newprog),gf_common_mt_rpcsvc_program_t);
+ if (newprog == NULL) {
+ goto out;
+ }
+
+ memcpy (newprog, program, sizeof (*program));
+
+ INIT_LIST_HEAD (&newprog->program);
pthread_mutex_lock (&svc->rpclock);
{
- list_add_tail (&program->program, &svc->programs);
+ list_add_tail (&newprog->program, &svc->programs);
}
pthread_mutex_unlock (&svc->rpclock);
ret = 0;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d,"
- " Ver: %d, Port: %d", program->progname, program->prognum,
- program->progver, program->progport);
+ " Ver: %d, Port: %d", newprog->progname, newprog->prognum,
+ newprog->progver, newprog->progport);
out:
if (ret == -1) {
@@ -1719,7 +1769,6 @@ out:
return ret;
}
-
static void
free_prog_details (gf_dump_rsp *rsp)
{
@@ -1768,38 +1817,41 @@ static int
rpcsvc_dump (rpcsvc_request_t *req)
{
char rsp_buf[8 * 1024] = {0,};
- gf_dump_rsp rsp = {0,};
- struct iovec iov = {0,};
- int op_errno = EINVAL;
- int ret = -1;
+ gf_dump_rsp rsp = {0,};
+ struct iovec iov = {0,};
+ int op_errno = EINVAL;
+ int ret = -1;
+ uint32_t dump_rsp_len = 0;
if (!req)
- goto fail;
+ goto sendrsp;
ret = build_prog_details (req, &rsp);
if (ret < 0) {
op_errno = -ret;
- goto fail;
+ goto sendrsp;
}
-fail:
+ op_errno = 0;
+
+sendrsp:
rsp.op_errno = gf_errno_to_error (op_errno);
rsp.op_ret = ret;
+ dump_rsp_len = xdr_sizeof ((xdrproc_t) xdr_gf_dump_rsp,
+ &rsp);
+
iov.iov_base = rsp_buf;
- iov.iov_len = (8 * 1024);
+ iov.iov_len = dump_rsp_len;
- ret = xdr_serialize_dump_rsp (iov, &rsp);
+ ret = xdr_serialize_generic (iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp);
if (ret < 0) {
- if (req)
- req->rpc_err = GARBAGE_ARGS;
- op_errno = EINVAL;
- goto fail;
+ ret = RPCSVC_ACTOR_ERROR;
+ } else {
+ rpcsvc_submit_generic (req, &iov, 1, NULL, 0, NULL);
+ ret = 0;
}
- ret = rpcsvc_submit_generic (req, &iov, 1, NULL, 0,
- NULL);
-
free_prog_details (&rsp);
return ret;
@@ -1808,18 +1860,217 @@ fail:
int
rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
{
+ char *optstr = NULL;
+ int ret = -1;
+
+ if ((!svc) || (!options))
+ return -1;
+
svc->memfactor = RPCSVC_DEFAULT_MEMFACTOR;
- return 0;
+
+ svc->register_portmap = _gf_true;
+ if (dict_get (options, "rpc.register-with-portmap")) {
+ ret = dict_get_str (options, "rpc.register-with-portmap",
+ &optstr);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse "
+ "dict");
+ goto out;
+ }
+
+ ret = gf_string2boolean (optstr, &svc->register_portmap);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse bool "
+ "string");
+ goto out;
+ }
+ }
+
+ if (!svc->register_portmap)
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "
+ "disabled");
+
+ ret = rpcsvc_set_outstanding_rpc_limit (svc, options);
+out:
+ return ret;
}
+int
+rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options)
+{
+ xlator_t *xlator = NULL;
+ xlator_list_t *volentry = NULL;
+ char *srchkey = NULL;
+ char *keyval = NULL;
+ int ret = -1;
+
+ if ((!svc) || (!svc->options) || (!options))
+ return (-1);
+
+ /* Fetch the xlator from svc */
+ xlator = (xlator_t *) svc->mydata;
+ if (!xlator)
+ return (-1);
+
+ /* Reconfigure the volume specific rpc-auth.addr allow part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* If found the srchkey, delete old key/val pair
+ * and set the key with new value.
+ */
+ if (!dict_get_str (options, srchkey, &keyval)) {
+ dict_del (svc->options, srchkey);
+ ret = dict_set_str (svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "dict_set_str error");
+ GF_FREE (srchkey);
+ return (-1);
+ }
+ }
+
+ GF_FREE (srchkey);
+ volentry = volentry->next;
+ }
+
+ /* Reconfigure the volume specific rpc-auth.addr reject part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* If found the srchkey, delete old key/val pair
+ * and set the key with new value.
+ */
+ if (!dict_get_str (options, srchkey, &keyval)) {
+ dict_del (svc->options, srchkey);
+ ret = dict_set_str (svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "dict_set_str error");
+ GF_FREE (srchkey);
+ return (-1);
+ }
+ }
+
+ GF_FREE (srchkey);
+ volentry = volentry->next;
+ }
+
+ ret = rpcsvc_init_options (svc, options);
+ if (ret)
+ return (-1);
+
+ return rpcsvc_auth_reconf (svc, options);
+}
+
+int
+rpcsvc_transport_unix_options_build (dict_t **options, char *filepath)
+{
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT (filepath);
+ GF_ASSERT (options);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ fpath = gf_strdup (filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "transport.socket.listen-path", fpath);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ *options = dict;
+out:
+ if (ret) {
+ GF_FREE (fpath);
+ if (dict)
+ dict_unref (dict);
+ }
+ return ret;
+}
+
+/*
+ * Reconfigure() the rpc.outstanding-rpc-limit param.
+ */
+int
+rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options)
+{
+ int ret = -1; /* FAILURE */
+ int rpclim = 0;
+ static char *rpclimkey = "rpc.outstanding-rpc-limit";
+
+ if ((!svc) || (!options))
+ return (-1);
+
+ /* Reconfigure() the rpc.outstanding-rpc-limit param */
+ ret = dict_get_int32 (options, rpclimkey, &rpclim);
+ if (ret < 0) {
+ /* Fall back to default for FAILURE */
+ rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+ } else {
+ /* SUCCESS: round off to multiple of 8.
+ * If the input value fails Boundary check, fall back to
+ * default i.e. RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT.
+ * NB: value 0 is special, means its unset i.e. unlimited.
+ */
+ rpclim = ((rpclim + 8 - 1) >> 3) * 8;
+ if (rpclim < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+ } else if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT;
+ }
+ }
+
+ if (svc->outstanding_rpc_limit != rpclim) {
+ svc->outstanding_rpc_limit = rpclim;
+ gf_log (GF_RPCSVC, GF_LOG_INFO,
+ "Configured %s with value %d",
+ rpclimkey, rpclim);
+ }
+
+ return (0);
+}
/* The global RPC service initializer.
*/
rpcsvc_t *
-rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
+rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,
+ uint32_t poolcount)
{
rpcsvc_t *svc = NULL;
- int ret = -1, poolcount = 0;
+ int ret = -1;
if ((!ctx) || (!options))
return NULL;
@@ -1840,7 +2091,8 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
goto free_svc;
}
- poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
+ if (!poolcount)
+ poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);
@@ -1860,6 +2112,7 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
ret = -1;
svc->options = options;
svc->ctx = ctx;
+ svc->mydata = xl;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited.");
gluster_dump_prog.options = options;
@@ -1870,6 +2123,7 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
"failed to register DUMP program");
goto free_svc;
}
+
ret = 0;
free_svc:
if (ret == -1) {
@@ -1881,10 +2135,339 @@ free_svc:
}
+int
+rpcsvc_transport_peer_check_search (dict_t *options, char *pattern,
+ char *ip, char *hostname)
+{
+ int ret = -1;
+ char *addrtok = NULL;
+ char *addrstr = NULL;
+ char *dup_addrstr = NULL;
+ char *svptr = NULL;
+
+ if ((!options) || (!ip))
+ return -1;
+
+ ret = dict_get_str (options, pattern, &addrstr);
+ if (ret < 0) {
+ ret = -1;
+ goto err;
+ }
+
+ if (!addrstr) {
+ ret = -1;
+ goto err;
+ }
+
+ dup_addrstr = gf_strdup (addrstr);
+ addrtok = strtok_r (dup_addrstr, ",", &svptr);
+ while (addrtok) {
+
+ /* CASEFOLD not present on Solaris */
+#ifdef FNM_CASEFOLD
+ ret = fnmatch (addrtok, ip, FNM_CASEFOLD);
+#else
+ ret = fnmatch (addrtok, ip, 0);
+#endif
+ if (ret == 0)
+ goto err;
+
+ /* compare hostnames if applicable */
+ if (hostname) {
+#ifdef FNM_CASEFOLD
+ ret = fnmatch (addrtok, hostname, FNM_CASEFOLD);
+#else
+ ret = fnmatch (addrtok, hostname, 0);
+#endif
+ if (ret == 0)
+ goto err;
+ }
+
+ addrtok = strtok_r (NULL, ",", &svptr);
+ }
+
+ ret = -1;
+err:
+ GF_FREE (dup_addrstr);
+
+ return ret;
+}
+
+
+static int
+rpcsvc_transport_peer_check_allow (dict_t *options, char *volname,
+ char *ip, char *hostname)
+{
+ int ret = RPCSVC_AUTH_DONTCARE;
+ char *srchstr = NULL;
+
+ if ((!options) || (!ip) || (!volname))
+ return ret;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_DONTCARE;
+ goto out;
+ }
+
+ ret = rpcsvc_transport_peer_check_search (options, srchstr,
+ ip, hostname);
+ GF_FREE (srchstr);
+
+ if (ret == 0)
+ ret = RPCSVC_AUTH_ACCEPT;
+ else
+ ret = RPCSVC_AUTH_REJECT;
+out:
+ return ret;
+}
+
+static int
+rpcsvc_transport_peer_check_reject (dict_t *options, char *volname,
+ char *ip, char *hostname)
+{
+ int ret = RPCSVC_AUTH_DONTCARE;
+ char *srchstr = NULL;
+
+ if ((!options) || (!ip) || (!volname))
+ return ret;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject",
+ volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto out;
+ }
+
+ ret = rpcsvc_transport_peer_check_search (options, srchstr,
+ ip, hostname);
+ GF_FREE (srchstr);
+
+ if (ret == 0)
+ ret = RPCSVC_AUTH_REJECT;
+ else
+ ret = RPCSVC_AUTH_DONTCARE;
+out:
+ return ret;
+}
+
+
+/* Combines rpc auth's allow and reject options.
+ * Order of checks is important.
+ * First, REJECT if either rejects.
+ * If neither rejects, ACCEPT if either accepts.
+ * If neither accepts, DONTCARE
+ */
+int
+rpcsvc_combine_allow_reject_volume_check (int allow, int reject)
+{
+ if (allow == RPCSVC_AUTH_REJECT ||
+ reject == RPCSVC_AUTH_REJECT)
+ return RPCSVC_AUTH_REJECT;
+
+ if (allow == RPCSVC_AUTH_ACCEPT ||
+ reject == RPCSVC_AUTH_ACCEPT)
+ return RPCSVC_AUTH_ACCEPT;
+
+ return RPCSVC_AUTH_DONTCARE;
+}
+
+int
+rpcsvc_auth_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans)
+{
+ int ret = RPCSVC_AUTH_REJECT;
+ int accept = RPCSVC_AUTH_REJECT;
+ int reject = RPCSVC_AUTH_REJECT;
+ char *hostname = NULL;
+ char *ip = NULL;
+ char client_ip[RPCSVC_PEER_STRLEN] = {0};
+ char *allow_str = NULL;
+ char *reject_str = NULL;
+ char *srchstr = NULL;
+ dict_t *options = NULL;
+
+ if (!svc || !volname || !trans)
+ return ret;
+
+ /* Fetch the options from svc struct and validate */
+ options = svc->options;
+ if (!options)
+ return ret;
+
+ ret = rpcsvc_transport_peername (trans, client_ip, RPCSVC_PEER_STRLEN);
+ if (ret != 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
+ "%s", gai_strerror (ret));
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ /* Accept if its the default case: Allow all, Reject none
+ * The default volfile always contains a 'allow *' rule
+ * for each volume. If allow rule is missing (which implies
+ * there is some bad volfile generating code doing this), we
+ * assume no one is allowed mounts, and thus, we reject mounts.
+ */
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ ret = dict_get_str (options, srchstr, &allow_str);
+ GF_FREE (srchstr);
+ if (ret < 0)
+ return RPCSVC_AUTH_REJECT;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ ret = dict_get_str (options, srchstr, &reject_str);
+ GF_FREE (srchstr);
+ if (reject_str == NULL && !strcmp ("*", allow_str))
+ return RPCSVC_AUTH_ACCEPT;
+
+ /* Non-default rule, authenticate */
+ if (!get_host_name (client_ip, &ip))
+ ip = client_ip;
+
+ /* addr-namelookup check */
+ if (svc->addr_namelookup == _gf_true) {
+ ret = gf_get_hostname_from_ip (ip, &hostname);
+ if (ret) {
+ if (hostname)
+ GF_FREE (hostname);
+ /* failed to get hostname, but hostname auth
+ * is enabled, so authentication will not be
+ * 100% correct. reject mounts
+ */
+ return RPCSVC_AUTH_REJECT;
+ }
+ }
+
+ accept = rpcsvc_transport_peer_check_allow (options, volname,
+ ip, hostname);
+
+ reject = rpcsvc_transport_peer_check_reject (options, volname,
+ ip, hostname);
+
+ if (hostname)
+ GF_FREE (hostname);
+ return rpcsvc_combine_allow_reject_volume_check (accept, reject);
+}
+
+int
+rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans)
+{
+ union gf_sock_union sock_union;
+ int ret = RPCSVC_AUTH_REJECT;
+ socklen_t sinsize = sizeof (&sock_union.sin);
+ char *srchstr = NULL;
+ char *valstr = NULL;
+ uint16_t port = 0;
+ gf_boolean_t insecure = _gf_false;
+
+ memset (&sock_union, 0, sizeof (sock_union));
+
+ if ((!svc) || (!volname) || (!trans))
+ return ret;
+
+ ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sock_union.storage,
+ sinsize);
+ if (ret != 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s",
+ gai_strerror (ret));
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
+ port = ntohs (sock_union.sin.sin_port);
+ gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
+ /* If the port is already a privileged one, dont bother with checking
+ * options.
+ */
+ if (port <= 1024) {
+ ret = RPCSVC_AUTH_ACCEPT;
+ goto err;
+ }
+
+ /* Disabled by default */
+ ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
+ ret = dict_get_str (svc->options, srchstr, &valstr);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
+ " read rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = gf_string2boolean (valstr, &insecure);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
+ " convert rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT;
+
+ if (ret == RPCSVC_AUTH_ACCEPT)
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
+ else
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not"
+ " allowed");
+
+err:
+ if (srchstr)
+ GF_FREE (srchstr);
+
+ return ret;
+}
+
+
+char *
+rpcsvc_volume_allowed (dict_t *options, char *volname)
+{
+ char globalrule[] = "rpc-auth.addr.allow";
+ char *srchstr = NULL;
+ char *addrstr = NULL;
+ int ret = -1;
+
+ if ((!options) || (!volname))
+ return NULL;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ goto out;
+ }
+
+ if (!dict_get (options, srchstr))
+ ret = dict_get_str (options, globalrule, &addrstr);
+ else
+ ret = dict_get_str (options, srchstr, &addrstr);
+
+out:
+ GF_FREE (srchstr);
+
+ return addrstr;
+}
+
+
rpcsvc_actor_t gluster_dump_actors[] = {
- [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, NULL },
- [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, NULL },
- [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, NULL },
+ [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
+ [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
+ [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA},
};