summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpcsvc.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c2543
1 files changed, 1471 insertions, 1072 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 687deefba..037c157f2 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
#ifndef _CONFIG_H
@@ -34,6 +25,10 @@
#include "iobuf.h"
#include "globals.h"
#include "xdr-common.h"
+#include "xdr-generic.h"
+#include "rpc-common-xdr.h"
+#include "syncop.h"
+#include "rpc-drc.h"
#include <errno.h>
#include <pthread.h>
@@ -46,516 +41,24 @@
#include <stdarg.h>
#include <stdio.h>
-struct rpcsvc_program gluster_dump_prog;
+#include "xdr-rpcclnt.h"
+#include "glusterfs-acl.h"
+struct rpcsvc_program gluster_dump_prog;
-#define rpcsvc_alloc_request(con, request) \
+#define rpcsvc_alloc_request(svc, request) \
do { \
- request = (rpcsvc_request_t *) mem_get ((con)->rxpool); \
+ request = (rpcsvc_request_t *) mem_get ((svc)->rxpool); \
memset (request, 0, sizeof (rpcsvc_request_t)); \
} while (0)
-
-int
-rpcsvc_conn_peer_check_search (dict_t *options, char *pattern, char *clstr)
-{
- int ret = -1;
- char *addrtok = NULL;
- char *addrstr = NULL;
- char *svptr = NULL;
-
- if ((!options) || (!clstr))
- return -1;
-
- if (!dict_get (options, pattern))
- return -1;
-
- ret = dict_get_str (options, pattern, &addrstr);
- if (ret < 0) {
- ret = -1;
- goto err;
- }
-
- if (!addrstr) {
- ret = -1;
- goto err;
- }
-
- addrtok = strtok_r (addrstr, ",", &svptr);
- while (addrtok) {
-
-#ifdef FNM_CASEFOLD
- ret = fnmatch (addrtok, clstr, FNM_CASEFOLD);
-#else
- ret = fnmatch (addrtok, clstr, 0);
-#endif
- if (ret == 0)
- goto err;
-
- addrtok = strtok_r (NULL, ",", &svptr);
- }
-
- ret = -1;
-err:
-
- return ret;
-}
-
-
-int
-rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr)
-{
- int ret = RPCSVC_AUTH_DONTCARE;
- char *srchstr = NULL;
- char globalrule[] = "rpc-auth.addr.allow";
-
- if ((!options) || (!clstr))
- return ret;
-
- /* If volname is NULL, then we're searching for the general rule to
- * determine the current address in clstr is allowed or not for all
- * subvolumes.
- */
- if (volname) {
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_DONTCARE;
- goto out;
- }
- } else
- srchstr = globalrule;
-
- ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr);
- if (volname)
- GF_FREE (srchstr);
-
- if (ret == 0)
- ret = RPCSVC_AUTH_ACCEPT;
- else
- ret = RPCSVC_AUTH_DONTCARE;
-out:
- return ret;
-}
-
-int
-rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr)
-{
- int ret = RPCSVC_AUTH_DONTCARE;
- char *srchstr = NULL;
- char generalrule[] = "rpc-auth.addr.reject";
-
- if ((!options) || (!clstr))
- return ret;
-
- if (volname) {
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto out;
- }
- } else
- srchstr = generalrule;
-
- ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr);
- if (volname)
- GF_FREE (srchstr);
-
- if (ret == 0)
- ret = RPCSVC_AUTH_REJECT;
- else
- ret = RPCSVC_AUTH_DONTCARE;
-out:
- return ret;
-}
-
-
-/* This function tests the results of the allow rule and the reject rule to
- * combine them into a single result that can be used to determine if the
- * connection should be allowed to proceed.
- * Heres the test matrix we need to follow in this function.
- *
- * A - Allow, the result of the allow test. Never returns R.
- * R - Reject, result of the reject test. Never returns A.
- * Both can return D or dont care if no rule was given.
- *
- * | @allow | @reject | Result |
- * | A | R | R |
- * | D | D | D |
- * | A | D | A |
- * | D | R | R |
- */
-int
-rpcsvc_combine_allow_reject_volume_check (int allow, int reject)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- /* If allowed rule allows but reject rule rejects, we stay cautious
- * and reject. */
- if ((allow == RPCSVC_AUTH_ACCEPT) && (reject == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- /* if both are dont care, that is user did not specify for either allow
- * or reject, we leave it up to the general rule to apply, in the hope
- * that there is one.
- */
- else if ((allow == RPCSVC_AUTH_DONTCARE) &&
- (reject == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_DONTCARE;
- /* If one is dont care, the other one applies. */
- else if ((allow == RPCSVC_AUTH_ACCEPT) &&
- (reject == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((allow == RPCSVC_AUTH_DONTCARE) &&
- (reject == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-/* Combines the result of the general rule test against, the specific rule
- * to determine final permission for the client's address.
- *
- * | @gen | @spec | Result |
- * | A | A | A |
- * | A | R | R |
- * | A | D | A |
- * | D | A | A |
- * | D | R | R |
- * | D | D | D |
- * | R | A | A |
- * | R | D | R |
- * | R | R | R |
- */
-int
-rpcsvc_combine_gen_spec_addr_checks (int gen, int spec)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_DONTCARE;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-
-/* Combines the result of the general rule test against, the specific rule
- * to determine final test for the connection coming in for a given volume.
- *
- * | @gen | @spec | Result |
- * | A | A | A |
- * | A | R | R |
- * | A | D | A |
- * | D | A | A |
- * | D | R | R |
- * | D | D | R |, special case, we intentionally disallow this.
- * | R | A | A |
- * | R | D | R |
- * | R | R | R |
- */
-int
-rpcsvc_combine_gen_spec_volume_checks (int gen, int spec)
-{
- int final = RPCSVC_AUTH_REJECT;
-
- if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
- /* On no rule, we reject. */
- else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT))
- final = RPCSVC_AUTH_ACCEPT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE))
- final = RPCSVC_AUTH_REJECT;
- else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT))
- final = RPCSVC_AUTH_REJECT;
-
- return final;
-}
-
-
-int
-rpcsvc_conn_peer_check_name (dict_t *options, char *volname,
- rpcsvc_conn_t *conn)
-{
- int ret = RPCSVC_AUTH_REJECT;
- int aret = RPCSVC_AUTH_REJECT;
- int rjret = RPCSVC_AUTH_REJECT;
- char clstr[RPCSVC_PEER_STRLEN];
-
- if (!conn)
- return ret;
-
- ret = rpcsvc_conn_peername (conn, clstr, RPCSVC_PEER_STRLEN);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
- "%s", gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- aret = rpcsvc_conn_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr);
-
- ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
-
-err:
- return ret;
-}
-
-
-int
-rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn)
-{
- int ret = RPCSVC_AUTH_REJECT;
- int aret = RPCSVC_AUTH_DONTCARE;
- int rjret = RPCSVC_AUTH_REJECT;
- char clstr[RPCSVC_PEER_STRLEN];
-
- if (!conn)
- return ret;
-
- ret = rpcsvc_conn_peeraddr (conn, clstr, RPCSVC_PEER_STRLEN, NULL, 0);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
- "%s", gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- aret = rpcsvc_conn_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr);
-
- ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
-err:
- return ret;
-}
-
-
-int
-rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,
- rpcsvc_conn_t *conn)
-{
- int namechk = RPCSVC_AUTH_REJECT;
- int addrchk = RPCSVC_AUTH_REJECT;
- gf_boolean_t namelookup = _gf_true;
- char *namestr = NULL;
- int ret = 0;
-
- if ((!options) || (!volname) || (!conn))
- return RPCSVC_AUTH_REJECT;
-
- /* Enabled by default */
- if ((dict_get (options, "rpc-auth.addr.namelookup"))) {
- ret = dict_get_str (options, "rpc-auth.addr.namelookup"
- , &namestr);
- if (ret == 0) {
- ret = gf_string2boolean (namestr, &namelookup);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "wrong option %s given for "
- "'namelookup'", namestr);
- }
- }
-
- /* We need two separate checks because the rules with addresses in them
- * can be network addresses which can be general and names can be
- * specific which will over-ride the network address rules.
- */
- if (namelookup)
- namechk = rpcsvc_conn_peer_check_name (options, volname, conn);
- addrchk = rpcsvc_conn_peer_check_addr (options, volname, conn);
-
- if (namelookup)
- ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
- else
- ret = addrchk;
-
- return ret;
-}
-
-
-int
-rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)
-{
- int addrchk = RPCSVC_AUTH_REJECT;
- int namechk = RPCSVC_AUTH_REJECT;
- gf_boolean_t namelookup = _gf_true;
- char *namestr = NULL;
- int ret = 0;
-
- if ((!options) || (!conn))
- return RPCSVC_AUTH_REJECT;
-
- /* Enabled by default */
- if ((dict_get (options, "rpc-auth.addr.namelookup"))) {
- ret = dict_get_str (options, "rpc-auth.addr.namelookup"
- , &namestr);
- if (!ret) {
- ret = gf_string2boolean (namestr, &namelookup);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "wrong option %s given for "
- "'namelookup'", namestr);
- }
- }
-
- /* We need two separate checks because the rules with addresses in them
- * can be network addresses which can be general and names can be
- * specific which will over-ride the network address rules.
- */
- if (namelookup)
- namechk = rpcsvc_conn_peer_check_name (options, NULL, conn);
- addrchk = rpcsvc_conn_peer_check_addr (options, NULL, conn);
-
- if (namelookup)
- ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
- else
- ret = addrchk;
-
- return ret;
-}
-
-int
-rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn)
-{
- int general_chk = RPCSVC_AUTH_REJECT;
- int specific_chk = RPCSVC_AUTH_REJECT;
-
- if ((!options) || (!volname) || (!conn))
- return RPCSVC_AUTH_REJECT;
-
- general_chk = rpcsvc_conn_check_volume_general (options, conn);
- specific_chk = rpcsvc_conn_check_volume_specific (options, volname,
- conn);
-
- return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk);
-}
-
-
-char *
-rpcsvc_volume_allowed (dict_t *options, char *volname)
-{
- char globalrule[] = "rpc-auth.addr.allow";
- char *srchstr = NULL;
- char *addrstr = NULL;
- int ret = -1;
-
- if ((!options) || (!volname))
- return NULL;
-
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- goto out;
- }
-
- if (!dict_get (options, srchstr)) {
- GF_FREE (srchstr);
- srchstr = globalrule;
- ret = dict_get_str (options, srchstr, &addrstr);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "failed to get the string %s", srchstr);
- } else {
- ret = dict_get_str (options, srchstr, &addrstr);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
- "failed to get the string %s", srchstr);
- }
-out:
- return addrstr;
-}
-
-
-
-/* Initialize the core of a connection */
-rpcsvc_conn_t *
-rpcsvc_conn_alloc (rpcsvc_t *svc, rpc_transport_t *trans)
-{
- rpcsvc_conn_t *conn = NULL;
- int ret = -1;
- unsigned int poolcount = 0;
-
- conn = GF_CALLOC (1, sizeof(*conn), gf_common_mt_rpcsvc_conn_t);
- if (!conn) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed");
- return NULL;
- }
-
- conn->trans = trans;
- conn->svc = svc;
- poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
-
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
- conn->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);
- /* TODO: leak */
- if (!conn->rxpool) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed");
- goto free_conn;
- }
-
- /* Cannot consider a connection connected unless the user of this
- * connection decides it is ready to use. It is possible that we have
- * to free this connection soon after. That free will not happpen
- * unless the state is disconnected.
- */
- conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED;
- pthread_mutex_init (&conn->connlock, NULL);
- conn->connref = 0;
-
- ret = 0;
-
-free_conn:
- if (ret == -1) {
- GF_FREE (conn);
- conn = NULL;
- }
-
- return conn;
-}
+rpcsvc_listener_t *
+rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...);
-void
-rpcsvc_conn_state_init (rpcsvc_conn_t *conn)
-{
- if (!conn)
- return;
-
- ++conn->connref;
- conn->connstate = RPCSVC_CONNSTATE_CONNECTED;
-}
-
-
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc (void)
{
@@ -563,7 +66,6 @@ rpcsvc_notify_wrapper_alloc (void)
wrapper = GF_CALLOC (1, sizeof (*wrapper), gf_common_mt_rpcsvc_wrapper_t);
if (!wrapper) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed");
goto out;
}
@@ -582,11 +84,7 @@ rpcsvc_listener_destroy (rpcsvc_listener_t *listener)
goto out;
}
- if (!listener->conn) {
- goto listener_free;
- }
-
- svc = listener->conn->svc;
+ svc = listener->svc;
if (!svc) {
goto listener_free;
}
@@ -603,245 +101,61 @@ out:
return;
}
-
-void
-rpcsvc_conn_destroy (rpcsvc_conn_t *conn)
-{
- rpcsvc_listener_t *listener = NULL;
-
- if (!conn || !conn->rxpool || !conn->listener)
- goto out;
-
- if (conn->trans)
- rpc_transport_destroy (conn->trans);
-
- mem_pool_destroy (conn->rxpool);
-
- listener = conn->listener;
- if (listener->conn == conn) {
- rpcsvc_listener_destroy (listener);
- }
-
- /* Need to destory record state, txlists etc. */
- GF_FREE (conn);
-out:
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection destroyed");
-}
-
-
-rpcsvc_conn_t *
-rpcsvc_conn_init (rpcsvc_t *svc, rpc_transport_t *trans)
-{
- int ret = -1;
- rpcsvc_conn_t *conn = NULL;
-
- conn = rpcsvc_conn_alloc (svc, trans);
- if (!conn) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot init a connection");
- goto out;
- }
-
- ret = rpc_transport_register_notify (trans, rpcsvc_notify, conn);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");
- rpcsvc_conn_destroy (conn);
- conn = NULL;
- goto out;
- }
-
- rpcsvc_conn_state_init (conn);
-
-out:
- return conn;
-}
-
-
-int
-__rpcsvc_conn_unref (rpcsvc_conn_t *conn)
-{
- --conn->connref;
- return conn->connref;
-}
-
-
-void
-__rpcsvc_conn_deinit (rpcsvc_conn_t *conn)
-{
- if (!conn)
- return;
-
- if (rpcsvc_conn_check_active (conn)) {
- conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED;
- }
-
- if (conn->trans) {
- rpc_transport_disconnect (conn->trans);
- conn->trans = NULL;
- }
-}
-
-
-void
-rpcsvc_conn_deinit (rpcsvc_conn_t *conn)
+rpcsvc_vector_sizer
+rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
+ uint32_t progver, uint32_t procnum)
{
- int ref = 0;
-
- if (!conn)
- return;
-
- pthread_mutex_lock (&conn->connlock);
- {
- __rpcsvc_conn_deinit (conn);
- ref = __rpcsvc_conn_unref (conn);
- }
- pthread_mutex_unlock (&conn->connlock);
-
- if (ref == 0)
- rpcsvc_conn_destroy (conn);
-
- return;
-}
-
+ rpcsvc_program_t *program = NULL;
+ char found = 0;
-void
-rpcsvc_conn_unref (rpcsvc_conn_t *conn)
-{
- int ref = 0;
- if (!conn)
- return;
+ if (!svc)
+ return NULL;
- pthread_mutex_lock (&conn->connlock);
+ pthread_mutex_lock (&svc->rpclock);
{
- ref = __rpcsvc_conn_unref (conn);
+ list_for_each_entry (program, &svc->programs, program) {
+ if ((program->prognum == prognum)
+ && (program->progver == progver)) {
+ found = 1;
+ break;
+ }
+ }
}
- pthread_mutex_unlock (&conn->connlock);
+ pthread_mutex_unlock (&svc->rpclock);
- if (ref == 0) {
- rpcsvc_conn_destroy (conn);
- }
+ if (found)
+ return program->actors[procnum].vector_sizer;
+ else
+ return NULL;
}
-
int
-rpcsvc_conn_active (rpcsvc_conn_t *conn)
-{
- int status = 0;
-
- if (!conn)
- return 0;
-
- pthread_mutex_lock (&conn->connlock);
- {
- status = rpcsvc_conn_check_active (conn);
- }
- pthread_mutex_unlock (&conn->connlock);
-
- return status;
-}
-
-
-void
-rpcsvc_conn_ref (rpcsvc_conn_t *conn)
+rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta)
{
- if (!conn)
- return;
+ int ret = 0;
+ int old_count = 0;
+ int new_count = 0;
+ int limit = 0;
- pthread_mutex_lock (&conn->connlock);
+ pthread_mutex_lock (&trans->lock);
{
- ++conn->connref;
- }
- pthread_mutex_unlock (&conn->connlock);
-
- return;
-}
-
-
-int
-rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn)
-{
- struct sockaddr_in sa;
- int ret = RPCSVC_AUTH_REJECT;
- socklen_t sasize = sizeof (sa);
- char *srchstr = NULL;
- char *valstr = NULL;
- int globalinsecure = RPCSVC_AUTH_REJECT;
- int exportinsecure = RPCSVC_AUTH_DONTCARE;
- uint16_t port = 0;
- gf_boolean_t insecure = _gf_false;
-
- if ((!svc) || (!volname) || (!conn))
- return ret;
+ limit = svc->outstanding_rpc_limit;
+ if (!limit)
+ goto unlock;
- ret = rpcsvc_conn_peeraddr (conn, NULL, 0, (struct sockaddr *)&sa,
- sasize);
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s",
- gai_strerror (ret));
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
+ old_count = trans->outstanding_rpc_count;
+ trans->outstanding_rpc_count += delta;
+ new_count = trans->outstanding_rpc_count;
- port = ntohs (sa.sin_port);
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
- /* If the port is already a privileged one, dont bother with checking
- * options.
- */
- if (port <= 1024) {
- ret = RPCSVC_AUTH_ACCEPT;
- goto err;
- }
+ if (old_count <= limit && new_count > limit)
+ ret = rpc_transport_throttle (trans, _gf_true);
- /* Disabled by default */
- if ((dict_get (svc->options, "rpc-auth.ports.insecure"))) {
- ret = dict_get_str (svc->options, "rpc-auth.ports.insecure"
- , &srchstr);
- if (ret == 0) {
- ret = gf_string2boolean (srchstr, &insecure);
- if (ret == 0) {
- if (insecure == _gf_true)
- globalinsecure = RPCSVC_AUTH_ACCEPT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
+ if (old_count > limit && new_count <= limit)
+ ret = rpc_transport_throttle (trans, _gf_false);
}
+unlock:
+ pthread_mutex_unlock (&trans->lock);
- /* Disabled by default */
- ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
-
- if (dict_get (svc->options, srchstr)) {
- ret = dict_get_str (svc->options, srchstr, &valstr);
- if (ret == 0) {
- ret = gf_string2boolean (srchstr, &insecure);
- if (ret == 0) {
- if (insecure == _gf_true)
- exportinsecure = RPCSVC_AUTH_ACCEPT;
- else
- exportinsecure = RPCSVC_AUTH_REJECT;
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- } else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- }
-
- ret = rpcsvc_combine_gen_spec_volume_checks (globalinsecure,
- exportinsecure);
- if (ret == RPCSVC_AUTH_ACCEPT)
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
- else
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not"
- " allowed");
-
-err:
return ret;
}
@@ -851,7 +165,7 @@ err:
* of the pointers below are NULL.
*/
rpcsvc_actor_t *
-rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
+rpcsvc_program_actor (rpcsvc_request_t *req)
{
rpcsvc_program_t *program = NULL;
int err = SYSTEM_ERR;
@@ -859,10 +173,10 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
rpcsvc_t *svc = NULL;
char found = 0;
- if ((!conn) || (!req))
+ if (!req)
goto err;
- svc = conn->svc;
+ svc = req->svc;
pthread_mutex_lock (&svc->rpclock);
{
list_for_each_entry (program, &svc->programs, program) {
@@ -881,26 +195,35 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
if (!found) {
if (err != PROG_MISMATCH) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "RPC program not available");
+ /* log in DEBUG when nfs clients try to see if
+ * ACL requests are accepted by nfs server
+ */
+ gf_log (GF_RPCSVC, (req->prognum == ACL_PROGRAM) ?
+ GF_LOG_DEBUG : GF_LOG_WARNING,
+ "RPC program not available (req %u %u)",
+ req->prognum, req->progver);
err = PROG_UNAVAIL;
goto err;
}
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC program version not"
- " available");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING,
+ "RPC program version not available (req %u %u)",
+ req->prognum, req->progver);
goto err;
}
req->prog = program;
if (!program->actors) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC System error");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING,
+ "RPC Actor not found for program %s %d",
+ program->progname, program->prognum);
err = SYSTEM_ERR;
goto err;
}
if ((req->procnum < 0) || (req->procnum >= program->numactors)) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not"
- " available");
+ " available for procedure %d in %s", req->procnum,
+ program->progname);
err = PROC_UNAVAIL;
goto err;
}
@@ -908,12 +231,15 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
actor = &program->actors[req->procnum];
if (!actor->actor) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not"
- " available");
+ " available for procedure %d in %s", req->procnum,
+ program->progname);
err = PROC_UNAVAIL;
actor = NULL;
goto err;
}
+ req->synctask = program->synctask;
+
err = SUCCESS;
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s",
program->progname, actor->procname);
@@ -926,9 +252,9 @@ err:
/* this procedure can only pass 4 arguments to registered notifyfn. To send more
- * arguements call wrapper->notify directly.
+ * arguments call wrapper->notify directly.
*/
-inline void
+static inline void
rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event,
void *data)
{
@@ -938,9 +264,9 @@ rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event,
goto out;
}
- list_for_each_entry (wrapper, &listener->list, list) {
+ list_for_each_entry (wrapper, &listener->svc->notify, list) {
if (wrapper->notify) {
- wrapper->notify (listener->conn->svc,
+ wrapper->notify (listener->svc,
wrapper->data,
event, data);
}
@@ -951,37 +277,29 @@ out:
}
-int
-rpcsvc_accept (rpcsvc_conn_t *listen_conn, rpc_transport_t *new_trans)
+static inline int
+rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans,
+ rpc_transport_t *new_trans)
{
rpcsvc_listener_t *listener = NULL;
- rpcsvc_conn_t *conn = NULL;
- char clstr[RPCSVC_PEER_STRLEN];
-
- listener = listen_conn->listener;
- conn = rpcsvc_conn_init (listen_conn->svc, new_trans);
- if (!conn) {
- rpc_transport_disconnect (new_trans);
- memset (clstr, 0, RPCSVC_PEER_STRLEN);
- rpc_transport_get_peername (new_trans, clstr,
- RPCSVC_PEER_STRLEN);
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "allocating connection for "
- "new transport (%s) failed", clstr);
+ int32_t ret = -1;
+
+ listener = rpcsvc_get_listener (svc, -1, listen_trans);
+ if (listener == NULL) {
goto out;
}
- conn->listener = listener;
-
- //rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, conn);
+ rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans);
+ ret = 0;
out:
- return 0;
+ return ret;
}
void
-rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
+rpcsvc_request_destroy (rpcsvc_request_t *req)
{
- if (!conn || !req) {
+ if (!req) {
goto out;
}
@@ -989,18 +307,36 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
iobref_unref (req->iobref);
}
- mem_put (conn->rxpool, req);
+ if (req->hdr_iobuf)
+ iobuf_unref (req->hdr_iobuf);
+
+ /* This marks the "end" of an RPC request. Reply is
+ completely written to the socket and is on the way
+ to the client. It is time to decrement the
+ outstanding request counter by 1.
+ */
+ rpcsvc_request_outstanding (req->svc, req->trans, -1);
+
+ rpc_transport_unref (req->trans);
+
+ GF_FREE (req->auxgidlarge);
+
+ mem_put (req);
+
out:
return;
}
rpcsvc_request_t *
-rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
+rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
+ struct rpc_msg *callmsg,
struct iovec progmsg, rpc_transport_pollin_t *msg,
rpcsvc_request_t *req)
{
- if ((!conn) || (!callmsg)|| (!req) || (!msg))
+ int i = 0;
+
+ if ((!trans) || (!callmsg)|| (!req) || (!msg))
return NULL;
/* We start a RPC request as always denied. */
@@ -1009,14 +345,19 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
req->prognum = rpc_call_program (callmsg);
req->progver = rpc_call_progver (callmsg);
req->procnum = rpc_call_progproc (callmsg);
- req->conn = conn;
+ req->trans = rpc_transport_ref (trans);
req->count = msg->count;
req->msg[0] = progmsg;
req->iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->msg[1] = msg->vector[1];
+ /* msg->vector[2] is defined in structure. prevent a
+ out of bound access */
+ for (i = 1; i < min (msg->count, 2); i++) {
+ req->msg[i] = msg->vector[i];
+ }
}
+ req->svc = svc;
req->trans_private = msg->private;
INIT_LIST_HEAD (&req->txlist);
@@ -1038,7 +379,8 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
rpcsvc_request_t *
-rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
+rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
+ rpc_transport_pollin_t *msg)
{
char *msgbuf = NULL;
struct rpc_msg rpcmsg;
@@ -1047,7 +389,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
size_t msglen = 0;
int ret = -1;
- if (!conn)
+ if (!svc || !trans)
return NULL;
/* We need to allocate the request before actually calling
@@ -1056,12 +398,17 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
* This avoids a need to keep a temp buffer into which the auth data
* would've been copied otherwise.
*/
- rpcsvc_alloc_request (conn, req);
+ rpcsvc_alloc_request (svc, req);
if (!req) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request");
goto err;
}
+ /* We just received a new request from the wire. Account for
+ it in the outsanding request counter to make sure we don't
+ ingest too many concurrent requests from the same client.
+ */
+ ret = rpcsvc_request_outstanding (svc, trans, +1);
+
msgbuf = msg->vector[0].iov_base;
msglen = msg->vector[0].iov_len;
@@ -1069,21 +416,32 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
req->cred.authdata,req->verf.authdata);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC call decoding failed");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "RPC call decoding failed");
rpcsvc_request_seterr (req, GARBAGE_ARGS);
+ req->trans = rpc_transport_ref (trans);
+ req->svc = svc;
goto err;
}
ret = -1;
- rpcsvc_request_init (conn, &rpcmsg, progmsg, msg, req);
+ rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req);
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld,"
- " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg),
+ gf_log (GF_RPCSVC, GF_LOG_TRACE, "received rpc-message (XID: 0x%lx, "
+ "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) from"
+ " rpc-transport (%s)", rpc_call_xid (&rpcmsg),
rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
- rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg));
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ trans->name);
if (rpc_call_rpcvers (&rpcmsg) != 2) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported");
+ /* LOG- TODO: print rpc version, also print the peerinfo
+ from transport */
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported "
+ "(XID: 0x%lx, Ver: %ld, Prog: %ld, ProgVers: %ld, "
+ "Proc: %ld) from trans (%s)", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ trans->name);
rpcsvc_request_seterr (req, RPC_MISMATCH);
goto err;
}
@@ -1095,7 +453,12 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
* error happened.
*/
rpcsvc_request_seterr (req, AUTH_ERROR);
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed authentication");
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "auth failed on request. "
+ "(XID: 0x%lx, Ver: %ld, Prog: %ld, ProgVers: %ld, "
+ "Proc: %ld) from trans (%s)", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ trans->name);
ret = -1;
goto err;
}
@@ -1105,12 +468,13 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
* since we are not handling authentication failures for now.
*/
req->rpc_status = MSG_ACCEPTED;
+ req->reply = NULL;
ret = 0;
err:
if (ret == -1) {
ret = rpcsvc_error_reply (req);
if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG,
+ gf_log ("rpcsvc", GF_LOG_WARNING,
"failed to queue error reply");
req = NULL;
}
@@ -1120,57 +484,213 @@ err:
int
-rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
+rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
{
- rpcsvc_actor_t *actor = NULL;
- rpcsvc_request_t *req = NULL;
- int ret = -1;
+ rpcsvc_request_t *req = NULL;
+
+ req = opaque;
+
+ if (ret)
+ gf_log ("rpcsvc", GF_LOG_ERROR,
+ "rpc actor failed to complete successfully");
+
+ if (ret == RPCSVC_ACTOR_ERROR) {
+ ret = rpcsvc_error_reply (req);
+ if (ret)
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "failed to queue error reply");
+ }
+
+ return 0;
+}
- if (!conn)
+int
+rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
+ rpc_transport_pollin_t *msg)
+{
+ rpcsvc_actor_t *actor = NULL;
+ rpcsvc_actor actor_fn = NULL;
+ rpcsvc_request_t *req = NULL;
+ int ret = -1;
+ uint16_t port = 0;
+ gf_boolean_t is_unix = _gf_false;
+ gf_boolean_t unprivileged = _gf_false;
+ drc_cached_op_t *reply = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ if (!trans || !svc)
+ return -1;
+
+ switch (trans->peerinfo.sockaddr.ss_family) {
+ case AF_INET:
+ port = ((struct sockaddr_in *)&trans->peerinfo.sockaddr)->sin_port;
+ break;
+
+ case AF_INET6:
+ port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)->sin6_port;
+ break;
+ case AF_UNIX:
+ is_unix = _gf_true;
+ break;
+ default:
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "invalid address family (%d)",
+ trans->peerinfo.sockaddr.ss_family);
return -1;
+ }
+
+
+
+ if (is_unix == _gf_false) {
+ port = ntohs (port);
- req = rpcsvc_request_create (conn, msg);
+ gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port);
+
+ if (port > 1024)
+ unprivileged = _gf_true;
+ }
+
+ req = rpcsvc_request_create (svc, trans, msg);
if (!req)
- goto err;
+ goto out;
if (!rpcsvc_request_accepted (req))
goto err_reply;
- actor = rpcsvc_program_actor (conn, req);
+ actor = rpcsvc_program_actor (req);
if (!actor)
goto err_reply;
- if (actor && (req->rpc_err == SUCCESS)) {
- if (req->count == 2) {
- if (actor->vector_actor) {
- rpcsvc_conn_ref (conn);
- ret = actor->vector_actor (req, &req->msg[1], 1,
- req->iobref);
- } else {
- rpcsvc_request_seterr (req, PROC_UNAVAIL);
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "No vectored handler present");
- ret = RPCSVC_ACTOR_ERROR;
- }
- } else if (actor->actor) {
- rpcsvc_conn_ref (req->conn);
- /* Before going to xlator code, set the THIS properly */
- THIS = conn->svc->mydata;
- ret = actor->actor (req);
+ if (0 == svc->allow_insecure && unprivileged && !actor->unprivileged) {
+ /* Non-privileged user, fail request */
+ gf_log ("glusterd", GF_LOG_ERROR,
+ "Request received from non-"
+ "privileged port. Failing request");
+ rpcsvc_request_destroy (req);
+ return -1;
+ }
+
+ /* DRC */
+ if (rpcsvc_need_drc (req)) {
+ drc = req->svc->drc;
+
+ LOCK (&drc->lock);
+ reply = rpcsvc_drc_lookup (req);
+
+ /* retransmission of completed request, send cached reply */
+ if (reply && reply->state == DRC_OP_CACHED) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:"
+ " XID: 0x%x", req->xid);
+ ret = rpcsvc_send_cached_reply (req, reply);
+ drc->cache_hits++;
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* retransmitted request, original op in transit, drop it */
+ else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit,"
+ " discarding. XID: 0x%x", req->xid);
+ ret = 0;
+ drc->intransit_hits++;
+ rpcsvc_request_destroy (req);
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* fresh request, cache it as in-transit and proceed */
+ else {
+ ret = rpcsvc_cache_request (req);
}
+ UNLOCK (&drc->lock);
}
-err_reply:
- if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS))
- ret = rpcsvc_error_reply (req);
+ if (req->rpc_err == SUCCESS) {
+ /* Before going to xlator code, set the THIS properly */
+ THIS = svc->mydata;
- if (ret)
- gf_log ("rpcsvc", GF_LOG_DEBUG, "failed to queue error reply");
+ actor_fn = actor->actor;
+
+ if (!actor_fn) {
+ rpcsvc_request_seterr (req, PROC_UNAVAIL);
+ /* LOG TODO: print more info about procnum,
+ prognum etc, also print transport info */
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "No vectored handler present");
+ ret = RPCSVC_ACTOR_ERROR;
+ goto err_reply;
+ }
+
+ if (req->synctask) {
+ if (msg->hdr_iobuf)
+ req->hdr_iobuf = iobuf_ref (msg->hdr_iobuf);
+ ret = synctask_new (THIS->ctx->env,
+ (synctask_fn_t) actor_fn,
+ rpcsvc_check_and_reply_error, NULL,
+ req);
+ } else {
+ ret = actor_fn (req);
+ }
+ }
+
+err_reply:
+
+ ret = rpcsvc_check_and_reply_error (ret, NULL, req);
/* No need to propagate error beyond this function since the reply
* has now been queued. */
ret = 0;
-err:
+
+out:
+ return ret;
+}
+
+
+int
+rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans)
+{
+ rpcsvc_event_t event;
+ rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper;
+ int32_t ret = -1, i = 0, wrapper_count = 0;
+ rpcsvc_listener_t *listener = NULL;
+
+ event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD
+ : RPCSVC_EVENT_DISCONNECT;
+
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ if (!svc->notify_count)
+ goto unlock;
+
+ wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper),
+ gf_common_mt_rpcsvc_wrapper_t);
+ if (!wrappers) {
+ goto unlock;
+ }
+
+ list_for_each_entry (wrapper, &svc->notify, list) {
+ if (wrapper->notify) {
+ wrappers[i++] = *wrapper;
+ }
+ }
+
+ wrapper_count = i;
+ }
+unlock:
+ pthread_mutex_unlock (&svc->rpclock);
+
+ if (wrappers) {
+ for (i = 0; i < wrapper_count; i++) {
+ wrappers[i].notify (svc, wrappers[i].data,
+ event, trans);
+ }
+
+ GF_FREE (wrappers);
+ }
+
+ if (event == RPCSVC_EVENT_LISTENER_DEAD) {
+ listener = rpcsvc_get_listener (svc, -1, trans->listener);
+ rpcsvc_listener_destroy (listener);
+ }
+
return ret;
}
@@ -1179,30 +699,30 @@ int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...)
{
- rpcsvc_conn_t *conn = NULL;
int ret = -1;
rpc_transport_pollin_t *msg = NULL;
rpc_transport_t *new_trans = NULL;
+ rpcsvc_t *svc = NULL;
+ rpcsvc_listener_t *listener = NULL;
- conn = mydata;
- if (conn == NULL) {
+ svc = mydata;
+ if (svc == NULL) {
goto out;
}
switch (event) {
case RPC_TRANSPORT_ACCEPT:
new_trans = data;
- ret = rpcsvc_accept (conn, new_trans);
+ ret = rpcsvc_accept (svc, trans, new_trans);
break;
case RPC_TRANSPORT_DISCONNECT:
- rpcsvc_conn_deinit (conn);
- ret = 0;
+ ret = rpcsvc_handle_disconnect (svc, trans);
break;
case RPC_TRANSPORT_MSG_RECEIVED:
msg = data;
- ret = rpcsvc_handle_rpc_call (conn, msg);
+ ret = rpcsvc_handle_rpc_call (svc, trans, msg);
break;
case RPC_TRANSPORT_MSG_SENT:
@@ -1213,13 +733,20 @@ rpcsvc_notify (rpc_transport_t *trans, void *mydata,
/* do nothing, no need for rpcsvc to handle this, client should
* handle this event
*/
+ /* print info about transport too : LOG TODO */
gf_log ("rpcsvc", GF_LOG_CRITICAL,
"got CONNECT event, which should have not come");
ret = 0;
break;
case RPC_TRANSPORT_CLEANUP:
- /* FIXME: think about this later */
+ listener = rpcsvc_get_listener (svc, -1, trans->listener);
+ if (listener == NULL) {
+ goto out;
+ }
+
+ rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY,
+ trans);
ret = 0;
break;
@@ -1236,26 +763,6 @@ out:
}
-void
-rpcsvc_set_lastfrag (uint32_t *fragsize) {
- (*fragsize) |= 0x80000000U;
-}
-
-void
-rpcsvc_set_frag_header_size (uint32_t size, char *haddr)
-{
- size = htonl (size);
- memcpy (haddr, &size, sizeof (size));
-}
-
-void
-rpcsvc_set_last_frag_header_size (uint32_t size, char *haddr)
-{
- rpcsvc_set_lastfrag (&size);
- rpcsvc_set_frag_header_size (size, haddr);
-}
-
-
/* Given the RPC reply structure and the payload handed by the RPC program,
* encode the RPC record header into the buffer pointed by recordstart.
*/
@@ -1271,10 +778,9 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
/* After leaving aside the 4 bytes for the fragment header, lets
* encode the RPC reply structure into the buffer given to us.
*/
- ret = rpc_reply_to_xdr (&reply,(recordstart + RPCSVC_FRAGHDR_SIZE),
- rlen, &replyhdr);
+ ret = rpc_reply_to_xdr (&reply, recordstart, rlen, &replyhdr);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to create RPC reply");
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "Failed to create RPC reply");
goto err;
}
@@ -1282,16 +788,78 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Reply fraglen %zu, payload: %zu, "
"rpc hdr: %zu", fraglen, payload, replyhdr.iov_len);
- /* Since we're not spreading RPC records over mutiple fragments
- * we just set this fragment as the first and last fragment for this
- * record.
- */
- rpcsvc_set_last_frag_header_size (fraglen, recordstart);
+ txrecord.iov_base = recordstart;
- /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE
- * we need to transmit the record with the fragment header, which starts
- * at recordstart.
+ /* Remember, this is only the vec for the RPC header and does not
+ * include the payload above. We needed the payload only to calculate
+ * the size of the full fragment. This size is sent in the fragment
+ * header.
*/
+ txrecord.iov_len = replyhdr.iov_len;
+err:
+ return txrecord;
+}
+
+static inline int
+rpcsvc_get_callid (rpcsvc_t *rpc)
+{
+ return GF_UNIVERSAL_ANSWER;
+}
+
+int
+rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload,
+ uint64_t xid, struct rpc_msg *request)
+{
+ int ret = -1;
+
+ if (!request) {
+ goto out;
+ }
+
+ memset (request, 0, sizeof (*request));
+
+ request->rm_xid = xid;
+ request->rm_direction = CALL;
+
+ request->rm_call.cb_rpcvers = 2;
+ request->rm_call.cb_prog = prognum;
+ request->rm_call.cb_vers = progver;
+ request->rm_call.cb_proc = procnum;
+
+ request->rm_call.cb_cred.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
+
+ request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_verf.oa_base = NULL;
+ request->rm_call.cb_verf.oa_length = 0;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+struct iovec
+rpcsvc_callback_build_header (char *recordstart, size_t rlen,
+ struct rpc_msg *request, size_t payload)
+{
+ struct iovec requesthdr = {0, };
+ struct iovec txrecord = {0, 0};
+ int ret = -1;
+ size_t fraglen = 0;
+
+ ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "Failed to create RPC request");
+ goto out;
+ }
+
+ fraglen = payload + requesthdr.iov_len;
+ gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
+ "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);
+
txrecord.iov_base = recordstart;
/* Remember, this is only the vec for the RPC header and does not
@@ -1299,27 +867,141 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
* the size of the full fragment. This size is sent in the fragment
* header.
*/
- txrecord.iov_len = RPCSVC_FRAGHDR_SIZE + replyhdr.iov_len;
-err:
+ txrecord.iov_len = requesthdr.iov_len;
+
+out:
return txrecord;
}
+struct iobuf *
+rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver,
+ int procnum, size_t payload, uint64_t xid,
+ struct iovec *recbuf)
+{
+ struct rpc_msg request = {0, };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {0, };
+ size_t pagesize = 0;
+ size_t xdr_size = 0;
+ int ret = -1;
+
+ if ((!rpc) || (!recbuf)) {
+ goto out;
+ }
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid,
+ &request);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build a rpc-request "
+ "xid (%"PRIu64")", xid);
+ goto out;
+ }
+
+ /* First, try to get a pointer into the buffer which the RPC
+ * layer can use.
+ */
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
+
+ request_iob = iobuf_get2 (rpc->ctx->iobuf_pool, (xdr_size + payload));
+ if (!request_iob) {
+ goto out;
+ }
+
+ pagesize = iobuf_pagesize (request_iob);
+
+ record = iobuf_ptr (request_iob); /* Now we have it. */
+
+ recordhdr = rpcsvc_callback_build_header (record, pagesize, &request,
+ payload);
+
+ if (!recordhdr.iov_base) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
+ " header");
+ iobuf_unref (request_iob);
+ request_iob = NULL;
+ recbuf->iov_base = NULL;
+ goto out;
+ }
+
+ recbuf->iov_base = recordhdr.iov_base;
+ recbuf->iov_len = recordhdr.iov_len;
+
+out:
+ return request_iob;
+}
int
-rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec,
- int hdrcount, struct iovec *proghdr, int proghdrcount,
- struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *priv)
+rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
+ rpcsvc_cbk_program_t *prog, int procnum,
+ struct iovec *proghdr, int proghdrcount)
+{
+ struct iobuf *request_iob = NULL;
+ struct iovec rpchdr = {0,};
+ rpc_transport_req_t req;
+ int ret = -1;
+ int proglen = 0;
+ uint64_t callid = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ memset (&req, 0, sizeof (req));
+
+ callid = rpcsvc_get_callid (rpc);
+
+ if (proghdr) {
+ proglen += iov_length (proghdr, proghdrcount);
+ }
+
+ request_iob = rpcsvc_callback_build_record (rpc, prog->prognum,
+ prog->progver, procnum,
+ proglen, callid,
+ &rpchdr);
+ if (!request_iob) {
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "cannot build rpc-record");
+ goto out;
+ }
+
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+
+ ret = rpc_transport_submit_request (trans, &req);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_WARNING,
+ "transmission of rpc-request failed");
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ iobuf_unref (request_iob);
+
+ return ret;
+}
+
+int
+rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr,
+ int rpchdrcount, struct iovec *proghdr,
+ int proghdrcount, struct iovec *progpayload,
+ int progpayloadcount, struct iobref *iobref,
+ void *priv)
{
int ret = -1;
rpc_transport_reply_t reply = {{0, }};
- if ((!conn) || (!hdrvec) || (!hdrvec->iov_base) || (!conn->trans)) {
+ if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {
goto out;
}
- reply.msg.rpchdr = hdrvec;
- reply.msg.rpchdrcount = hdrcount;
+ reply.msg.rpchdr = rpchdr;
+ reply.msg.rpchdrcount = rpchdrcount;
reply.msg.proghdr = proghdr;
reply.msg.proghdrcount = proghdrcount;
reply.msg.progpayload = progpayload;
@@ -1327,15 +1009,7 @@ rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec,
reply.msg.iobref = iobref;
reply.private = priv;
- /* Now that we have both the RPC and Program buffers in xdr format
- * lets hand it to the transmission layer.
- */
- if (!rpcsvc_conn_check_active (conn)) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection inactive");
- goto out;
- }
-
- ret = rpc_transport_submit_reply (conn->trans, &reply);
+ ret = rpc_transport_submit_reply (trans, &reply);
out:
return ret;
@@ -1345,24 +1019,31 @@ out:
int
rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)
{
+ int ret = -1;
rpcsvc_program_t *prog = NULL;
if ((!req) || (!reply))
- return -1;
+ goto out;
- prog = rpcsvc_request_program (req);
+ ret = 0;
rpc_fill_empty_reply (reply, req->xid);
-
- if (req->rpc_status == MSG_DENIED)
+ if (req->rpc_status == MSG_DENIED) {
rpc_fill_denied_reply (reply, req->rpc_err, req->auth_err);
- else if (req->rpc_status == MSG_ACCEPTED)
- rpc_fill_accepted_reply (reply, req->rpc_err, prog->proglowvers,
- prog->proghighvers, req->verf.flavour,
- req->verf.datalen,
+ goto out;
+ }
+
+ prog = rpcsvc_request_program (req);
+
+ if (req->rpc_status == MSG_ACCEPTED)
+ rpc_fill_accepted_reply (reply, req->rpc_err,
+ (prog) ? prog->proglowvers : 0,
+ (prog) ? prog->proghighvers: 0,
+ req->verf.flavour, req->verf.datalen,
req->verf.authdata);
else
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid rpc_status value");
- return 0;
+out:
+ return ret;
}
@@ -1376,35 +1057,40 @@ rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)
*/
struct iobuf *
rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload,
- struct iovec *recbuf)
+ size_t hdrlen, struct iovec *recbuf)
{
struct rpc_msg reply;
struct iobuf *replyiob = NULL;
char *record = NULL;
struct iovec recordhdr = {0, };
size_t pagesize = 0;
- rpcsvc_conn_t *conn = NULL;
+ size_t xdr_size = 0;
rpcsvc_t *svc = NULL;
+ int ret = -1;
- if ((!req) || (!req->conn) || (!recbuf))
+ if ((!req) || (!req->trans) || (!req->svc) || (!recbuf))
return NULL;
- /* First, try to get a pointer into the buffer which the RPC
- * layer can use.
- */
- conn = req->conn;
- svc = rpcsvc_conn_rpcsvc (conn);
- replyiob = iobuf_get (svc->ctx->iobuf_pool);
- pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool);
+ svc = req->svc;
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_reply (req, &reply);
+ if (ret)
+ goto err_exit;
+
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_replymsg, &reply);
+
+ /* Payload would include 'readv' size etc too, where as
+ that comes as another payload iobuf */
+ replyiob = iobuf_get2 (svc->ctx->iobuf_pool, (xdr_size + hdrlen));
if (!replyiob) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get iobuf");
goto err_exit;
}
+ pagesize = iobuf_pagesize (replyiob);
+
record = iobuf_ptr (replyiob); /* Now we have it. */
- /* Fill the rpc structure and XDR it into the buffer got above. */
- rpcsvc_fill_reply (req, &reply);
recordhdr = rpcsvc_record_build_header (record, pagesize, reply,
payload);
if (!recordhdr.iov_base) {
@@ -1454,17 +1140,19 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
int hdrcount, struct iovec *payload, int payloadcount,
struct iobref *iobref)
{
- int ret = -1, i = 0;
- struct iobuf *replyiob = NULL;
- struct iovec recordhdr = {0, };
- rpcsvc_conn_t *conn = NULL;
- size_t msglen = 0;
- char new_iobref = 0;
-
- if ((!req) || (!req->conn))
+ int ret = -1, i = 0;
+ struct iobuf *replyiob = NULL;
+ struct iovec recordhdr = {0, };
+ rpc_transport_t *trans = NULL;
+ size_t msglen = 0;
+ size_t hdrlen = 0;
+ char new_iobref = 0;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ if ((!req) || (!req->trans))
return -1;
- conn = req->conn;
+ trans = req->trans;
for (i = 0; i < hdrcount; i++) {
msglen += proghdr[i].iov_len;
@@ -1477,7 +1165,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen);
/* Build the buffer containing the encoded RPC reply. */
- replyiob = rpcsvc_record_build_record (req, msglen, &recordhdr);
+ replyiob = rpcsvc_record_build_record (req, msglen, hdrlen, &recordhdr);
if (!replyiob) {
gf_log (GF_RPCSVC, GF_LOG_ERROR,"Reply record creation failed");
goto disconnect_exit;
@@ -1486,8 +1174,6 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
if (!iobref) {
iobref = iobref_new ();
if (!iobref) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation "
- "failed");
goto disconnect_exit;
}
@@ -1496,14 +1182,35 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
iobref_add (iobref, replyiob);
- ret = rpcsvc_conn_submit (conn, &recordhdr, 1, proghdr, hdrcount,
- payload, payloadcount, iobref,
- req->trans_private);
+ /* cache the request in the duplicate request cache for appropriate ops */
+ if (req->reply) {
+ drc = req->svc->drc;
+
+ LOCK (&drc->lock);
+ ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1,
+ proghdr, hdrcount,
+ payload, payloadcount);
+ UNLOCK (&drc->lock);
+ }
- rpcsvc_request_destroy (conn, req);
+ ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,
+ payload, payloadcount, iobref,
+ req->trans_private);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message");
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message "
+ "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to "
+ "rpc-transport (%s)", req->xid,
+ req->prog ? req->prog->progname : "(not matched)",
+ req->prog ? req->prog->progver : 0,
+ req->procnum, trans->name);
+ } else {
+ gf_log (GF_RPCSVC, GF_LOG_TRACE,
+ "submitted reply for rpc-message (XID: 0x%x, "
+ "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport "
+ "(%s)", req->xid, req->prog ? req->prog->progname: "-",
+ req->prog ? req->prog->progver : 0,
+ req->procnum, trans->name);
}
disconnect_exit:
@@ -1515,20 +1222,7 @@ disconnect_exit:
iobref_unref (iobref);
}
- /* Note that a unref is called everytime a reply is sent. This is in
- * response to the ref that is performed on the conn when a request is
- * handed to the RPC program.
- *
- * The catch, however, is that if the reply is an rpc error, we must
- * not unref. This is because the ref only contains
- * references for the actors to which the request was handed plus one
- * reference maintained by the RPC layer. By unrefing for a case where
- * no actor was called, we will be losing the ref held for the RPC
- * layer.
- */
- if ((rpcsvc_request_accepted (req)) &&
- (rpcsvc_request_accepted_success (req)))
- rpcsvc_conn_unref (conn);
+ rpcsvc_request_destroy (req);
return ret;
}
@@ -1542,6 +1236,8 @@ rpcsvc_error_reply (rpcsvc_request_t *req)
if (!req)
return -1;
+ gf_log_callingfn ("", GF_LOG_DEBUG, "sending a RPC error reply");
+
/* At this point the req should already have been filled with the
* appropriate RPC error numbers.
*/
@@ -1550,50 +1246,92 @@ rpcsvc_error_reply (rpcsvc_request_t *req)
/* Register the program with the local portmapper service. */
-int
-rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, rpcsvc_conn_t *conn)
+inline int
+rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
{
- int ret = 0;
- struct sockaddr_in sa = {0, };
+ int ret = -1; /* FAIL */
- if (!newprog || !conn->trans) {
+ if (!newprog) {
goto out;
}
+ /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */
if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP,
- sa.sin_port))) {
+ port))) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with"
" portmap");
goto out;
}
- ret = 0;
+ ret = 0; /* SUCCESS */
out:
return ret;
}
-int
+inline int
rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
{
+ int ret = -1;
+
if (!prog)
- return -1;
+ goto out;
if (!(pmap_unset(prog->prognum, prog->progver))) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister with"
" portmap");
- return -1;
+ goto out;
}
- return 0;
+ ret = 0;
+out:
+ return ret;
+}
+
+int
+rpcsvc_register_portmap_enabled (rpcsvc_t *svc)
+{
+ return svc->register_portmap;
+}
+
+int32_t
+rpcsvc_get_listener_port (rpcsvc_listener_t *listener)
+{
+ int32_t listener_port = -1;
+
+ if ((listener == NULL) || (listener->trans == NULL)) {
+ goto out;
+ }
+
+ switch (listener->trans->myinfo.sockaddr.ss_family) {
+ case AF_INET:
+ listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
+ break;
+
+ case AF_INET6:
+ listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
+ break;
+
+ default:
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "invalid address family (%d)",
+ listener->trans->myinfo.sockaddr.ss_family);
+ goto out;
+ }
+
+ listener_port = ntohs (listener_port);
+
+out:
+ return listener_port;
}
rpcsvc_listener_t *
-rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port)
+rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
{
- rpcsvc_listener_t *listener = NULL;
- char found = 0;
+ rpcsvc_listener_t *listener = NULL;
+ char found = 0;
+ uint32_t listener_port = 0;
if (!svc) {
goto out;
@@ -1602,8 +1340,24 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port)
pthread_mutex_lock (&svc->rpclock);
{
list_for_each_entry (listener, &svc->listeners, list) {
- if (((struct sockaddr_in *)&listener->sa)->sin_port
- == port) {
+ if (trans != NULL) {
+ if (listener->trans == trans) {
+ found = 1;
+ break;
+ }
+
+ continue;
+ }
+
+ listener_port = rpcsvc_get_listener_port (listener);
+ if (listener_port == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "invalid port for listener %s",
+ listener->trans->name);
+ continue;
+ }
+
+ if (listener_port == port) {
found = 1;
break;
}
@@ -1631,7 +1385,7 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
int hdrcount, struct iovec *payload, int payloadcount,
struct iobref *iobref)
{
- if ((!req) || (!req->conn) || (!proghdr) || (!proghdr->iov_base))
+ if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base))
return -1;
return rpcsvc_submit_generic (req, proghdr, hdrcount, payload,
@@ -1640,84 +1394,106 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
int
-rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t prog)
+rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program)
{
int ret = -1;
+ rpcsvc_program_t *prog = NULL;
+ if (!svc || !program) {
+ goto out;
+ }
- if (!svc)
- return -1;
-
- /* TODO: De-init the listening connection for this program. */
- ret = rpcsvc_program_unregister_portmap (&prog);
+ ret = rpcsvc_program_unregister_portmap (program);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of"
" program failed");
- goto err;
+ goto out;
+ }
+
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_for_each_entry (prog, &svc->programs, program) {
+ if ((prog->prognum == program->prognum)
+ && (prog->progver == program->progver)) {
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&svc->rpclock);
+
+ if (prog == NULL) {
+ ret = -1;
+ goto out;
}
- ret = 0;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d,"
- " Ver: %d, Port: %d", prog.progname, prog.prognum,
- prog.progver, prog.progport);
+ " Ver: %d, Port: %d", prog->progname, prog->prognum,
+ prog->progver, prog->progport);
-err:
- if (ret == -1)
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_del_init (&prog->program);
+ }
+ pthread_mutex_unlock (&svc->rpclock);
+
+ ret = 0;
+out:
+ if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program unregistration failed"
- ": %s, Num: %d, Ver: %d, Port: %d", prog.progname,
- prog.prognum, prog.progver, prog.progport);
+ ": %s, Num: %d, Ver: %d, Port: %d", program->progname,
+ program->prognum, program->progver, program->progport);
+ }
return ret;
}
-int
-rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen)
+inline int
+rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen)
{
- if (!conn || !conn->trans)
+ if (!trans) {
return -1;
+ }
- return rpc_transport_get_peername (conn->trans, hostname, hostlen);
+ return rpc_transport_get_peername (trans, hostname, hostlen);
}
-int
-rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen,
- struct sockaddr *sa, socklen_t sasize)
+inline int
+rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen,
+ struct sockaddr_storage *sa, socklen_t sasize)
{
- if (!conn || !conn->trans)
+ if (!trans) {
return -1;
+ }
- return rpc_transport_get_peeraddr(conn->trans, addrstr, addrlen, sa,
+ return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa,
sasize);
}
-rpcsvc_conn_t *
-rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)
+rpc_transport_t *
+rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name)
{
- int ret = -1;
+ int ret = -1;
rpc_transport_t *trans = NULL;
- rpcsvc_conn_t *conn = NULL;
trans = rpc_transport_load (svc->ctx, options, name);
if (!trans) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot create listener, "
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "cannot create listener, "
"initing the transport failed");
goto out;
}
ret = rpc_transport_listen (trans);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ gf_log (GF_RPCSVC, GF_LOG_WARNING,
"listening on transport failed");
goto out;
}
- conn = rpcsvc_conn_init (svc, trans);
- if (!conn) {
- ret = -1;
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "initializing connection for transport failed");
+ ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_WARNING, "registering notify failed");
goto out;
}
@@ -1725,24 +1501,25 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)
out:
if ((ret == -1) && (trans)) {
rpc_transport_disconnect (trans);
+ trans = NULL;
}
- return conn;
+ return trans;
}
rpcsvc_listener_t *
-rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn)
+rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans)
{
rpcsvc_listener_t *listener = NULL;
listener = GF_CALLOC (1, sizeof (*listener),
gf_common_mt_rpcsvc_listener_t);
if (!listener) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed");
goto out;
}
- listener->conn = conn;
+ listener->trans = trans;
+ listener->svc = svc;
INIT_LIST_HEAD (&listener->list);
@@ -1756,33 +1533,123 @@ out:
}
-rpcsvc_listener_t *
+int32_t
rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
{
- rpcsvc_conn_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
rpcsvc_listener_t *listener = NULL;
+ int32_t ret = -1;
if (!svc || !options) {
goto out;
}
- conn = rpcsvc_conn_create (svc, options, name);
- if (!conn) {
+ trans = rpcsvc_transport_create (svc, options, name);
+ if (!trans) {
+ /* LOG TODO */
goto out;
}
- listener = rpcsvc_listener_alloc (svc, conn);
+ listener = rpcsvc_listener_alloc (svc, trans);
if (listener == NULL) {
goto out;
}
- conn->listener = listener;
+ ret = 0;
out:
- if (!listener && conn) {
- rpcsvc_conn_deinit (conn);
+ if (!listener && trans) {
+ rpc_transport_disconnect (trans);
}
- return listener;
+ return ret;
+}
+
+
+int32_t
+rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name)
+{
+ int32_t ret = -1, count = 0;
+ data_t *data = NULL;
+ char *str = NULL, *ptr = NULL, *transport_name = NULL;
+ char *transport_type = NULL, *saveptr = NULL, *tmp = NULL;
+
+ if ((svc == NULL) || (options == NULL) || (name == NULL)) {
+ goto out;
+ }
+
+ data = dict_get (options, "transport-type");
+ if (data == NULL) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "option transport-type not set");
+ goto out;
+ }
+
+ transport_type = data_to_str (data);
+ if (transport_type == NULL) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "option transport-type not set");
+ goto out;
+ }
+
+ /* duplicate transport_type, since following dict_set will free it */
+ transport_type = gf_strdup (transport_type);
+ if (transport_type == NULL) {
+ goto out;
+ }
+
+ str = gf_strdup (transport_type);
+ if (str == NULL) {
+ goto out;
+ }
+
+ ptr = strtok_r (str, ",", &saveptr);
+
+ while (ptr != NULL) {
+ tmp = gf_strdup (ptr);
+ if (tmp == NULL) {
+ goto out;
+ }
+
+ ret = gf_asprintf (&transport_name, "%s.%s", tmp, name);
+ if (ret == -1) {
+ goto out;
+ }
+
+ ret = dict_set_dynstr (options, "transport-type", tmp);
+ if (ret == -1) {
+ goto out;
+ }
+
+ tmp = NULL;
+ ptr = strtok_r (NULL, ",", &saveptr);
+
+ ret = rpcsvc_create_listener (svc, options, transport_name);
+ if (ret != 0) {
+ goto out;
+ }
+
+ GF_FREE (transport_name);
+ transport_name = NULL;
+ count++;
+ }
+
+ ret = dict_set_dynstr (options, "transport-type", transport_type);
+ if (ret == -1) {
+ goto out;
+ }
+
+ transport_type = NULL;
+
+out:
+ GF_FREE (str);
+
+ GF_FREE (transport_type);
+
+ GF_FREE (tmp);
+
+ GF_FREE (transport_name);
+
+ return count;
}
@@ -1840,35 +1707,47 @@ out:
}
-
-int
-rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)
+inline int
+rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
{
- rpcsvc_program_t *newprog = NULL;
- int ret = -1;
- rpcsvc_listener_t *listener = NULL;
-
- if (!svc)
- return -1;
+ int ret = -1;
+ rpcsvc_program_t *newprog = NULL;
+ char already_registered = 0;
- newprog = GF_CALLOC (1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t);
- if (!newprog)
- return -1;
+ if (!svc) {
+ goto out;
+ }
- if (!program.actors)
- goto free_prog;
+ if (program->actors == NULL) {
+ goto out;
+ }
- memcpy (newprog, &program, sizeof (program));
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_for_each_entry (newprog, &svc->programs, program) {
+ if ((newprog->prognum == program->prognum)
+ && (newprog->progver == program->progver)) {
+ already_registered = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&svc->rpclock);
- listener = svc->listener;
+ if (already_registered) {
+ ret = 0;
+ goto out;
+ }
- ret = rpcsvc_program_register_portmap (newprog, listener->conn);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of"
- " program failed");
- goto free_prog;
+ newprog = GF_CALLOC (1, sizeof(*newprog),gf_common_mt_rpcsvc_program_t);
+ if (newprog == NULL) {
+ goto out;
}
+ memcpy (newprog, program, sizeof (*program));
+
+ INIT_LIST_HEAD (&newprog->program);
+
pthread_mutex_lock (&svc->rpclock);
{
list_add_tail (&newprog->program, &svc->programs);
@@ -1880,12 +1759,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)
" Ver: %d, Port: %d", newprog->progname, newprog->prognum,
newprog->progver, newprog->progport);
-free_prog:
+out:
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:"
- " %s, Num: %d, Ver: %d, Port: %d", newprog->progname,
- newprog->prognum, newprog->progver, newprog->progport);
- GF_FREE (newprog);
+ " %s, Num: %d, Ver: %d, Port: %d", program->progname,
+ program->prognum, program->progver, program->progport);
}
return ret;
@@ -1913,10 +1791,10 @@ build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp)
gf_prog_detail *prog = NULL;
gf_prog_detail *prev = NULL;
- if (!req || !req->conn || !req->conn->svc)
+ if (!req || !req->trans || !req->svc)
goto out;
- list_for_each_entry (program, &req->conn->svc->programs, program) {
+ list_for_each_entry (program, &req->svc->programs, program) {
prog = GF_CALLOC (1, sizeof (*prog), 0);
if (!prog)
goto out;
@@ -1939,38 +1817,41 @@ static int
rpcsvc_dump (rpcsvc_request_t *req)
{
char rsp_buf[8 * 1024] = {0,};
- gf_dump_rsp rsp = {0,};
- struct iovec iov = {0,};
- int op_errno = EINVAL;
- int ret = -1;
+ gf_dump_rsp rsp = {0,};
+ struct iovec iov = {0,};
+ int op_errno = EINVAL;
+ int ret = -1;
+ uint32_t dump_rsp_len = 0;
if (!req)
- goto fail;
+ goto sendrsp;
ret = build_prog_details (req, &rsp);
if (ret < 0) {
op_errno = -ret;
- goto fail;
+ goto sendrsp;
}
-fail:
+ op_errno = 0;
+
+sendrsp:
rsp.op_errno = gf_errno_to_error (op_errno);
rsp.op_ret = ret;
+ dump_rsp_len = xdr_sizeof ((xdrproc_t) xdr_gf_dump_rsp,
+ &rsp);
+
iov.iov_base = rsp_buf;
- iov.iov_len = (8 * 1024);
+ iov.iov_len = dump_rsp_len;
- ret = xdr_serialize_dump_rsp (iov, &rsp);
+ ret = xdr_serialize_generic (iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp);
if (ret < 0) {
- if (req)
- req->rpc_err = GARBAGE_ARGS;
- op_errno = EINVAL;
- goto fail;
+ ret = RPCSVC_ACTOR_ERROR;
+ } else {
+ rpcsvc_submit_generic (req, &iov, 1, NULL, 0, NULL);
+ ret = 0;
}
- ret = rpcsvc_submit_generic (req, &iov, 1, NULL, 0,
- NULL);
-
free_prog_details (&rsp);
return ret;
@@ -1979,19 +1860,217 @@ fail:
int
rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
{
+ char *optstr = NULL;
+ int ret = -1;
+
+ if ((!svc) || (!options))
+ return -1;
+
svc->memfactor = RPCSVC_DEFAULT_MEMFACTOR;
- return 0;
+
+ svc->register_portmap = _gf_true;
+ if (dict_get (options, "rpc.register-with-portmap")) {
+ ret = dict_get_str (options, "rpc.register-with-portmap",
+ &optstr);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse "
+ "dict");
+ goto out;
+ }
+
+ ret = gf_string2boolean (optstr, &svc->register_portmap);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse bool "
+ "string");
+ goto out;
+ }
+ }
+
+ if (!svc->register_portmap)
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "
+ "disabled");
+
+ ret = rpcsvc_set_outstanding_rpc_limit (svc, options);
+out:
+ return ret;
}
+int
+rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options)
+{
+ xlator_t *xlator = NULL;
+ xlator_list_t *volentry = NULL;
+ char *srchkey = NULL;
+ char *keyval = NULL;
+ int ret = -1;
+
+ if ((!svc) || (!svc->options) || (!options))
+ return (-1);
+
+ /* Fetch the xlator from svc */
+ xlator = (xlator_t *) svc->mydata;
+ if (!xlator)
+ return (-1);
+
+ /* Reconfigure the volume specific rpc-auth.addr allow part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* If found the srchkey, delete old key/val pair
+ * and set the key with new value.
+ */
+ if (!dict_get_str (options, srchkey, &keyval)) {
+ dict_del (svc->options, srchkey);
+ ret = dict_set_str (svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "dict_set_str error");
+ GF_FREE (srchkey);
+ return (-1);
+ }
+ }
+
+ GF_FREE (srchkey);
+ volentry = volentry->next;
+ }
+
+ /* Reconfigure the volume specific rpc-auth.addr reject part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* If found the srchkey, delete old key/val pair
+ * and set the key with new value.
+ */
+ if (!dict_get_str (options, srchkey, &keyval)) {
+ dict_del (svc->options, srchkey);
+ ret = dict_set_str (svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR,
+ "dict_set_str error");
+ GF_FREE (srchkey);
+ return (-1);
+ }
+ }
+
+ GF_FREE (srchkey);
+ volentry = volentry->next;
+ }
+
+ ret = rpcsvc_init_options (svc, options);
+ if (ret)
+ return (-1);
+
+ return rpcsvc_auth_reconf (svc, options);
+}
+
+int
+rpcsvc_transport_unix_options_build (dict_t **options, char *filepath)
+{
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT (filepath);
+ GF_ASSERT (options);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ fpath = gf_strdup (filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "transport.socket.listen-path", fpath);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ *options = dict;
+out:
+ if (ret) {
+ GF_FREE (fpath);
+ if (dict)
+ dict_unref (dict);
+ }
+ return ret;
+}
+
+/*
+ * Reconfigure() the rpc.outstanding-rpc-limit param.
+ */
+int
+rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options)
+{
+ int ret = -1; /* FAILURE */
+ int rpclim = 0;
+ static char *rpclimkey = "rpc.outstanding-rpc-limit";
+
+ if ((!svc) || (!options))
+ return (-1);
+
+ /* Reconfigure() the rpc.outstanding-rpc-limit param */
+ ret = dict_get_int32 (options, rpclimkey, &rpclim);
+ if (ret < 0) {
+ /* Fall back to default for FAILURE */
+ rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+ } else {
+ /* SUCCESS: round off to multiple of 8.
+ * If the input value fails Boundary check, fall back to
+ * default i.e. RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT.
+ * NB: value 0 is special, means its unset i.e. unlimited.
+ */
+ rpclim = ((rpclim + 8 - 1) >> 3) * 8;
+ if (rpclim < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+ } else if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT;
+ }
+ }
+
+ if (svc->outstanding_rpc_limit != rpclim) {
+ svc->outstanding_rpc_limit = rpclim;
+ gf_log (GF_RPCSVC, GF_LOG_INFO,
+ "Configured %s with value %d",
+ rpclimkey, rpclim);
+ }
+
+ return (0);
+}
/* The global RPC service initializer.
*/
rpcsvc_t *
-rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
+rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,
+ uint32_t poolcount)
{
- rpcsvc_t *svc = NULL;
- int ret = -1;
- rpcsvc_listener_t *listener = NULL;
+ rpcsvc_t *svc = NULL;
+ int ret = -1;
if ((!ctx) || (!options))
return NULL;
@@ -2012,6 +2091,17 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
goto free_svc;
}
+ if (!poolcount)
+ poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
+
+ gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
+ svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);
+ /* TODO: leak */
+ if (!svc->rxpool) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed");
+ goto free_svc;
+ }
+
ret = rpcsvc_auth_init (svc, options);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init "
@@ -2022,38 +2112,18 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
ret = -1;
svc->options = options;
svc->ctx = ctx;
+ svc->mydata = xl;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited.");
- /* One listen port per RPC */
- listener = rpcsvc_get_listener (svc, 0);
- if (!listener) {
- /* FIXME: listener is given the name of first program that
- * creates it. This is not always correct. For eg., multiple
- * programs can be listening on the same listener
- * (glusterfs 3.1.0, 3.1.2, 3.1.3 etc).
- */
- listener = rpcsvc_create_listener (svc, options, "RPC");
- if (!listener) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "creation of listener"
- " for program failed");
- goto free_svc;
- }
- }
+ gluster_dump_prog.options = options;
- if (!listener->conn) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no connection "
- "found");
- goto free_svc;
- }
-
- svc->listener = listener;
-
- ret = rpcsvc_program_register (svc, gluster_dump_prog);
+ ret = rpcsvc_program_register (svc, &gluster_dump_prog);
if (ret) {
gf_log (GF_RPCSVC, GF_LOG_ERROR,
"failed to register DUMP program");
goto free_svc;
}
+
ret = 0;
free_svc:
if (ret == -1) {
@@ -2065,10 +2135,339 @@ free_svc:
}
+int
+rpcsvc_transport_peer_check_search (dict_t *options, char *pattern,
+ char *ip, char *hostname)
+{
+ int ret = -1;
+ char *addrtok = NULL;
+ char *addrstr = NULL;
+ char *dup_addrstr = NULL;
+ char *svptr = NULL;
+
+ if ((!options) || (!ip))
+ return -1;
+
+ ret = dict_get_str (options, pattern, &addrstr);
+ if (ret < 0) {
+ ret = -1;
+ goto err;
+ }
+
+ if (!addrstr) {
+ ret = -1;
+ goto err;
+ }
+
+ dup_addrstr = gf_strdup (addrstr);
+ addrtok = strtok_r (dup_addrstr, ",", &svptr);
+ while (addrtok) {
+
+ /* CASEFOLD not present on Solaris */
+#ifdef FNM_CASEFOLD
+ ret = fnmatch (addrtok, ip, FNM_CASEFOLD);
+#else
+ ret = fnmatch (addrtok, ip, 0);
+#endif
+ if (ret == 0)
+ goto err;
+
+ /* compare hostnames if applicable */
+ if (hostname) {
+#ifdef FNM_CASEFOLD
+ ret = fnmatch (addrtok, hostname, FNM_CASEFOLD);
+#else
+ ret = fnmatch (addrtok, hostname, 0);
+#endif
+ if (ret == 0)
+ goto err;
+ }
+
+ addrtok = strtok_r (NULL, ",", &svptr);
+ }
+
+ ret = -1;
+err:
+ GF_FREE (dup_addrstr);
+
+ return ret;
+}
+
+
+static int
+rpcsvc_transport_peer_check_allow (dict_t *options, char *volname,
+ char *ip, char *hostname)
+{
+ int ret = RPCSVC_AUTH_DONTCARE;
+ char *srchstr = NULL;
+
+ if ((!options) || (!ip) || (!volname))
+ return ret;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_DONTCARE;
+ goto out;
+ }
+
+ ret = rpcsvc_transport_peer_check_search (options, srchstr,
+ ip, hostname);
+ GF_FREE (srchstr);
+
+ if (ret == 0)
+ ret = RPCSVC_AUTH_ACCEPT;
+ else
+ ret = RPCSVC_AUTH_REJECT;
+out:
+ return ret;
+}
+
+static int
+rpcsvc_transport_peer_check_reject (dict_t *options, char *volname,
+ char *ip, char *hostname)
+{
+ int ret = RPCSVC_AUTH_DONTCARE;
+ char *srchstr = NULL;
+
+ if ((!options) || (!ip) || (!volname))
+ return ret;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject",
+ volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto out;
+ }
+
+ ret = rpcsvc_transport_peer_check_search (options, srchstr,
+ ip, hostname);
+ GF_FREE (srchstr);
+
+ if (ret == 0)
+ ret = RPCSVC_AUTH_REJECT;
+ else
+ ret = RPCSVC_AUTH_DONTCARE;
+out:
+ return ret;
+}
+
+
+/* Combines rpc auth's allow and reject options.
+ * Order of checks is important.
+ * First, REJECT if either rejects.
+ * If neither rejects, ACCEPT if either accepts.
+ * If neither accepts, DONTCARE
+ */
+int
+rpcsvc_combine_allow_reject_volume_check (int allow, int reject)
+{
+ if (allow == RPCSVC_AUTH_REJECT ||
+ reject == RPCSVC_AUTH_REJECT)
+ return RPCSVC_AUTH_REJECT;
+
+ if (allow == RPCSVC_AUTH_ACCEPT ||
+ reject == RPCSVC_AUTH_ACCEPT)
+ return RPCSVC_AUTH_ACCEPT;
+
+ return RPCSVC_AUTH_DONTCARE;
+}
+
+int
+rpcsvc_auth_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans)
+{
+ int ret = RPCSVC_AUTH_REJECT;
+ int accept = RPCSVC_AUTH_REJECT;
+ int reject = RPCSVC_AUTH_REJECT;
+ char *hostname = NULL;
+ char *ip = NULL;
+ char client_ip[RPCSVC_PEER_STRLEN] = {0};
+ char *allow_str = NULL;
+ char *reject_str = NULL;
+ char *srchstr = NULL;
+ dict_t *options = NULL;
+
+ if (!svc || !volname || !trans)
+ return ret;
+
+ /* Fetch the options from svc struct and validate */
+ options = svc->options;
+ if (!options)
+ return ret;
+
+ ret = rpcsvc_transport_peername (trans, client_ip, RPCSVC_PEER_STRLEN);
+ if (ret != 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
+ "%s", gai_strerror (ret));
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ /* Accept if its the default case: Allow all, Reject none
+ * The default volfile always contains a 'allow *' rule
+ * for each volume. If allow rule is missing (which implies
+ * there is some bad volfile generating code doing this), we
+ * assume no one is allowed mounts, and thus, we reject mounts.
+ */
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ ret = dict_get_str (options, srchstr, &allow_str);
+ GF_FREE (srchstr);
+ if (ret < 0)
+ return RPCSVC_AUTH_REJECT;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ ret = dict_get_str (options, srchstr, &reject_str);
+ GF_FREE (srchstr);
+ if (reject_str == NULL && !strcmp ("*", allow_str))
+ return RPCSVC_AUTH_ACCEPT;
+
+ /* Non-default rule, authenticate */
+ if (!get_host_name (client_ip, &ip))
+ ip = client_ip;
+
+ /* addr-namelookup check */
+ if (svc->addr_namelookup == _gf_true) {
+ ret = gf_get_hostname_from_ip (ip, &hostname);
+ if (ret) {
+ if (hostname)
+ GF_FREE (hostname);
+ /* failed to get hostname, but hostname auth
+ * is enabled, so authentication will not be
+ * 100% correct. reject mounts
+ */
+ return RPCSVC_AUTH_REJECT;
+ }
+ }
+
+ accept = rpcsvc_transport_peer_check_allow (options, volname,
+ ip, hostname);
+
+ reject = rpcsvc_transport_peer_check_reject (options, volname,
+ ip, hostname);
+
+ if (hostname)
+ GF_FREE (hostname);
+ return rpcsvc_combine_allow_reject_volume_check (accept, reject);
+}
+
+int
+rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans)
+{
+ union gf_sock_union sock_union;
+ int ret = RPCSVC_AUTH_REJECT;
+ socklen_t sinsize = sizeof (&sock_union.sin);
+ char *srchstr = NULL;
+ char *valstr = NULL;
+ uint16_t port = 0;
+ gf_boolean_t insecure = _gf_false;
+
+ memset (&sock_union, 0, sizeof (sock_union));
+
+ if ((!svc) || (!volname) || (!trans))
+ return ret;
+
+ ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sock_union.storage,
+ sinsize);
+ if (ret != 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s",
+ gai_strerror (ret));
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
+ port = ntohs (sock_union.sin.sin_port);
+ gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
+ /* If the port is already a privileged one, dont bother with checking
+ * options.
+ */
+ if (port <= 1024) {
+ ret = RPCSVC_AUTH_ACCEPT;
+ goto err;
+ }
+
+ /* Disabled by default */
+ ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
+ ret = dict_get_str (svc->options, srchstr, &valstr);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
+ " read rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = gf_string2boolean (valstr, &insecure);
+ if (ret) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
+ " convert rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT;
+
+ if (ret == RPCSVC_AUTH_ACCEPT)
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
+ else
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not"
+ " allowed");
+
+err:
+ if (srchstr)
+ GF_FREE (srchstr);
+
+ return ret;
+}
+
+
+char *
+rpcsvc_volume_allowed (dict_t *options, char *volname)
+{
+ char globalrule[] = "rpc-auth.addr.allow";
+ char *srchstr = NULL;
+ char *addrstr = NULL;
+ int ret = -1;
+
+ if ((!options) || (!volname))
+ return NULL;
+
+ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ goto out;
+ }
+
+ if (!dict_get (options, srchstr))
+ ret = dict_get_str (options, globalrule, &addrstr);
+ else
+ ret = dict_get_str (options, srchstr, &addrstr);
+
+out:
+ GF_FREE (srchstr);
+
+ return addrstr;
+}
+
+
rpcsvc_actor_t gluster_dump_actors[] = {
- [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, NULL },
- [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, NULL },
- [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, NULL },
+ [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
+ [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
+ [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA},
};