summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorAmar Tumballi <amar@gluster.com>2010-05-04 00:36:24 +0000
committerAnand V. Avati <avati@dev.gluster.com>2010-05-03 23:40:05 -0700
commitf75b76350747f5f58a4bbe704915c74979cc5ac3 (patch)
treeaf828761f51a8aa2612aa5eb9c9f0709869675c8 /xlators
parent7504b0e623914d097933f0a613ba50e376131828 (diff)
structuring of protocol - 2
* 'transports/' and 'auth/' moved to xlators/protocol/ * transport.{c,h}, authenticate.{c,h}, protocol.h moved to xlators/protocol/lib/src/ Signed-off-by: Amar Tumballi <amar@gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'xlators')
-rw-r--r--xlators/nfs/lib/src/rpcsvc.h1
-rw-r--r--xlators/protocol/Makefile.am2
-rw-r--r--xlators/protocol/auth/Makefile.am3
-rw-r--r--xlators/protocol/auth/addr/Makefile.am3
-rw-r--r--xlators/protocol/auth/addr/src/Makefile.am14
-rw-r--r--xlators/protocol/auth/addr/src/addr.c224
-rw-r--r--xlators/protocol/auth/login/Makefile.am3
-rw-r--r--xlators/protocol/auth/login/src/Makefile.am15
-rw-r--r--xlators/protocol/auth/login/src/login.c114
-rw-r--r--xlators/protocol/client/src/Makefile.am6
-rw-r--r--xlators/protocol/lib/Makefile.am3
-rw-r--r--xlators/protocol/lib/src/Makefile.am15
-rw-r--r--xlators/protocol/lib/src/authenticate.c250
-rw-r--r--xlators/protocol/lib/src/authenticate.h61
-rw-r--r--xlators/protocol/lib/src/protocol.h1114
-rw-r--r--xlators/protocol/lib/src/transport.c422
-rw-r--r--xlators/protocol/lib/src/transport.h106
-rw-r--r--xlators/protocol/server/src/Makefile.am5
-rw-r--r--xlators/protocol/transport/Makefile.am3
-rw-r--r--xlators/protocol/transport/ib-verbs/Makefile.am1
-rw-r--r--xlators/protocol/transport/ib-verbs/src/Makefile.am19
-rw-r--r--xlators/protocol/transport/ib-verbs/src/ib-verbs-mem-types.h39
-rw-r--r--xlators/protocol/transport/ib-verbs/src/ib-verbs.c2613
-rw-r--r--xlators/protocol/transport/ib-verbs/src/ib-verbs.h220
-rw-r--r--xlators/protocol/transport/ib-verbs/src/name.c712
-rw-r--r--xlators/protocol/transport/ib-verbs/src/name.h47
-rw-r--r--xlators/protocol/transport/socket/Makefile.am1
-rw-r--r--xlators/protocol/transport/socket/src/Makefile.am19
-rw-r--r--xlators/protocol/transport/socket/src/name.c737
-rw-r--r--xlators/protocol/transport/socket/src/name.h44
-rw-r--r--xlators/protocol/transport/socket/src/socket-mem-types.h36
-rw-r--r--xlators/protocol/transport/socket/src/socket.c1552
-rw-r--r--xlators/protocol/transport/socket/src/socket.h125
33 files changed, 8523 insertions, 6 deletions
diff --git a/xlators/nfs/lib/src/rpcsvc.h b/xlators/nfs/lib/src/rpcsvc.h
index 2746288f8..6e6dc9bc9 100644
--- a/xlators/nfs/lib/src/rpcsvc.h
+++ b/xlators/nfs/lib/src/rpcsvc.h
@@ -27,7 +27,6 @@
#endif
#include "event.h"
-#include "transport.h"
#include "logging.h"
#include "dict.h"
#include "mem-pool.h"
diff --git a/xlators/protocol/Makefile.am b/xlators/protocol/Makefile.am
index 745e277c2..bef0c6624 100644
--- a/xlators/protocol/Makefile.am
+++ b/xlators/protocol/Makefile.am
@@ -1,3 +1,3 @@
-SUBDIRS = client server
+SUBDIRS = lib transport client server auth
CLEANFILES =
diff --git a/xlators/protocol/auth/Makefile.am b/xlators/protocol/auth/Makefile.am
new file mode 100644
index 000000000..6bd54eee3
--- /dev/null
+++ b/xlators/protocol/auth/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = addr login
+
+CLEANFILES =
diff --git a/xlators/protocol/auth/addr/Makefile.am b/xlators/protocol/auth/addr/Makefile.am
new file mode 100644
index 000000000..d471a3f92
--- /dev/null
+++ b/xlators/protocol/auth/addr/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/protocol/auth/addr/src/Makefile.am b/xlators/protocol/auth/addr/src/Makefile.am
new file mode 100644
index 000000000..9b053a846
--- /dev/null
+++ b/xlators/protocol/auth/addr/src/Makefile.am
@@ -0,0 +1,14 @@
+auth_LTLIBRARIES = addr.la
+authdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/auth
+
+addr_la_LDFLAGS = -module -avoidversion
+
+addr_la_SOURCES = addr.c
+addr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/xlators/protocol/lib/src/libgfproto.la
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \
+ -I$(top_srcdir)/xlators/protocol/lib/src
+
+CLEANFILES =
diff --git a/xlators/protocol/auth/addr/src/addr.c b/xlators/protocol/auth/addr/src/addr.c
new file mode 100644
index 000000000..a8803a39f
--- /dev/null
+++ b/xlators/protocol/auth/addr/src/addr.c
@@ -0,0 +1,224 @@
+/*
+ Copyright (c) 2007-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <fnmatch.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include "authenticate.h"
+#include "dict.h"
+
+#define ADDR_DELIMITER " ,"
+#define PRIVILEGED_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+auth_result_t
+gf_auth (dict_t *input_params, dict_t *config_params)
+{
+ int ret = 0;
+ char *name = NULL;
+ char *searchstr = NULL;
+ char peer_addr[UNIX_PATH_MAX];
+ data_t *peer_info_data = NULL;
+ peer_info_t *peer_info = NULL;
+ data_t *allow_addr = NULL, *reject_addr = NULL;
+ char is_inet_sdp = 0;
+
+ name = data_to_str (dict_get (input_params, "remote-subvolume"));
+ if (!name) {
+ gf_log ("authenticate/addr",
+ GF_LOG_ERROR,
+ "remote-subvolume not specified");
+ return AUTH_DONT_CARE;
+ }
+
+ ret = asprintf (&searchstr, "auth.addr.%s.allow", name);
+ if (-1 == ret) {
+ gf_log ("auth/addr", GF_LOG_ERROR,
+ "asprintf failed while setting search string");
+ return AUTH_DONT_CARE;
+ }
+ allow_addr = dict_get (config_params,
+ searchstr);
+ free (searchstr);
+
+ ret = asprintf (&searchstr, "auth.addr.%s.reject", name);
+ if (-1 == ret) {
+ gf_log ("auth/addr", GF_LOG_ERROR,
+ "asprintf failed while setting search string");
+ return AUTH_DONT_CARE;
+ }
+ reject_addr = dict_get (config_params,
+ searchstr);
+ free (searchstr);
+
+ if (!allow_addr) {
+ /* TODO: backword compatibility */
+ ret = asprintf (&searchstr, "auth.ip.%s.allow", name);
+ if (-1 == ret) {
+ gf_log ("auth/addr", GF_LOG_ERROR,
+ "asprintf failed while setting search string");
+ return AUTH_DONT_CARE;
+ }
+ allow_addr = dict_get (config_params, searchstr);
+ free (searchstr);
+ }
+
+ if (!(allow_addr || reject_addr)) {
+ gf_log ("auth/addr", GF_LOG_DEBUG,
+ "none of the options auth.addr.%s.allow or "
+ "auth.addr.%s.reject specified, returning auth_dont_care",
+ name, name);
+ return AUTH_DONT_CARE;
+ }
+
+ peer_info_data = dict_get (input_params, "peer-info");
+ if (!peer_info_data) {
+ gf_log ("authenticate/addr",
+ GF_LOG_ERROR,
+ "peer-info not present");
+ return AUTH_DONT_CARE;
+ }
+
+ peer_info = data_to_ptr (peer_info_data);
+
+ switch (((struct sockaddr *) &peer_info->sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &peer_info->sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ char *service;
+ uint16_t peer_port;
+ strcpy (peer_addr, peer_info->identifier);
+ service = strrchr (peer_addr, ':');
+ *service = '\0';
+ service ++;
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &peer_info->sockaddr)->sa_family = AF_INET_SDP;
+ }
+
+ peer_port = atoi (service);
+ if (peer_port >= PRIVILEGED_PORT_CEILING) {
+ gf_log ("auth/addr", GF_LOG_ERROR,
+ "client is bound to port %d which is not privileged",
+ peer_port);
+ return AUTH_DONT_CARE;
+ }
+ break;
+
+ case AF_UNIX:
+ strcpy (peer_addr, peer_info->identifier);
+ break;
+
+ default:
+ gf_log ("authenticate/addr", GF_LOG_ERROR,
+ "unknown address family %d",
+ ((struct sockaddr *) &peer_info->sockaddr)->sa_family);
+ return AUTH_DONT_CARE;
+ }
+ }
+
+ if (reject_addr) {
+ char *addr_str = NULL;
+ char *tmp;
+ char *addr_cpy = strdup (reject_addr->data);
+
+ addr_str = strtok_r (addr_cpy, ADDR_DELIMITER, &tmp);
+
+ while (addr_str) {
+ char negate = 0, match =0;
+ gf_log (name, GF_LOG_DEBUG,
+ "rejected = \"%s\", received addr = \"%s\"",
+ addr_str, peer_addr);
+ if (addr_str[0] == '!') {
+ negate = 1;
+ addr_str++;
+ }
+
+ match = fnmatch (addr_str,
+ peer_addr,
+ 0);
+ if (negate ? match : !match) {
+ free (addr_cpy);
+ return AUTH_REJECT;
+ }
+ addr_str = strtok_r (NULL, ADDR_DELIMITER, &tmp);
+ }
+ free (addr_cpy);
+ }
+
+ if (allow_addr) {
+ char *addr_str = NULL;
+ char *tmp;
+ char *addr_cpy = strdup (allow_addr->data);
+
+ addr_str = strtok_r (addr_cpy, ADDR_DELIMITER, &tmp);
+
+ while (addr_str) {
+ char negate = 0, match = 0;
+ gf_log (name, GF_LOG_DEBUG,
+ "allowed = \"%s\", received addr = \"%s\"",
+ addr_str, peer_addr);
+ if (addr_str[0] == '!') {
+ negate = 1;
+ addr_str++;
+ }
+
+ match = fnmatch (addr_str,
+ peer_addr,
+ 0);
+
+ if (negate ? match : !match) {
+ free (addr_cpy);
+ return AUTH_ACCEPT;
+ }
+ addr_str = strtok_r (NULL, ADDR_DELIMITER, &tmp);
+ }
+ free (addr_cpy);
+ }
+
+ return AUTH_DONT_CARE;
+}
+
+struct volume_options options[] = {
+ { .key = {"auth.addr.*.allow"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"auth.addr.*.reject"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ /* Backword compatibility */
+ { .key = {"auth.ip.*.allow"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {NULL} }
+};
diff --git a/xlators/protocol/auth/login/Makefile.am b/xlators/protocol/auth/login/Makefile.am
new file mode 100644
index 000000000..d471a3f92
--- /dev/null
+++ b/xlators/protocol/auth/login/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/protocol/auth/login/src/Makefile.am b/xlators/protocol/auth/login/src/Makefile.am
new file mode 100644
index 000000000..4a50e07d3
--- /dev/null
+++ b/xlators/protocol/auth/login/src/Makefile.am
@@ -0,0 +1,15 @@
+auth_LTLIBRARIES = login.la
+authdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/auth
+
+login_la_LDFLAGS = -module -avoidversion
+
+login_la_SOURCES = login.c
+login_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/xlators/protocol/lib/src/libgfproto.la
+
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \
+ -I$(top_srcdir)/xlators/protocol/lib/src
+
+CLEANFILES =
diff --git a/xlators/protocol/auth/login/src/login.c b/xlators/protocol/auth/login/src/login.c
new file mode 100644
index 000000000..0c85292f7
--- /dev/null
+++ b/xlators/protocol/auth/login/src/login.c
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2007-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <fnmatch.h>
+#include "authenticate.h"
+
+auth_result_t gf_auth (dict_t *input_params, dict_t *config_params)
+{
+ int ret = 0;
+ char *username = NULL, *password = NULL;
+ data_t *allow_user = NULL, *username_data = NULL, *password_data = NULL;
+ int32_t result = AUTH_DONT_CARE;
+ char *brick_name = NULL, *searchstr = NULL;
+
+ username_data = dict_get (input_params, "username");
+ if (!username_data)
+ return AUTH_DONT_CARE;
+
+ username = data_to_str (username_data);
+
+ password_data = dict_get (input_params, "password");
+ if (!password_data)
+ return AUTH_DONT_CARE;
+
+ password = data_to_str (password_data);
+
+ brick_name = data_to_str (dict_get (input_params, "remote-subvolume"));
+ if (!brick_name) {
+ gf_log ("auth/login",
+ GF_LOG_ERROR,
+ "remote-subvolume not specified");
+ return AUTH_REJECT;
+ }
+
+ ret = asprintf (&searchstr, "auth.login.%s.allow", brick_name);
+ if (-1 == ret) {
+ gf_log ("auth/login", GF_LOG_ERROR,
+ "asprintf failed while setting search string");
+ return AUTH_DONT_CARE;
+ }
+
+ allow_user = dict_get (config_params,
+ searchstr);
+ free (searchstr);
+
+ if (allow_user) {
+ char *username_str = NULL;
+ char *tmp;
+ char *username_cpy = strdup (allow_user->data);
+
+ username_str = strtok_r (username_cpy, " ,", &tmp);
+
+ while (username_str) {
+ data_t *passwd_data = NULL;
+ if (!fnmatch (username_str,
+ username,
+ 0)) {
+ ret = asprintf (&searchstr, "auth.login.%s.password", username);
+ if (-1 == ret) {
+ gf_log ("auth/login", GF_LOG_ERROR,
+ "asprintf failed while setting search string");
+ return AUTH_DONT_CARE;
+ }
+ passwd_data = dict_get (config_params, searchstr);
+ FREE (searchstr);
+
+ if (!passwd_data) {
+ gf_log ("auth/login",
+ GF_LOG_DEBUG,
+ "wrong username/password combination");
+ result = AUTH_REJECT;
+ }
+ else
+ result = !strcmp (data_to_str (passwd_data), password) ? AUTH_ACCEPT : AUTH_REJECT;
+ break;
+ }
+ username_str = strtok_r (NULL, " ,", &tmp);
+ }
+ free (username_cpy);
+ }
+
+ return result;
+}
+
+struct volume_options options[] = {
+ { .key = {"auth.login.*.allow"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"auth.login.*.password"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {NULL} }
+};
diff --git a/xlators/protocol/client/src/Makefile.am b/xlators/protocol/client/src/Makefile.am
index fb720942c..722d62e3a 100644
--- a/xlators/protocol/client/src/Makefile.am
+++ b/xlators/protocol/client/src/Makefile.am
@@ -5,12 +5,14 @@ xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/protocol
client_la_LDFLAGS = -module -avoidversion
client_la_SOURCES = client-protocol.c saved-frames.c
-client_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+client_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/xlators/protocol/lib/src/libgfproto.la
noinst_HEADERS = client-protocol.h saved-frames.h
AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \
- -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS)
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \
+ -I$(top_srcdir)/xlators/protocol/lib/src
CLEANFILES =
diff --git a/xlators/protocol/lib/Makefile.am b/xlators/protocol/lib/Makefile.am
new file mode 100644
index 000000000..d471a3f92
--- /dev/null
+++ b/xlators/protocol/lib/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/protocol/lib/src/Makefile.am b/xlators/protocol/lib/src/Makefile.am
new file mode 100644
index 000000000..d3d1aafe1
--- /dev/null
+++ b/xlators/protocol/lib/src/Makefile.am
@@ -0,0 +1,15 @@
+libgfproto_la_CFLAGS = -fPIC -Wall -g -shared -nostartfiles $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS)
+
+libgfproto_la_CPPFLAGS = -D_FILE_OFFSET_BITS=64 -D__USE_FILE_OFFSET64 -D_GNU_SOURCE \
+ -D$(GF_HOST_OS) -DLIBDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/auth\" \
+ -DTRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/transport\" \
+ -I$(CONTRIBDIR)/rbtree -I$(top_srcdir)/libglusterfs/src/
+
+libgfproto_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+lib_LTLIBRARIES = libgfproto.la
+
+
+libgfproto_la_SOURCES = transport.c authenticate.c
+
+noinst_HEADERS = transport.h protocol.h authenticate.h
diff --git a/xlators/protocol/lib/src/authenticate.c b/xlators/protocol/lib/src/authenticate.c
new file mode 100644
index 000000000..eb0e2464c
--- /dev/null
+++ b/xlators/protocol/lib/src/authenticate.c
@@ -0,0 +1,250 @@
+/*
+ Copyright (c) 2007-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <stdio.h>
+#include <dlfcn.h>
+#include <errno.h>
+#include "authenticate.h"
+
+static void
+init (dict_t *this,
+ char *key,
+ data_t *value,
+ void *data)
+{
+ void *handle = NULL;
+ char *auth_file = NULL;
+ auth_handle_t *auth_handle = NULL;
+ auth_fn_t authenticate = NULL;
+ int *error = NULL;
+ int ret = 0;
+
+ /* It gets over written */
+ error = data;
+
+ if (!strncasecmp (key, "ip", strlen ("ip"))) {
+ gf_log ("authenticate", GF_LOG_ERROR,
+ "AUTHENTICATION MODULE \"IP\" HAS BEEN REPLACED "
+ "BY \"ADDR\"");
+ dict_set (this, key, data_from_dynptr (NULL, 0));
+ /* TODO: 1.3.x backword compatibility */
+ // *error = -1;
+ // return;
+ key = "addr";
+ }
+
+ ret = gf_asprintf (&auth_file, "%s/%s.so", LIBDIR, key);
+ if (-1 == ret) {
+ gf_log ("authenticate", GF_LOG_ERROR, "asprintf failed");
+ dict_set (this, key, data_from_dynptr (NULL, 0));
+ *error = -1;
+ return;
+ }
+
+ handle = dlopen (auth_file, RTLD_LAZY);
+ if (!handle) {
+ gf_log ("authenticate", GF_LOG_ERROR, "dlopen(%s): %s\n",
+ auth_file, dlerror ());
+ dict_set (this, key, data_from_dynptr (NULL, 0));
+ GF_FREE (auth_file);
+ *error = -1;
+ return;
+ }
+ GF_FREE (auth_file);
+
+ authenticate = dlsym (handle, "gf_auth");
+ if (!authenticate) {
+ gf_log ("authenticate", GF_LOG_ERROR,
+ "dlsym(gf_auth) on %s\n", dlerror ());
+ dict_set (this, key, data_from_dynptr (NULL, 0));
+ *error = -1;
+ return;
+ }
+
+ auth_handle = GF_CALLOC (1, sizeof (*auth_handle),
+ gf_common_mt_auth_handle_t);
+ if (!auth_handle) {
+ gf_log ("authenticate", GF_LOG_ERROR, "Out of memory");
+ dict_set (this, key, data_from_dynptr (NULL, 0));
+ *error = -1;
+ return;
+ }
+ auth_handle->vol_opt = GF_CALLOC (1, sizeof (volume_opt_list_t),
+ gf_common_mt_volume_opt_list_t);
+ auth_handle->vol_opt->given_opt = dlsym (handle, "options");
+ if (auth_handle->vol_opt->given_opt == NULL) {
+ gf_log ("authenticate", GF_LOG_DEBUG,
+ "volume option validation not specified");
+ }
+
+ auth_handle->authenticate = authenticate;
+ auth_handle->handle = handle;
+
+ dict_set (this, key,
+ data_from_dynptr (auth_handle, sizeof (*auth_handle)));
+}
+
+static void
+fini (dict_t *this,
+ char *key,
+ data_t *value,
+ void *data)
+{
+ auth_handle_t *handle = data_to_ptr (value);
+ if (handle) {
+ dlclose (handle->handle);
+ }
+}
+
+int32_t
+gf_auth_init (xlator_t *xl, dict_t *auth_modules)
+{
+ int ret = 0;
+ auth_handle_t *handle = NULL;
+ data_pair_t *pair = NULL;
+ dict_foreach (auth_modules, init, &ret);
+ if (!ret) {
+ pair = auth_modules->members_list;
+ while (pair) {
+ handle = data_to_ptr (pair->value);
+ if (handle) {
+ list_add_tail (&(handle->vol_opt->list),
+ &(xl->volume_options));
+ if (-1 ==
+ validate_xlator_volume_options (xl,
+ handle->vol_opt->given_opt)) {
+ gf_log ("authenticate", GF_LOG_ERROR,
+ "volume option validation "
+ "failed");
+ ret = -1;
+ }
+ }
+ pair = pair->next;
+ }
+ }
+ if (ret) {
+ gf_log (xl->name, GF_LOG_ERROR, "authentication init failed");
+ dict_foreach (auth_modules, fini, &ret);
+ ret = -1;
+ }
+ return ret;
+}
+
+static dict_t *__input_params;
+static dict_t *__config_params;
+
+void
+map (dict_t *this,
+ char *key,
+ data_t *value,
+ void *data)
+{
+ dict_t *res = data;
+ auth_fn_t authenticate;
+ auth_handle_t *handle = NULL;
+
+ if (value && (handle = data_to_ptr (value)) &&
+ (authenticate = handle->authenticate)) {
+ dict_set (res, key,
+ int_to_data (authenticate (__input_params,
+ __config_params)));
+ } else {
+ dict_set (res, key, int_to_data (AUTH_DONT_CARE));
+ }
+}
+
+void
+reduce (dict_t *this,
+ char *key,
+ data_t *value,
+ void *data)
+{
+ int64_t val = 0;
+ int64_t *res = data;
+ if (!data)
+ return;
+
+ val = data_to_int64 (value);
+ switch (val)
+ {
+ case AUTH_ACCEPT:
+ if (AUTH_DONT_CARE == *res)
+ *res = AUTH_ACCEPT;
+ break;
+
+ case AUTH_REJECT:
+ *res = AUTH_REJECT;
+ break;
+
+ case AUTH_DONT_CARE:
+ break;
+ }
+}
+
+
+auth_result_t
+gf_authenticate (dict_t *input_params,
+ dict_t *config_params,
+ dict_t *auth_modules)
+{
+ dict_t *results = NULL;
+ int64_t result = AUTH_DONT_CARE;
+
+ results = get_new_dict ();
+ __input_params = input_params;
+ __config_params = config_params;
+
+ dict_foreach (auth_modules, map, results);
+
+ dict_foreach (results, reduce, &result);
+ if (AUTH_DONT_CARE == result) {
+ data_t *peerinfo_data = dict_get (input_params, "peer-info");
+ char *name = NULL;
+
+ if (peerinfo_data) {
+ peer_info_t *peerinfo = data_to_ptr (peerinfo_data);
+ name = peerinfo->identifier;
+ }
+
+ gf_log ("auth", GF_LOG_ERROR,
+ "no authentication module is interested in "
+ "accepting remote-client %s", name);
+ result = AUTH_REJECT;
+ }
+
+ dict_destroy (results);
+ return result;
+}
+
+void
+gf_auth_fini (dict_t *auth_modules)
+{
+ int32_t dummy;
+
+ dict_foreach (auth_modules, fini, &dummy);
+}
diff --git a/xlators/protocol/lib/src/authenticate.h b/xlators/protocol/lib/src/authenticate.h
new file mode 100644
index 000000000..8931f62e6
--- /dev/null
+++ b/xlators/protocol/lib/src/authenticate.h
@@ -0,0 +1,61 @@
+/*
+ Copyright (c) 2007-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _AUTHENTICATE_H
+#define _AUTHENTICATE_H
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <stdio.h>
+#include <fnmatch.h>
+#include "dict.h"
+#include "compat.h"
+#include "list.h"
+#include "transport.h"
+#include "xlator.h"
+
+typedef enum {
+ AUTH_ACCEPT,
+ AUTH_REJECT,
+ AUTH_DONT_CARE
+} auth_result_t;
+
+typedef auth_result_t (*auth_fn_t) (dict_t *input_params,
+ dict_t *config_params);
+
+typedef struct {
+ void *handle;
+ auth_fn_t authenticate;
+ volume_opt_list_t *vol_opt;
+} auth_handle_t;
+
+auth_result_t gf_authenticate (dict_t *input_params,
+ dict_t *config_params,
+ dict_t *auth_modules);
+int32_t gf_auth_init (xlator_t *xl, dict_t *auth_modules);
+void gf_auth_fini (dict_t *auth_modules);
+
+#endif /* _AUTHENTICATE_H */
diff --git a/xlators/protocol/lib/src/protocol.h b/xlators/protocol/lib/src/protocol.h
new file mode 100644
index 000000000..6fd291bbe
--- /dev/null
+++ b/xlators/protocol/lib/src/protocol.h
@@ -0,0 +1,1114 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _PROTOCOL_H
+#define _PROTOCOL_H
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <inttypes.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#include "byte-order.h"
+#include "iatt.h"
+
+/* Any changes in the protocol structure or adding new '[f,m]ops' needs to
+ * bump the protocol version by "0.1"
+ */
+
+#define GF_PROTOCOL_VERSION "3.0"
+
+struct gf_stat {
+ uint64_t ino;
+ uint64_t size;
+ uint64_t blocks;
+ uint64_t dev;
+ uint32_t rdev;
+ uint32_t mode;
+ uint32_t nlink;
+ uint32_t uid;
+ uint32_t gid;
+ uint32_t blksize;
+ uint32_t atime;
+ uint32_t atime_nsec;
+ uint32_t mtime ;
+ uint32_t mtime_nsec;
+ uint32_t ctime;
+ uint32_t ctime_nsec;
+} __attribute__((packed));
+
+
+static inline void
+gf_stat_to_stat (struct gf_stat *gf_stat, struct stat *stat)
+{
+ stat->st_dev = ntoh64 (gf_stat->dev);
+ stat->st_ino = ntoh64 (gf_stat->ino);
+ stat->st_mode = ntoh32 (gf_stat->mode);
+ stat->st_nlink = ntoh32 (gf_stat->nlink);
+ stat->st_uid = ntoh32 (gf_stat->uid);
+ stat->st_gid = ntoh32 (gf_stat->gid);
+ stat->st_rdev = ntoh32 (gf_stat->rdev);
+ stat->st_size = ntoh64 (gf_stat->size);
+ stat->st_blksize = ntoh32 (gf_stat->blksize);
+ stat->st_blocks = ntoh64 (gf_stat->blocks);
+ stat->st_atime = ntoh32 (gf_stat->atime);
+ stat->st_mtime = ntoh32 (gf_stat->mtime);
+ stat->st_ctime = ntoh32 (gf_stat->ctime);
+ ST_ATIM_NSEC_SET(stat, ntoh32 (gf_stat->atime_nsec));
+ ST_MTIM_NSEC_SET(stat, ntoh32 (gf_stat->mtime_nsec));
+ ST_CTIM_NSEC_SET(stat, ntoh32 (gf_stat->ctime_nsec));
+}
+
+
+static inline void
+gf_stat_from_stat (struct gf_stat *gf_stat, struct stat *stat)
+{
+ gf_stat->dev = hton64 (stat->st_dev);
+ gf_stat->ino = hton64 (stat->st_ino);
+ gf_stat->mode = hton32 (stat->st_mode);
+ gf_stat->nlink = hton32 (stat->st_nlink);
+ gf_stat->uid = hton32 (stat->st_uid);
+ gf_stat->gid = hton32 (stat->st_gid);
+ gf_stat->rdev = hton32 (stat->st_rdev);
+ gf_stat->size = hton64 (stat->st_size);
+ gf_stat->blksize = hton32 (stat->st_blksize);
+ gf_stat->blocks = hton64 (stat->st_blocks);
+ gf_stat->atime = hton32 (stat->st_atime);
+ gf_stat->mtime = hton32 (stat->st_mtime);
+ gf_stat->ctime = hton32 (stat->st_ctime);
+ gf_stat->atime_nsec = hton32 (ST_ATIM_NSEC(stat));
+ gf_stat->mtime_nsec = hton32 (ST_MTIM_NSEC(stat));
+ gf_stat->ctime_nsec = hton32 (ST_CTIM_NSEC(stat));
+}
+
+
+static inline void
+gf_stat_to_iatt (struct gf_stat *gf_stat, struct iatt *iatt)
+{
+ iatt->ia_ino = ntoh64 (gf_stat->ino);
+ iatt->ia_dev = ntoh64 (gf_stat->dev);
+ iatt->ia_type = ia_type_from_st_mode (ntoh32 (gf_stat->mode));
+ iatt->ia_prot = ia_prot_from_st_mode (ntoh32 (gf_stat->mode));
+ iatt->ia_nlink = ntoh32 (gf_stat->nlink);
+ iatt->ia_uid = ntoh32 (gf_stat->uid);
+ iatt->ia_gid = ntoh32 (gf_stat->gid);
+ iatt->ia_rdev = ntoh64 (gf_stat->rdev);
+ iatt->ia_size = ntoh64 (gf_stat->size);
+ iatt->ia_blksize = ntoh32 (gf_stat->blksize);
+ iatt->ia_blocks = ntoh64 (gf_stat->blocks);
+ iatt->ia_atime = ntoh32 (gf_stat->atime);
+ iatt->ia_atime_nsec = ntoh32 (gf_stat->atime_nsec);
+ iatt->ia_mtime = ntoh32 (gf_stat->mtime);
+ iatt->ia_mtime_nsec = ntoh32 (gf_stat->mtime_nsec);
+ iatt->ia_ctime = ntoh32 (gf_stat->ctime);
+ iatt->ia_ctime_nsec = ntoh32 (gf_stat->ctime_nsec);
+
+ iatt->ia_gen = ntoh64 (gf_stat->dev);
+}
+
+
+static inline void
+gf_stat_from_iatt (struct gf_stat *gf_stat, struct iatt *iatt)
+{
+ gf_stat->ino = hton64 (iatt->ia_ino);
+ gf_stat->dev = hton64 (iatt->ia_dev);
+ gf_stat->mode = hton32 (st_mode_from_ia (iatt->ia_prot,
+ iatt->ia_type));
+ gf_stat->nlink = hton32 (iatt->ia_nlink);
+ gf_stat->uid = hton32 (iatt->ia_uid);
+ gf_stat->gid = hton32 (iatt->ia_gid);
+ gf_stat->rdev = hton32 (iatt->ia_rdev);
+ gf_stat->size = hton64 (iatt->ia_size);
+ gf_stat->blksize = hton32 (iatt->ia_blksize);
+ gf_stat->blocks = hton64 (iatt->ia_blocks);
+ gf_stat->atime = hton32 (iatt->ia_atime);
+ gf_stat->atime_nsec = hton32 (iatt->ia_atime_nsec);
+ gf_stat->mtime = hton32 (iatt->ia_mtime);
+ gf_stat->mtime_nsec = hton32 (iatt->ia_mtime_nsec);
+ gf_stat->ctime = hton32 (iatt->ia_ctime);
+ gf_stat->ctime_nsec = hton32 (iatt->ia_ctime_nsec);
+
+ gf_stat->dev = hton64 (iatt->ia_gen);
+
+}
+
+
+struct gf_statfs {
+ uint64_t bsize;
+ uint64_t frsize;
+ uint64_t blocks;
+ uint64_t bfree;
+ uint64_t bavail;
+ uint64_t files;
+ uint64_t ffree;
+ uint64_t favail;
+ uint64_t fsid;
+ uint64_t flag;
+ uint64_t namemax;
+} __attribute__((packed));
+
+
+static inline void
+gf_statfs_to_statfs (struct gf_statfs *gf_stat, struct statvfs *stat)
+{
+ stat->f_bsize = ntoh64 (gf_stat->bsize);
+ stat->f_frsize = ntoh64 (gf_stat->frsize);
+ stat->f_blocks = ntoh64 (gf_stat->blocks);
+ stat->f_bfree = ntoh64 (gf_stat->bfree);
+ stat->f_bavail = ntoh64 (gf_stat->bavail);
+ stat->f_files = ntoh64 (gf_stat->files);
+ stat->f_ffree = ntoh64 (gf_stat->ffree);
+ stat->f_favail = ntoh64 (gf_stat->favail);
+ stat->f_fsid = ntoh64 (gf_stat->fsid);
+ stat->f_flag = ntoh64 (gf_stat->flag);
+ stat->f_namemax = ntoh64 (gf_stat->namemax);
+}
+
+
+static inline void
+gf_statfs_from_statfs (struct gf_statfs *gf_stat, struct statvfs *stat)
+{
+ gf_stat->bsize = hton64 (stat->f_bsize);
+ gf_stat->frsize = hton64 (stat->f_frsize);
+ gf_stat->blocks = hton64 (stat->f_blocks);
+ gf_stat->bfree = hton64 (stat->f_bfree);
+ gf_stat->bavail = hton64 (stat->f_bavail);
+ gf_stat->files = hton64 (stat->f_files);
+ gf_stat->ffree = hton64 (stat->f_ffree);
+ gf_stat->favail = hton64 (stat->f_favail);
+ gf_stat->fsid = hton64 (stat->f_fsid);
+ gf_stat->flag = hton64 (stat->f_flag);
+ gf_stat->namemax = hton64 (stat->f_namemax);
+}
+
+
+struct gf_flock {
+ uint16_t type;
+ uint16_t whence;
+ uint64_t start;
+ uint64_t len;
+ uint32_t pid;
+} __attribute__((packed));
+
+
+static inline void
+gf_flock_to_flock (struct gf_flock *gf_flock, struct flock *flock)
+{
+ flock->l_type = ntoh16 (gf_flock->type);
+ flock->l_whence = ntoh16 (gf_flock->whence);
+ flock->l_start = ntoh64 (gf_flock->start);
+ flock->l_len = ntoh64 (gf_flock->len);
+ flock->l_pid = ntoh32 (gf_flock->pid);
+}
+
+
+static inline void
+gf_flock_from_flock (struct gf_flock *gf_flock, struct flock *flock)
+{
+ gf_flock->type = hton16 (flock->l_type);
+ gf_flock->whence = hton16 (flock->l_whence);
+ gf_flock->start = hton64 (flock->l_start);
+ gf_flock->len = hton64 (flock->l_len);
+ gf_flock->pid = hton32 (flock->l_pid);
+}
+
+
+struct gf_timespec {
+ uint32_t tv_sec;
+ uint32_t tv_nsec;
+} __attribute__((packed));
+
+
+static inline void
+gf_timespec_to_timespec (struct gf_timespec *gf_ts, struct timespec *ts)
+{
+
+ ts[0].tv_sec = ntoh32 (gf_ts[0].tv_sec);
+ ts[0].tv_nsec = ntoh32 (gf_ts[0].tv_nsec);
+ ts[1].tv_sec = ntoh32 (gf_ts[1].tv_sec);
+ ts[1].tv_nsec = ntoh32 (gf_ts[1].tv_nsec);
+}
+
+
+static inline void
+gf_timespec_from_timespec (struct gf_timespec *gf_ts, struct timespec *ts)
+{
+ gf_ts[0].tv_sec = hton32 (ts[0].tv_sec);
+ gf_ts[0].tv_nsec = hton32 (ts[0].tv_nsec);
+ gf_ts[1].tv_sec = hton32 (ts[1].tv_sec);
+ gf_ts[1].tv_nsec = hton32 (ts[1].tv_nsec);
+}
+
+
+#define GF_O_ACCMODE 003
+#define GF_O_RDONLY 00
+#define GF_O_WRONLY 01
+#define GF_O_RDWR 02
+#define GF_O_CREAT 0100
+#define GF_O_EXCL 0200
+#define GF_O_NOCTTY 0400
+#define GF_O_TRUNC 01000
+#define GF_O_APPEND 02000
+#define GF_O_NONBLOCK 04000
+#define GF_O_SYNC 010000
+#define GF_O_ASYNC 020000
+
+#define GF_O_DIRECT 040000
+#define GF_O_DIRECTORY 0200000
+#define GF_O_NOFOLLOW 0400000
+#define GF_O_NOATIME 01000000
+#define GF_O_CLOEXEC 02000000
+
+#define GF_O_LARGEFILE 0100000
+
+#define XLATE_BIT(from, to, bit) do { \
+ if (from & bit) \
+ to = to | GF_##bit; \
+ } while (0)
+
+#define UNXLATE_BIT(from, to, bit) do { \
+ if (from & GF_##bit) \
+ to = to | bit; \
+ } while (0)
+
+#define XLATE_ACCESSMODE(from, to) do { \
+ switch (from & O_ACCMODE) { \
+ case O_RDONLY: to |= GF_O_RDONLY; \
+ break; \
+ case O_WRONLY: to |= GF_O_WRONLY; \
+ break; \
+ case O_RDWR: to |= GF_O_RDWR; \
+ break; \
+ } \
+ } while (0)
+
+#define UNXLATE_ACCESSMODE(from, to) do { \
+ switch (from & GF_O_ACCMODE) { \
+ case GF_O_RDONLY: to |= O_RDONLY; \
+ break; \
+ case GF_O_WRONLY: to |= O_WRONLY; \
+ break; \
+ case GF_O_RDWR: to |= O_RDWR; \
+ break; \
+ } \
+ } while (0)
+
+static inline uint32_t
+gf_flags_from_flags (uint32_t flags)
+{
+ uint32_t gf_flags = 0;
+
+ XLATE_ACCESSMODE (flags, gf_flags);
+
+ XLATE_BIT (flags, gf_flags, O_CREAT);
+ XLATE_BIT (flags, gf_flags, O_EXCL);
+ XLATE_BIT (flags, gf_flags, O_NOCTTY);
+ XLATE_BIT (flags, gf_flags, O_TRUNC);
+ XLATE_BIT (flags, gf_flags, O_APPEND);
+ XLATE_BIT (flags, gf_flags, O_NONBLOCK);
+ XLATE_BIT (flags, gf_flags, O_SYNC);
+ XLATE_BIT (flags, gf_flags, O_ASYNC);
+
+ XLATE_BIT (flags, gf_flags, O_DIRECT);
+ XLATE_BIT (flags, gf_flags, O_DIRECTORY);
+ XLATE_BIT (flags, gf_flags, O_NOFOLLOW);
+#ifdef O_NOATIME
+ XLATE_BIT (flags, gf_flags, O_NOATIME);
+#endif
+#ifdef O_CLOEXEC
+ XLATE_BIT (flags, gf_flags, O_CLOEXEC);
+#endif
+ XLATE_BIT (flags, gf_flags, O_LARGEFILE);
+
+ return gf_flags;
+}
+
+static inline uint32_t
+gf_flags_to_flags (uint32_t gf_flags)
+{
+ uint32_t flags = 0;
+
+ UNXLATE_ACCESSMODE (gf_flags, flags);
+
+ UNXLATE_BIT (gf_flags, flags, O_CREAT);
+ UNXLATE_BIT (gf_flags, flags, O_EXCL);
+ UNXLATE_BIT (gf_flags, flags, O_NOCTTY);
+ UNXLATE_BIT (gf_flags, flags, O_TRUNC);
+ UNXLATE_BIT (gf_flags, flags, O_APPEND);
+ UNXLATE_BIT (gf_flags, flags, O_NONBLOCK);
+ UNXLATE_BIT (gf_flags, flags, O_SYNC);
+ UNXLATE_BIT (gf_flags, flags, O_ASYNC);
+
+ UNXLATE_BIT (gf_flags, flags, O_DIRECT);
+ UNXLATE_BIT (gf_flags, flags, O_DIRECTORY);
+ UNXLATE_BIT (gf_flags, flags, O_NOFOLLOW);
+#ifdef O_NOATIME
+ UNXLATE_BIT (gf_flags, flags, O_NOATIME);
+#endif
+#ifdef O_CLOEXEC
+ UNXLATE_BIT (gf_flags, flags, O_CLOEXEC);
+#endif
+ UNXLATE_BIT (gf_flags, flags, O_LARGEFILE);
+
+ return flags;
+}
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ char path[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_stat_req_t;;
+typedef struct {
+ struct gf_stat stat;
+} __attribute__((packed)) gf_fop_stat_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t size;
+ char path[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_readlink_req_t;
+typedef struct {
+ struct gf_stat buf;
+ char path[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_readlink_rsp_t;
+
+
+typedef struct {
+ uint64_t par;
+ uint64_t gen;
+ uint64_t dev;
+ uint32_t mode;
+ char path[0]; /* NULL terminated */
+ char bname[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_mknod_req_t;
+typedef struct {
+ struct gf_stat stat;
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+} __attribute__((packed)) gf_fop_mknod_rsp_t;
+
+
+typedef struct {
+ uint64_t par;
+ uint64_t gen;
+ uint32_t mode;
+ char path[0]; /* NULL terminated */
+ char bname[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_mkdir_req_t;
+typedef struct {
+ struct gf_stat stat;
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+} __attribute__((packed)) gf_fop_mkdir_rsp_t;
+
+
+typedef struct {
+ uint64_t par;
+ uint64_t gen;
+ char path[0]; /* NULL terminated */
+ char bname[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_unlink_req_t;
+typedef struct {
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+} __attribute__((packed)) gf_fop_unlink_rsp_t;
+
+
+typedef struct {
+ uint64_t par;
+ uint64_t gen;
+ char path[0];
+ char bname[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_rmdir_req_t;
+typedef struct {
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+} __attribute__((packed)) gf_fop_rmdir_rsp_t;
+
+
+typedef struct {
+ uint64_t par;
+ uint64_t gen;
+ char path[0];
+ char bname[0];
+ char linkname[0];
+} __attribute__((packed)) gf_fop_symlink_req_t;
+typedef struct {
+ struct gf_stat stat;
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+}__attribute__((packed)) gf_fop_symlink_rsp_t;
+
+
+typedef struct {
+ uint64_t oldpar;
+ uint64_t oldgen;
+ uint64_t newpar;
+ uint64_t newgen;
+ char oldpath[0];
+ char oldbname[0]; /* NULL terminated */
+ char newpath[0];
+ char newbname[0]; /* NULL terminated */
+} __attribute__((packed)) gf_fop_rename_req_t;
+typedef struct {
+ struct gf_stat stat;
+ struct gf_stat preoldparent;
+ struct gf_stat postoldparent;
+ struct gf_stat prenewparent;
+ struct gf_stat postnewparent;
+} __attribute__((packed)) gf_fop_rename_rsp_t;
+
+
+typedef struct {
+ uint64_t oldino;
+ uint64_t oldgen;
+ uint64_t newpar;
+ uint64_t newgen;
+ char oldpath[0];
+ char newpath[0];
+ char newbname[0];
+}__attribute__((packed)) gf_fop_link_req_t;
+typedef struct {
+ struct gf_stat stat;
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+} __attribute__((packed)) gf_fop_link_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint64_t offset;
+ char path[0];
+} __attribute__((packed)) gf_fop_truncate_req_t;
+typedef struct {
+ struct gf_stat prestat;
+ struct gf_stat poststat;
+} __attribute__((packed)) gf_fop_truncate_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t flags;
+ uint32_t wbflags;
+ char path[0];
+} __attribute__((packed)) gf_fop_open_req_t;
+typedef struct {
+ int64_t fd;
+} __attribute__((packed)) gf_fop_open_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint64_t offset;
+ uint32_t size;
+} __attribute__((packed)) gf_fop_read_req_t;
+typedef struct {
+ struct gf_stat stat;
+ char buf[0];
+} __attribute__((packed)) gf_fop_read_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint64_t offset;
+ uint32_t size;
+} __attribute__((packed)) gf_fop_write_req_t;
+typedef struct {
+ struct gf_stat prestat;
+ struct gf_stat poststat;
+} __attribute__((packed)) gf_fop_write_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ char path[0];
+} __attribute__((packed)) gf_fop_statfs_req_t;
+typedef struct {
+ struct gf_statfs statfs;
+} __attribute__((packed)) gf_fop_statfs_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+} __attribute__((packed)) gf_fop_flush_req_t;
+typedef struct { } __attribute__((packed)) gf_fop_flush_rsp_t;
+
+
+typedef struct fsync_req {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t data;
+} __attribute__((packed)) gf_fop_fsync_req_t;
+typedef struct {
+ struct gf_stat prestat;
+ struct gf_stat poststat;
+} __attribute__((packed)) gf_fop_fsync_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t flags;
+ uint32_t dict_len;
+ char dict[0];
+ char path[0];
+} __attribute__((packed)) gf_fop_setxattr_req_t;
+typedef struct { } __attribute__((packed)) gf_fop_setxattr_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t flags;
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_fsetxattr_req_t;
+typedef struct { } __attribute__((packed)) gf_fop_fsetxattr_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t flags;
+ uint32_t dict_len;
+ char dict[0];
+ char path[0];
+} __attribute__((packed)) gf_fop_xattrop_req_t;
+
+typedef struct {
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_xattrop_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t flags;
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_fxattrop_req_t;
+
+typedef struct {
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_fxattrop_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t namelen;
+ char path[0];
+ char name[0];
+} __attribute__((packed)) gf_fop_getxattr_req_t;
+typedef struct {
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_getxattr_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t namelen;
+ char name[0];
+} __attribute__((packed)) gf_fop_fgetxattr_req_t;
+typedef struct {
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_fgetxattr_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ char path[0];
+ char name[0];
+} __attribute__((packed)) gf_fop_removexattr_req_t;
+typedef struct { } __attribute__((packed)) gf_fop_removexattr_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ char path[0];
+} __attribute__((packed)) gf_fop_opendir_req_t;
+typedef struct {
+ int64_t fd;
+} __attribute__((packed)) gf_fop_opendir_rsp_t;
+
+
+typedef struct fsyncdir_req {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ int32_t data;
+} __attribute__((packed)) gf_fop_fsyncdir_req_t;
+typedef struct {
+} __attribute__((packed)) gf_fop_fsyncdir_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint64_t offset;
+ uint32_t size;
+} __attribute__((packed)) gf_fop_readdir_req_t;
+typedef struct {
+ uint32_t size;
+ char buf[0];
+} __attribute__((packed)) gf_fop_readdir_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint64_t offset;
+ uint32_t size;
+} __attribute__((packed)) gf_fop_readdirp_req_t;
+typedef struct {
+ uint32_t size;
+ char buf[0];
+} __attribute__((packed)) gf_fop_readdirp_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t mask;
+ char path[0];
+} __attribute__((packed)) gf_fop_access_req_t;
+typedef struct {
+} __attribute__((packed)) gf_fop_access_rsp_t;
+
+
+typedef struct {
+ uint64_t par;
+ uint64_t gen;
+ uint32_t flags;
+ uint32_t mode;
+ char path[0];
+ char bname[0];
+} __attribute__((packed)) gf_fop_create_req_t;
+typedef struct {
+ struct gf_stat stat;
+ uint64_t fd;
+ struct gf_stat preparent;
+ struct gf_stat postparent;
+} __attribute__((packed)) gf_fop_create_rsp_t;
+
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint64_t offset;
+} __attribute__((packed)) gf_fop_ftruncate_req_t;
+typedef struct {
+ struct gf_stat prestat;
+ struct gf_stat poststat;
+} __attribute__((packed)) gf_fop_ftruncate_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+} __attribute__((packed)) gf_fop_fstat_req_t;
+typedef struct {
+ struct gf_stat stat;
+} __attribute__((packed)) gf_fop_fstat_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t cmd;
+ uint32_t type;
+ struct gf_flock flock;
+} __attribute__((packed)) gf_fop_lk_req_t;
+typedef struct {
+ struct gf_flock flock;
+} __attribute__((packed)) gf_fop_lk_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t cmd;
+ uint32_t type;
+ struct gf_flock flock;
+ char path[0];
+ char volume[0];
+} __attribute__((packed)) gf_fop_inodelk_req_t;
+typedef struct {
+} __attribute__((packed)) gf_fop_inodelk_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t cmd;
+ uint32_t type;
+ struct gf_flock flock;
+ char volume[0];
+} __attribute__((packed)) gf_fop_finodelk_req_t;
+typedef struct {
+} __attribute__((packed)) gf_fop_finodelk_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t cmd;
+ uint32_t type;
+ uint64_t namelen;
+ char path[0];
+ char name[0];
+ char volume[0];
+} __attribute__((packed)) gf_fop_entrylk_req_t;
+typedef struct {
+} __attribute__((packed)) gf_fop_entrylk_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+ uint32_t cmd;
+ uint32_t type;
+ uint64_t namelen;
+ char name[0];
+ char volume[0];
+} __attribute__((packed)) gf_fop_fentrylk_req_t;
+typedef struct {
+} __attribute__((packed)) gf_fop_fentrylk_rsp_t;
+
+typedef struct {
+ uint64_t ino; /* NOTE: used only in case of 'root' lookup */
+ uint64_t par;
+ uint64_t gen;
+ uint32_t flags;
+ uint32_t dictlen;
+ char path[0];
+ char bname[0];
+ char dict[0];
+} __attribute__((packed)) gf_fop_lookup_req_t;
+typedef struct {
+ struct gf_stat stat;
+ struct gf_stat postparent;
+ uint32_t dict_len;
+ char dict[0];
+} __attribute__((packed)) gf_fop_lookup_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ uint32_t flag;
+ char path[0];
+} __attribute__((packed)) gf_fop_checksum_req_t;
+typedef struct {
+ unsigned char fchecksum[0];
+ unsigned char dchecksum[0];
+} __attribute__((packed)) gf_fop_checksum_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ struct gf_stat stbuf;
+ int32_t valid;
+ char path[0];
+} __attribute__((packed)) gf_fop_setattr_req_t;
+typedef struct {
+ struct gf_stat statpre;
+ struct gf_stat statpost;
+} __attribute__((packed)) gf_fop_setattr_rsp_t;
+
+typedef struct {
+ int64_t fd;
+ struct gf_stat stbuf;
+ int32_t valid;
+} __attribute__((packed)) gf_fop_fsetattr_req_t;
+typedef struct {
+ struct gf_stat statpre;
+ struct gf_stat statpost;
+} __attribute__((packed)) gf_fop_fsetattr_rsp_t;
+
+typedef struct {
+ int64_t fd;
+ uint64_t offset;
+ uint32_t len;
+} __attribute__((packed)) gf_fop_rchecksum_req_t;
+typedef struct {
+ uint32_t weak_checksum;
+ unsigned char strong_checksum[0];
+} __attribute__((packed)) gf_fop_rchecksum_rsp_t;
+
+typedef struct {
+ uint32_t flags;
+ uint32_t keylen;
+ char key[0];
+} __attribute__((packed)) gf_mop_getspec_req_t;
+typedef struct {
+ char spec[0];
+} __attribute__((packed)) gf_mop_getspec_rsp_t;
+
+
+typedef struct {
+ uint32_t msglen;
+ char msg[0];
+} __attribute__((packed)) gf_mop_log_req_t;
+typedef struct {
+} __attribute__((packed)) gf_mop_log_rsp_t;
+
+
+typedef struct {
+ uint32_t dict_len;
+ char buf[0];
+} __attribute__((packed)) gf_mop_setvolume_req_t;
+typedef struct {
+ uint32_t dict_len;
+ char buf[0];
+} __attribute__((packed)) gf_mop_setvolume_rsp_t;
+
+
+typedef struct {
+} __attribute__((packed)) gf_mop_ping_req_t;
+typedef struct {
+} __attribute__((packed)) gf_mop_ping_rsp_t;
+
+typedef struct {
+ uint32_t flags;
+ char buf[0];
+} __attribute__((packed)) gf_mop_notify_req_t;
+typedef struct {
+ uint32_t flags;
+ char buf[0];
+} __attribute__((packed)) gf_mop_notify_rsp_t;
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+} __attribute__((packed)) gf_cbk_releasedir_req_t;
+typedef struct {
+} __attribute__((packed)) gf_cbk_releasedir_rsp_t;
+
+
+typedef struct {
+ uint64_t ino;
+ uint64_t gen;
+ int64_t fd;
+} __attribute__((packed)) gf_cbk_release_req_t;
+typedef struct {
+} __attribute__((packed)) gf_cbk_release_rsp_t;
+
+
+typedef struct {
+ uint32_t count;
+ uint64_t ino_array[0];
+} __attribute__((packed)) gf_cbk_forget_req_t;
+typedef struct { } __attribute__((packed)) gf_cbk_forget_rsp_t;
+
+
+typedef struct {
+ uint32_t pid;
+ uint32_t uid;
+ uint32_t gid;
+
+ /* Number of groups being sent through the array above. */
+ uint32_t ngrps;
+
+ /* Array of groups to which the uid belongs apart from the primary group
+ * in gid.
+ */
+ uint32_t groups[GF_REQUEST_MAXGROUPS];
+
+ uint64_t lk_owner;
+} __attribute__ ((packed)) gf_hdr_req_t;
+
+
+typedef struct {
+ uint32_t op_ret;
+ uint32_t op_errno;
+} __attribute__ ((packed)) gf_hdr_rsp_t;
+
+
+typedef struct {
+ uint64_t callid;
+ uint32_t type;
+ uint32_t op;
+ uint32_t size;
+ union {
+ gf_hdr_req_t req;
+ gf_hdr_rsp_t rsp;
+ } __attribute__ ((packed));
+} __attribute__ ((packed)) gf_hdr_common_t;
+
+
+static inline gf_hdr_common_t *
+__gf_hdr_new (int size)
+{
+ gf_hdr_common_t *hdr = NULL;
+
+ /* TODO: use mem-pool */
+ hdr = GF_CALLOC (sizeof (gf_hdr_common_t) + size, 1,
+ gf_common_mt_gf_hdr_common_t);
+
+ if (!hdr) {
+ return NULL;
+ }
+
+ hdr->size = hton32 (size);
+
+ return hdr;
+}
+
+
+#define gf_hdr_len(type, x) (sizeof (gf_hdr_common_t) + sizeof (*type) + x)
+#define gf_hdr_new(type, x) __gf_hdr_new (sizeof (*type) + x)
+
+
+static inline void *
+gf_param (gf_hdr_common_t *hdr)
+{
+ return ((void *)hdr) + sizeof (*hdr);
+}
+
+
+struct gf_dirent_nb {
+ uint64_t d_ino;
+ uint64_t d_off;
+ uint32_t d_len;
+ uint32_t d_type;
+ struct gf_stat d_stat;
+ char d_name[0];
+} __attribute__((packed));
+
+
+static inline int
+gf_dirent_nb_size (gf_dirent_t *entries)
+{
+ return (sizeof (struct gf_dirent_nb) + strlen (entries->d_name) + 1);
+}
+
+static inline int
+gf_dirent_serialize (gf_dirent_t *entries, char *buf, size_t buf_size)
+{
+ struct gf_dirent_nb *entry_nb = NULL;
+ gf_dirent_t *entry = NULL;
+ int size = 0;
+ int entry_size = 0;
+
+
+ list_for_each_entry (entry, &entries->list, list) {
+ entry_size = gf_dirent_nb_size (entry);
+
+ if (buf && (size + entry_size <= buf_size)) {
+ entry_nb = (void *) (buf + size);
+
+ entry_nb->d_ino = hton64 (entry->d_ino);
+ entry_nb->d_off = hton64 (entry->d_off);
+ entry_nb->d_len = hton32 (entry->d_len);
+ entry_nb->d_type = hton32 (entry->d_type);
+
+ gf_stat_from_iatt (&entry_nb->d_stat, &entry->d_stat);
+
+ strcpy (entry_nb->d_name, entry->d_name);
+ }
+ size += entry_size;
+ }
+
+ return size;
+}
+
+
+static inline int
+gf_dirent_unserialize (gf_dirent_t *entries, const char *buf, size_t buf_size)
+{
+ struct gf_dirent_nb *entry_nb = NULL;
+ int remaining_size = 0;
+ int least_dirent_size = 0;
+ int count = 0;
+ gf_dirent_t *entry = NULL;
+ int entry_strlen = 0;
+ int entry_len = 0;
+
+
+ remaining_size = buf_size;
+ least_dirent_size = (sizeof (struct gf_dirent_nb) + 2);
+
+ while (remaining_size >= least_dirent_size) {
+ entry_nb = (void *)(buf + (buf_size - remaining_size));
+
+ entry_strlen = strnlen (entry_nb->d_name, remaining_size);
+ if (entry_strlen == remaining_size) {
+ break;
+ }
+
+ entry_len = sizeof (gf_dirent_t) + entry_strlen + 1;
+ entry = GF_CALLOC (1, entry_len, gf_common_mt_gf_dirent_t);
+ if (!entry) {
+ break;
+ }
+
+ entry->d_ino = ntoh64 (entry_nb->d_ino);
+ entry->d_off = ntoh64 (entry_nb->d_off);
+ entry->d_len = ntoh32 (entry_nb->d_len);
+ entry->d_type = ntoh32 (entry_nb->d_type);
+
+ gf_stat_to_iatt (&entry_nb->d_stat, &entry->d_stat);
+
+ strcpy (entry->d_name, entry_nb->d_name);
+
+ list_add_tail (&entry->list, &entries->list);
+
+ remaining_size -= (sizeof (*entry_nb) + entry_strlen + 1);
+ count++;
+ }
+
+ return count;
+}
+
+#endif
diff --git a/xlators/protocol/lib/src/transport.c b/xlators/protocol/lib/src/transport.c
new file mode 100644
index 000000000..d460d0209
--- /dev/null
+++ b/xlators/protocol/lib/src/transport.c
@@ -0,0 +1,422 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <dlfcn.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <sys/poll.h>
+#include <fnmatch.h>
+#include <stdint.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "logging.h"
+#include "transport.h"
+#include "glusterfs.h"
+#include "xlator.h"
+#include "list.h"
+
+
+transport_t *
+transport_load (dict_t *options,
+ xlator_t *xl)
+{
+ struct transport *trans = NULL, *return_trans = NULL;
+ char *name = NULL;
+ void *handle = NULL;
+ char *type = NULL;
+ char str[] = "ERROR";
+ int32_t ret = -1;
+ int8_t is_tcp = 0, is_unix = 0, is_ibsdp = 0;
+ volume_opt_list_t *vol_opt = NULL;
+
+ GF_VALIDATE_OR_GOTO("transport", options, fail);
+ GF_VALIDATE_OR_GOTO("transport", xl, fail);
+
+ trans = GF_CALLOC (1, sizeof (struct transport),
+ gf_common_mt_transport);
+ GF_VALIDATE_OR_GOTO("transport", trans, fail);
+
+ trans->xl = xl;
+ type = str;
+
+ /* Backward compatibility */
+ ret = dict_get_str (options, "transport-type", &type);
+ if (ret < 0) {
+ ret = dict_set_str (options, "transport-type", "socket");
+ if (ret < 0)
+ gf_log ("dict", GF_LOG_DEBUG,
+ "setting transport-type failed");
+ gf_log ("transport", GF_LOG_WARNING,
+ "missing 'option transport-type'. defaulting to "
+ "\"socket\"");
+ } else {
+ {
+ /* Backword compatibility to handle * /client,
+ * * /server.
+ */
+ char *tmp = strchr (type, '/');
+ if (tmp)
+ *tmp = '\0';
+ }
+
+ is_tcp = strcmp (type, "tcp");
+ is_unix = strcmp (type, "unix");
+ is_ibsdp = strcmp (type, "ib-sdp");
+ if ((is_tcp == 0) ||
+ (is_unix == 0) ||
+ (is_ibsdp == 0)) {
+ if (is_unix == 0)
+ ret = dict_set_str (options,
+ "transport.address-family",
+ "unix");
+ if (is_ibsdp == 0)
+ ret = dict_set_str (options,
+ "transport.address-family",
+ "inet-sdp");
+
+ if (ret < 0)
+ gf_log ("dict", GF_LOG_DEBUG,
+ "setting address-family failed");
+
+ ret = dict_set_str (options,
+ "transport-type", "socket");
+ if (ret < 0)
+ gf_log ("dict", GF_LOG_DEBUG,
+ "setting transport-type failed");
+ }
+ }
+
+ ret = dict_get_str (options, "transport-type", &type);
+ if (ret < 0) {
+ GF_FREE (trans);
+ gf_log ("transport", GF_LOG_ERROR,
+ "'option transport-type <xx>' missing in volume '%s'",
+ xl->name);
+ goto fail;
+ }
+
+ ret = gf_asprintf (&name, "%s/%s.so", TRANSPORTDIR, type);
+ if (-1 == ret) {
+ gf_log ("transport", GF_LOG_ERROR, "asprintf failed");
+ goto fail;
+ }
+ gf_log ("transport", GF_LOG_DEBUG,
+ "attempt to load file %s", name);
+
+ handle = dlopen (name, RTLD_NOW|RTLD_GLOBAL);
+ if (handle == NULL) {
+ gf_log ("transport", GF_LOG_ERROR, "%s", dlerror ());
+ gf_log ("transport", GF_LOG_ERROR,
+ "volume '%s': transport-type '%s' is not valid or "
+ "not found on this machine",
+ xl->name, type);
+ GF_FREE (name);
+ GF_FREE (trans);
+ goto fail;
+ }
+ GF_FREE (name);
+
+ trans->ops = dlsym (handle, "tops");
+ if (trans->ops == NULL) {
+ gf_log ("transport", GF_LOG_ERROR,
+ "dlsym (transport_ops) on %s", dlerror ());
+ GF_FREE (trans);
+ goto fail;
+ }
+
+ trans->init = dlsym (handle, "init");
+ if (trans->init == NULL) {
+ gf_log ("transport", GF_LOG_ERROR,
+ "dlsym (gf_transport_init) on %s", dlerror ());
+ GF_FREE (trans);
+ goto fail;
+ }
+
+ trans->fini = dlsym (handle, "fini");
+ if (trans->fini == NULL) {
+ gf_log ("transport", GF_LOG_ERROR,
+ "dlsym (gf_transport_fini) on %s", dlerror ());
+ GF_FREE (trans);
+ goto fail;
+ }
+
+ vol_opt = GF_CALLOC (1, sizeof (volume_opt_list_t),
+ gf_common_mt_volume_opt_list_t);
+ vol_opt->given_opt = dlsym (handle, "options");
+ if (vol_opt->given_opt == NULL) {
+ gf_log ("transport", GF_LOG_DEBUG,
+ "volume option validation not specified");
+ } else {
+ list_add_tail (&vol_opt->list, &xl->volume_options);
+ if (-1 ==
+ validate_xlator_volume_options (xl,
+ vol_opt->given_opt)) {
+ gf_log ("transport", GF_LOG_ERROR,
+ "volume option validation failed");
+ GF_FREE (trans);
+ goto fail;
+ }
+ }
+
+ ret = trans->init (trans);
+ if (ret != 0) {
+ gf_log ("transport", GF_LOG_ERROR,
+ "'%s' initialization failed", type);
+ GF_FREE (trans);
+ goto fail;
+ }
+
+ pthread_mutex_init (&trans->lock, NULL);
+ return_trans = trans;
+fail:
+ return return_trans;
+}
+
+
+int32_t
+transport_submit (transport_t *this, char *buf, int32_t len,
+ struct iovec *vector, int count,
+ struct iobref *iobref)
+{
+ int32_t ret = -1;
+ transport_t *peer_trans = NULL;
+ struct iobuf *iobuf = NULL;
+ struct transport_msg *msg = NULL;
+
+ if (this->peer_trans) {
+ peer_trans = this->peer_trans;
+
+ msg = GF_CALLOC (1, sizeof (*msg),
+ gf_common_mt_transport_msg);
+ if (!msg) {
+ return -ENOMEM;
+ }
+
+ msg->hdr = buf;
+ msg->hdrlen = len;
+
+ if (vector) {
+ iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+ if (!iobuf) {
+ GF_FREE (msg->hdr);
+ GF_FREE (msg);
+ return -ENOMEM;
+ }
+
+ iov_unload (iobuf->ptr, vector, count);
+ msg->iobuf = iobuf;
+ }
+
+ pthread_mutex_lock (&peer_trans->handover.mutex);
+ {
+ list_add_tail (&msg->list, &peer_trans->handover.msgs);
+ pthread_cond_broadcast (&peer_trans->handover.cond);
+ }
+ pthread_mutex_unlock (&peer_trans->handover.mutex);
+
+ return 0;
+ }
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+ GF_VALIDATE_OR_GOTO("transport", this->ops, fail);
+
+ ret = this->ops->submit (this, buf, len, vector, count, iobref);
+fail:
+ return ret;
+}
+
+
+int32_t
+transport_connect (transport_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ ret = this->ops->connect (this);
+fail:
+ return ret;
+}
+
+
+int32_t
+transport_listen (transport_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ ret = this->ops->listen (this);
+fail:
+ return ret;
+}
+
+
+int32_t
+transport_disconnect (transport_t *this)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ ret = this->ops->disconnect (this);
+fail:
+ return ret;
+}
+
+
+int32_t
+transport_destroy (transport_t *this)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ if (this->fini)
+ this->fini (this);
+
+ pthread_mutex_destroy (&this->lock);
+ GF_FREE (this);
+fail:
+ return ret;
+}
+
+
+transport_t *
+transport_ref (transport_t *this)
+{
+ transport_t *return_this = NULL;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ pthread_mutex_lock (&this->lock);
+ {
+ this->refcount ++;
+ }
+ pthread_mutex_unlock (&this->lock);
+
+ return_this = this;
+fail:
+ return return_this;
+}
+
+
+int32_t
+transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ struct iobuf **iobuf_p)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ if (this->peer_trans) {
+ *hdr_p = this->handover.msg->hdr;
+ *hdrlen_p = this->handover.msg->hdrlen;
+ *iobuf_p = this->handover.msg->iobuf;
+
+ return 0;
+ }
+
+ ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);
+fail:
+ return ret;
+}
+
+
+int32_t
+transport_unref (transport_t *this)
+{
+ int32_t refcount = 0;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("transport", this, fail);
+
+ pthread_mutex_lock (&this->lock);
+ {
+ refcount = --this->refcount;
+ }
+ pthread_mutex_unlock (&this->lock);
+
+ if (refcount == 0) {
+ xlator_notify (this->xl, GF_EVENT_TRANSPORT_CLEANUP, this);
+ transport_destroy (this);
+ }
+
+ ret = 0;
+fail:
+ return ret;
+}
+
+
+void *
+transport_peerproc (void *trans_data)
+{
+ transport_t *trans = NULL;
+ struct transport_msg *msg = NULL;
+
+ trans = trans_data;
+
+ while (1) {
+ pthread_mutex_lock (&trans->handover.mutex);
+ {
+ while (list_empty (&trans->handover.msgs))
+ pthread_cond_wait (&trans->handover.cond,
+ &trans->handover.mutex);
+
+ msg = list_entry (trans->handover.msgs.next,
+ struct transport_msg, list);
+
+ list_del_init (&msg->list);
+ }
+ pthread_mutex_unlock (&trans->handover.mutex);
+
+ trans->handover.msg = msg;
+
+ xlator_notify (trans->xl, GF_EVENT_POLLIN, trans);
+
+ GF_FREE (msg);
+ }
+}
+
+
+int
+transport_setpeer (transport_t *trans, transport_t *peer_trans)
+{
+ trans->peer_trans = transport_ref (peer_trans);
+
+ INIT_LIST_HEAD (&trans->handover.msgs);
+ pthread_cond_init (&trans->handover.cond, NULL);
+ pthread_mutex_init (&trans->handover.mutex, NULL);
+ pthread_create (&trans->handover.thread, NULL,
+ transport_peerproc, trans);
+
+ peer_trans->peer_trans = transport_ref (trans);
+
+ INIT_LIST_HEAD (&peer_trans->handover.msgs);
+ pthread_cond_init (&peer_trans->handover.cond, NULL);
+ pthread_mutex_init (&peer_trans->handover.mutex, NULL);
+ pthread_create (&peer_trans->handover.thread, NULL,
+ transport_peerproc, peer_trans);
+
+ return 0;
+}
diff --git a/xlators/protocol/lib/src/transport.h b/xlators/protocol/lib/src/transport.h
new file mode 100644
index 000000000..f0623d5b4
--- /dev/null
+++ b/xlators/protocol/lib/src/transport.h
@@ -0,0 +1,106 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __TRANSPORT_H__
+#define __TRANSPORT_H__
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <inttypes.h>
+
+struct transport_ops;
+typedef struct transport transport_t;
+
+#include "xlator.h"
+#include "dict.h"
+#include "compat.h"
+
+typedef struct peer_info {
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ char identifier[UNIX_PATH_MAX];
+}peer_info_t;
+
+struct transport_msg {
+ struct list_head list;
+ char *hdr;
+ int hdrlen;
+ struct iobuf *iobuf;
+};
+
+struct transport {
+ struct transport_ops *ops;
+ void *private;
+ void *xl_private;
+ pthread_mutex_t lock;
+ int32_t refcount;
+
+ xlator_t *xl;
+ void *dnscache;
+ data_t *buf;
+ int32_t (*init) (transport_t *this);
+ void (*fini) (transport_t *this);
+ /* int (*notify) (transport_t *this, int event, void *data); */
+ peer_info_t peerinfo;
+ peer_info_t myinfo;
+
+ transport_t *peer_trans;
+ struct {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ pthread_t thread;
+ struct list_head msgs;
+ struct transport_msg *msg;
+ } handover;
+
+};
+
+struct transport_ops {
+ int32_t (*receive) (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ struct iobuf **iobuf_p);
+ int32_t (*submit) (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count,
+ struct iobref *iobref);
+ int32_t (*connect) (transport_t *this);
+ int32_t (*listen) (transport_t *this);
+ int32_t (*disconnect) (transport_t *this);
+};
+
+
+int32_t transport_listen (transport_t *this);
+int32_t transport_connect (transport_t *this);
+int32_t transport_disconnect (transport_t *this);
+int32_t transport_notify (transport_t *this, int event);
+int32_t transport_submit (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count,
+ struct iobref *iobref);
+int32_t transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ struct iobuf **iobuf_p);
+int32_t transport_destroy (transport_t *this);
+
+transport_t *transport_load (dict_t *options, xlator_t *xl);
+transport_t *transport_ref (transport_t *trans);
+int32_t transport_unref (transport_t *trans);
+
+int transport_setpeer (transport_t *trans, transport_t *trans_peer);
+
+#endif /* __TRANSPORT_H__ */
diff --git a/xlators/protocol/server/src/Makefile.am b/xlators/protocol/server/src/Makefile.am
index ae93912fc..faf82ee21 100644
--- a/xlators/protocol/server/src/Makefile.am
+++ b/xlators/protocol/server/src/Makefile.am
@@ -5,14 +5,15 @@ xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/protocol
server_la_LDFLAGS = -module -avoidversion
server_la_SOURCES = server-protocol.c server-resolve.c server-helpers.c
-server_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+server_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/xlators/protocol/lib/src/libgfproto.la
noinst_HEADERS = server-protocol.h server-helpers.h
AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \
-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles \
-DDATADIR=\"$(localstatedir)\" -DCONFDIR=\"$(sysconfdir)/glusterfs\" \
- $(GF_CFLAGS)
+ $(GF_CFLAGS) -I$(top_srcdir)/xlators/protocol/lib/src
CLEANFILES =
diff --git a/xlators/protocol/transport/Makefile.am b/xlators/protocol/transport/Makefile.am
new file mode 100644
index 000000000..e2f97437c
--- /dev/null
+++ b/xlators/protocol/transport/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = socket $(IBVERBS_SUBDIR)
+
+CLEANFILES =
diff --git a/xlators/protocol/transport/ib-verbs/Makefile.am b/xlators/protocol/transport/ib-verbs/Makefile.am
new file mode 100644
index 000000000..f963effea
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src \ No newline at end of file
diff --git a/xlators/protocol/transport/ib-verbs/src/Makefile.am b/xlators/protocol/transport/ib-verbs/src/Makefile.am
new file mode 100644
index 000000000..8f6e6a35b
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/src/Makefile.am
@@ -0,0 +1,19 @@
+# TODO : need to change transportdir
+
+transport_LTLIBRARIES = ib-verbs.la
+transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport
+
+ib_verbs_la_LDFLAGS = -module -avoidversion
+
+ib_verbs_la_SOURCES = ib-verbs.c name.c
+ib_verbs_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ -libverbs $(top_builddir)/xlators/protocol/lib/src/libgfproto.la
+
+noinst_HEADERS = ib-verbs.h name.h
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \
+ -I$(top_srcdir)/xlators/protocol/transport/ib-verbs \
+ -I$(top_srcdir)/xlators/protocol/lib/src
+
+CLEANFILES = *~
diff --git a/xlators/protocol/transport/ib-verbs/src/ib-verbs-mem-types.h b/xlators/protocol/transport/ib-verbs/src/ib-verbs-mem-types.h
new file mode 100644
index 000000000..bac559646
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/src/ib-verbs-mem-types.h
@@ -0,0 +1,39 @@
+
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef __IB_VERBS_MEM_TYPES_H__
+#define __IB_VERBS_MEM_TYPES_H__
+
+#include "mem-types.h"
+
+enum gf_ib_verbs_mem_types_ {
+ gf_ibv_mt_ib_verbs_private_t = gf_common_mt_end + 1,
+ gf_ibv_mt_ib_verbs_ioq_t,
+ gf_ibv_mt_transport_t,
+ gf_ibv_mt_ib_verbs_local_t,
+ gf_ibv_mt_ib_verbs_post_t,
+ gf_ibv_mt_char,
+ gf_ibv_mt_qpent,
+ gf_ibv_mt_ib_verbs_device_t,
+ gf_ibv_mt_end
+};
+#endif
+
diff --git a/xlators/protocol/transport/ib-verbs/src/ib-verbs.c b/xlators/protocol/transport/ib-verbs/src/ib-verbs.c
new file mode 100644
index 000000000..a252a13d8
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/src/ib-verbs.c
@@ -0,0 +1,2613 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dict.h"
+#include "glusterfs.h"
+#include "transport.h"
+#include "protocol.h"
+#include "logging.h"
+#include "xlator.h"
+#include "name.h"
+#include "ib-verbs.h"
+#include <signal.h>
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static uint16_t
+ib_verbs_get_local_lid (struct ibv_context *context,
+ int32_t port)
+{
+ struct ibv_port_attr attr;
+
+ if (ibv_query_port (context, port, &attr))
+ return 0;
+
+ return attr.lid;
+}
+
+static const char *
+get_port_state_str(enum ibv_port_state pstate)
+{
+ switch (pstate) {
+ case IBV_PORT_DOWN: return "PORT_DOWN";
+ case IBV_PORT_INIT: return "PORT_INIT";
+ case IBV_PORT_ARMED: return "PORT_ARMED";
+ case IBV_PORT_ACTIVE: return "PORT_ACTIVE";
+ case IBV_PORT_ACTIVE_DEFER: return "PORT_ACTIVE_DEFER";
+ default: return "invalid state";
+ }
+}
+
+static int32_t
+ib_check_active_port (struct ibv_context *ctx, uint8_t port)
+{
+ struct ibv_port_attr port_attr;
+
+ int32_t ret = 0;
+ const char *state_str = NULL;
+
+ if (!ctx) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "Error in supplied context");
+ return -1;
+ }
+
+ ret = ibv_query_port (ctx, port, &port_attr);
+
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "Failed to query port %u properties", port);
+ return -1;
+ }
+
+ state_str = get_port_state_str (port_attr.state);
+ gf_log ("transport/ib-verbs", GF_LOG_TRACE,
+ "Infiniband PORT: (%u) STATE: (%s)",
+ port, state_str);
+
+ if (port_attr.state == IBV_PORT_ACTIVE)
+ return 0;
+
+ return -1;
+}
+
+static int32_t
+ib_get_active_port (struct ibv_context *ib_ctx)
+{
+ struct ibv_device_attr ib_device_attr;
+
+ int32_t ret = -1;
+ uint8_t ib_port = 0;
+
+ if (!ib_ctx) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "Error in supplied context");
+ return -1;
+ }
+ if (ibv_query_device (ib_ctx, &ib_device_attr)) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "Failed to query device properties");
+ return -1;
+ }
+
+ for (ib_port = 1; ib_port <= ib_device_attr.phys_port_cnt; ++ib_port) {
+ ret = ib_check_active_port (ib_ctx, ib_port);
+ if (ret == 0)
+ return ib_port;
+
+ gf_log ("transport/ib-verbs", GF_LOG_TRACE,
+ "Port:(%u) not active", ib_port);
+ continue;
+ }
+ return ret;
+}
+
+
+
+static void
+ib_verbs_put_post (ib_verbs_queue_t *queue,
+ ib_verbs_post_t *post)
+{
+ pthread_mutex_lock (&queue->lock);
+ if (post->prev) {
+ queue->active_count--;
+ post->prev->next = post->next;
+ }
+ if (post->next)
+ post->next->prev = post->prev;
+ post->prev = &queue->passive_posts;
+ post->next = post->prev->next;
+ post->prev->next = post;
+ post->next->prev = post;
+ queue->passive_count++;
+ pthread_mutex_unlock (&queue->lock);
+}
+
+
+static ib_verbs_post_t *
+ib_verbs_new_post (ib_verbs_device_t *device, int32_t len)
+{
+ ib_verbs_post_t *post;
+
+ post = (ib_verbs_post_t *) GF_CALLOC (1, sizeof (*post),
+ gf_ibv_mt_ib_verbs_post_t);
+ if (!post)
+ return NULL;
+
+ post->buf_size = len;
+
+ post->buf = valloc (len);
+ if (!post->buf) {
+ GF_FREE (post);
+ return NULL;
+ }
+
+ post->mr = ibv_reg_mr (device->pd,
+ post->buf,
+ post->buf_size,
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!post->mr) {
+ free (post->buf);
+ GF_FREE (post);
+ return NULL;
+ }
+
+ return post;
+}
+
+
+static ib_verbs_post_t *
+ib_verbs_get_post (ib_verbs_queue_t *queue)
+{
+ ib_verbs_post_t *post;
+
+ pthread_mutex_lock (&queue->lock);
+ {
+ post = queue->passive_posts.next;
+ if (post == &queue->passive_posts)
+ post = NULL;
+
+ if (post) {
+ if (post->prev)
+ post->prev->next = post->next;
+ if (post->next)
+ post->next->prev = post->prev;
+ post->prev = &queue->active_posts;
+ post->next = post->prev->next;
+ post->prev->next = post;
+ post->next->prev = post;
+ post->reused++;
+ queue->active_count++;
+ }
+ }
+ pthread_mutex_unlock (&queue->lock);
+
+ return post;
+}
+
+void
+ib_verbs_destroy_post (ib_verbs_post_t *post)
+{
+ ibv_dereg_mr (post->mr);
+ free (post->buf);
+ GF_FREE (post);
+}
+
+
+static int32_t
+__ib_verbs_quota_get (ib_verbs_peer_t *peer)
+{
+ int32_t ret = -1;
+ ib_verbs_private_t *priv = peer->trans->private;
+
+ if (priv->connected && peer->quota > 0) {
+ ret = peer->quota--;
+ }
+
+ return ret;
+}
+
+/*
+ static int32_t
+ ib_verbs_quota_get (ib_verbs_peer_t *peer)
+ {
+ int32_t ret = -1;
+ ib_verbs_private_t *priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __ib_verbs_quota_get (peer);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+ }
+*/
+
+static void
+__ib_verbs_ioq_entry_free (ib_verbs_ioq_t *entry)
+{
+ list_del_init (&entry->list);
+ if (entry->iobref)
+ iobref_unref (entry->iobref);
+
+ /* TODO: use mem-pool */
+ GF_FREE (entry->buf);
+
+ /* TODO: use mem-pool */
+ GF_FREE (entry);
+}
+
+
+static void
+__ib_verbs_ioq_flush (ib_verbs_peer_t *peer)
+{
+ ib_verbs_ioq_t *entry = NULL, *dummy = NULL;
+
+ list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
+ __ib_verbs_ioq_entry_free (entry);
+ }
+}
+
+
+static int32_t
+__ib_verbs_disconnect (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int32_t ret = 0;
+
+ if (priv->connected || priv->tcp_connected) {
+ fcntl (priv->sock, F_SETFL, O_NONBLOCK);
+ if (shutdown (priv->sock, SHUT_RDWR) != 0) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_DEBUG,
+ "shutdown () - error: %s",
+ strerror (errno));
+ ret = -errno;
+ priv->tcp_connected = 0;
+ }
+ }
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_post_send (struct ibv_qp *qp,
+ ib_verbs_post_t *post,
+ int32_t len)
+{
+ struct ibv_sge list = {
+ .addr = (unsigned long) post->buf,
+ .length = len,
+ .lkey = post->mr->lkey
+ };
+
+ struct ibv_send_wr wr = {
+ .wr_id = (unsigned long) post,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ }, *bad_wr;
+
+ if (!qp)
+ return -1;
+
+ return ibv_post_send (qp, &wr, &bad_wr);
+}
+
+
+static int32_t
+__ib_verbs_ioq_churn_entry (ib_verbs_peer_t *peer, ib_verbs_ioq_t *entry)
+{
+ int32_t ret = 0, quota = 0;
+ ib_verbs_private_t *priv = peer->trans->private;
+ ib_verbs_device_t *device = priv->device;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_post_t *post = NULL;
+ int32_t len = 0;
+
+ quota = __ib_verbs_quota_get (peer);
+ if (quota > 0) {
+ post = ib_verbs_get_post (&device->sendq);
+ if (!post)
+ post = ib_verbs_new_post (device,
+ (options->send_size + 2048));
+
+ len = iov_length ((const struct iovec *)&entry->vector,
+ entry->count);
+ if (len >= (options->send_size + 2048)) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "increase value of option 'transport.ib-verbs."
+ "work-request-send-size' (given=> %"PRId64") "
+ "to send bigger (%d) messages",
+ (options->send_size + 2048), len);
+ return -1;
+ }
+
+ iov_unload (post->buf,
+ (const struct iovec *)&entry->vector,
+ entry->count);
+
+ ret = ib_verbs_post_send (peer->qp, post, len);
+ if (!ret) {
+ __ib_verbs_ioq_entry_free (entry);
+ ret = len;
+ } else {
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "ibv_post_send failed with ret = %d", ret);
+ ib_verbs_put_post (&device->sendq, post);
+ __ib_verbs_disconnect (peer->trans);
+ ret = -1;
+ }
+ }
+
+ return ret;
+}
+
+
+static int32_t
+__ib_verbs_ioq_churn (ib_verbs_peer_t *peer)
+{
+ ib_verbs_ioq_t *entry = NULL;
+ int32_t ret = 0;
+
+ while (!list_empty (&peer->ioq))
+ {
+ /* pick next entry */
+ entry = peer->ioq_next;
+
+ ret = __ib_verbs_ioq_churn_entry (peer, entry);
+
+ if (ret <= 0)
+ break;
+ }
+
+ /*
+ list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
+ ret = __ib_verbs_ioq_churn_entry (peer, entry);
+ if (ret <= 0) {
+ break;
+ }
+ }
+ */
+
+ return ret;
+}
+
+static int32_t
+__ib_verbs_quota_put (ib_verbs_peer_t *peer)
+{
+ int32_t ret;
+
+ peer->quota++;
+ ret = peer->quota;
+
+ if (!list_empty (&peer->ioq)) {
+ ret = __ib_verbs_ioq_churn (peer);
+ }
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_quota_put (ib_verbs_peer_t *peer)
+{
+ int32_t ret;
+ ib_verbs_private_t *priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __ib_verbs_quota_put (peer);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_post_recv (struct ibv_srq *srq,
+ ib_verbs_post_t *post)
+{
+ struct ibv_sge list = {
+ .addr = (unsigned long) post->buf,
+ .length = post->buf_size,
+ .lkey = post->mr->lkey
+ };
+
+ struct ibv_recv_wr wr = {
+ .wr_id = (unsigned long) post,
+ .sg_list = &list,
+ .num_sge = 1,
+ }, *bad_wr;
+
+ return ibv_post_srq_recv (srq, &wr, &bad_wr);
+}
+
+
+static int32_t
+ib_verbs_writev (transport_t *this,
+ ib_verbs_ioq_t *entry)
+{
+ int32_t ret = 0, need_append = 1;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_peer_t *peer = NULL;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (!priv->connected) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "ib-verbs is not connected to post a "
+ "send request");
+ ret = -1;
+ goto unlock;
+ }
+
+ peer = &priv->peer;
+ if (list_empty (&peer->ioq)) {
+ ret = __ib_verbs_ioq_churn_entry (peer, entry);
+ if (ret != 0) {
+ need_append = 0;
+ }
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &peer->ioq);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+ return ret;
+}
+
+
+static ib_verbs_ioq_t *
+ib_verbs_ioq_new (char *buf, int len, struct iovec *vector,
+ int count, struct iobref *iobref)
+{
+ ib_verbs_ioq_t *entry = NULL;
+
+ /* TODO: use mem-pool */
+ entry = GF_CALLOC (1, sizeof (*entry), gf_ibv_mt_ib_verbs_ioq_t);
+
+ assert (count <= (MAX_IOVEC-2));
+
+ entry->header.colonO[0] = ':';
+ entry->header.colonO[1] = 'O';
+ entry->header.colonO[2] = '\0';
+ entry->header.version = 42;
+ entry->header.size1 = hton32 (len);
+ entry->header.size2 = hton32 (iov_length (vector, count));
+
+ entry->vector[0].iov_base = &entry->header;
+ entry->vector[0].iov_len = sizeof (entry->header);
+ entry->count++;
+
+ entry->vector[1].iov_base = buf;
+ entry->vector[1].iov_len = len;
+ entry->count++;
+
+ if (vector && count)
+ {
+ memcpy (&entry->vector[2], vector, sizeof (*vector) * count);
+ entry->count += count;
+ }
+
+ if (iobref)
+ entry->iobref = iobref_ref (iobref);
+
+ entry->buf = buf;
+
+ INIT_LIST_HEAD (&entry->list);
+
+ return entry;
+}
+
+
+static int32_t
+ib_verbs_submit (transport_t *this, char *buf, int32_t len,
+ struct iovec *vector, int count, struct iobref *iobref)
+{
+ int32_t ret = 0;
+ ib_verbs_ioq_t *entry = NULL;
+
+ entry = ib_verbs_ioq_new (buf, len, vector, count, iobref);
+ ret = ib_verbs_writev (this, entry);
+
+ if (ret > 0) {
+ ret = 0;
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ struct iobuf **iobuf_p)
+{
+ ib_verbs_private_t *priv = this->private;
+ /* TODO: return error if !priv->connected, check with locks */
+ /* TODO: boundry checks for data_ptr/offset */
+ char *copy_from = NULL;
+ ib_verbs_header_t *header = NULL;
+ uint32_t size1, size2, data_len = 0;
+ char *hdr = NULL;
+ struct iobuf *iobuf = NULL;
+ int32_t ret = 0;
+
+ pthread_mutex_lock (&priv->recv_mutex);
+ {
+/*
+ while (!priv->data_ptr)
+ pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex);
+*/
+
+ copy_from = priv->data_ptr + priv->data_offset;
+
+ priv->data_ptr = NULL;
+ data_len = priv->data_len;
+ pthread_cond_broadcast (&priv->recv_cond);
+ }
+ pthread_mutex_unlock (&priv->recv_mutex);
+
+ header = (ib_verbs_header_t *)copy_from;
+ if (strcmp (header->colonO, ":O")) {
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "%s: corrupt header received", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ size1 = ntoh32 (header->size1);
+ size2 = ntoh32 (header->size2);
+
+ if (data_len != (size1 + size2 + sizeof (*header))) {
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "%s: sizeof data read from transport is not equal "
+ "to the size specified in the header",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ copy_from += sizeof (*header);
+
+ if (size1) {
+ hdr = GF_CALLOC (1, size1, gf_ibv_mt_char);
+ if (!hdr) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unable to allocate header for peer %s",
+ this->peerinfo.identifier);
+ ret = -ENOMEM;
+ goto err;
+ }
+ memcpy (hdr, copy_from, size1);
+ copy_from += size1;
+ *hdr_p = hdr;
+ }
+ *hdrlen_p = size1;
+
+ if (size2) {
+ iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+ if (!iobuf) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unable to allocate IO buffer for peer %s",
+ this->peerinfo.identifier);
+ ret = -ENOMEM;
+ goto err;
+ }
+ memcpy (iobuf->ptr, copy_from, size2);
+ *iobuf_p = iobuf;
+ }
+
+err:
+ return ret;
+}
+
+
+static void
+ib_verbs_destroy_cq (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_device_t *device = priv->device;
+
+ if (device->recv_cq)
+ ibv_destroy_cq (device->recv_cq);
+ device->recv_cq = NULL;
+
+ if (device->send_cq)
+ ibv_destroy_cq (device->send_cq);
+ device->send_cq = NULL;
+
+ return;
+}
+
+
+static int32_t
+ib_verbs_create_cq (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_device_t *device = priv->device;
+ int32_t ret = 0;
+
+ device->recv_cq = ibv_create_cq (priv->device->context,
+ options->recv_count * 2,
+ device,
+ device->recv_chan,
+ 0);
+ if (!device->recv_cq) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: creation of CQ failed",
+ this->xl->name);
+ ret = -1;
+ } else if (ibv_req_notify_cq (device->recv_cq, 0)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: ibv_req_notify_cq on CQ failed",
+ this->xl->name);
+ ret = -1;
+ }
+
+ do {
+ /* TODO: make send_cq size dynamically adaptive */
+ device->send_cq = ibv_create_cq (priv->device->context,
+ options->send_count * 1024,
+ device,
+ device->send_chan,
+ 0);
+ if (!device->send_cq) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: creation of send_cq failed",
+ this->xl->name);
+ ret = -1;
+ break;
+ }
+
+ if (ibv_req_notify_cq (device->send_cq, 0)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: ibv_req_notify_cq on send_cq failed",
+ this->xl->name);
+ ret = -1;
+ break;
+ }
+ } while (0);
+
+ if (ret != 0)
+ ib_verbs_destroy_cq (this);
+
+ return ret;
+}
+
+
+static void
+ib_verbs_register_peer (ib_verbs_device_t *device,
+ int32_t qp_num,
+ ib_verbs_peer_t *peer)
+{
+ struct _qpent *ent;
+ ib_verbs_qpreg_t *qpreg = &device->qpreg;
+ int32_t hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+ if (ent->qp_num == qp_num) {
+ pthread_mutex_unlock (&qpreg->lock);
+ return;
+ }
+ ent = (struct _qpent *) GF_CALLOC (1, sizeof (*ent), gf_ibv_mt_qpent);
+ ERR_ABORT (ent);
+ /* TODO: ref reg->peer */
+ ent->peer = peer;
+ ent->next = &qpreg->ents[hash];
+ ent->prev = ent->next->prev;
+ ent->next->prev = ent;
+ ent->prev->next = ent;
+ ent->qp_num = qp_num;
+ qpreg->count++;
+ pthread_mutex_unlock (&qpreg->lock);
+}
+
+
+static void
+ib_verbs_unregister_peer (ib_verbs_device_t *device,
+ int32_t qp_num)
+{
+ struct _qpent *ent;
+ ib_verbs_qpreg_t *qpreg = &device->qpreg;
+ int32_t hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+ if (ent->qp_num != qp_num) {
+ pthread_mutex_unlock (&qpreg->lock);
+ return;
+ }
+ ent->prev->next = ent->next;
+ ent->next->prev = ent->prev;
+ /* TODO: unref reg->peer */
+ GF_FREE (ent);
+ qpreg->count--;
+ pthread_mutex_unlock (&qpreg->lock);
+}
+
+
+static ib_verbs_peer_t *
+__ib_verbs_lookup_peer (ib_verbs_device_t *device, int32_t qp_num)
+{
+ struct _qpent *ent = NULL;
+ ib_verbs_peer_t *peer = NULL;
+ ib_verbs_qpreg_t *qpreg = NULL;
+ int32_t hash = 0;
+
+ qpreg = &device->qpreg;
+ hash = qp_num % 42;
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+
+ if (ent != &qpreg->ents[hash]) {
+ peer = ent->peer;
+ }
+
+ return peer;
+}
+
+/*
+static ib_verbs_peer_t *
+ib_verbs_lookup_peer (ib_verbs_device_t *device,
+ int32_t qp_num)
+{
+ ib_verbs_qpreg_t *qpreg = NULL;
+ ib_verbs_peer_t *peer = NULL;
+
+ qpreg = &device->qpreg;
+ pthread_mutex_lock (&qpreg->lock);
+ {
+ peer = __ib_verbs_lookup_peer (device, qp_num);
+ }
+ pthread_mutex_unlock (&qpreg->lock);
+
+ return peer;
+}
+*/
+
+
+static void
+__ib_verbs_destroy_qp (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+
+ if (priv->peer.qp) {
+ ib_verbs_unregister_peer (priv->device, priv->peer.qp->qp_num);
+ ibv_destroy_qp (priv->peer.qp);
+ }
+ priv->peer.qp = NULL;
+
+ return;
+}
+
+
+static int32_t
+ib_verbs_create_qp (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_device_t *device = priv->device;
+ int32_t ret = 0;
+ ib_verbs_peer_t *peer;
+
+ peer = &priv->peer;
+ struct ibv_qp_init_attr init_attr = {
+ .send_cq = device->send_cq,
+ .recv_cq = device->recv_cq,
+ .srq = device->srq,
+ .cap = {
+ .max_send_wr = peer->send_count,
+ .max_recv_wr = peer->recv_count,
+ .max_send_sge = 1,
+ .max_recv_sge = 1
+ },
+ .qp_type = IBV_QPT_RC
+ };
+
+ struct ibv_qp_attr attr = {
+ .qp_state = IBV_QPS_INIT,
+ .pkey_index = 0,
+ .port_num = options->port,
+ .qp_access_flags = 0
+ };
+
+ peer->qp = ibv_create_qp (device->pd, &init_attr);
+ if (!peer->qp) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "%s: could not create QP",
+ this->xl->name);
+ ret = -1;
+ goto out;
+ } else if (ibv_modify_qp (peer->qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_PKEY_INDEX |
+ IBV_QP_PORT |
+ IBV_QP_ACCESS_FLAGS)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: failed to modify QP to INIT state",
+ this->xl->name);
+ ret = -1;
+ goto out;
+ }
+
+ peer->local_lid = ib_verbs_get_local_lid (device->context,
+ options->port);
+ peer->local_qpn = peer->qp->qp_num;
+ peer->local_psn = lrand48 () & 0xffffff;
+
+ ib_verbs_register_peer (device, peer->qp->qp_num, peer);
+
+out:
+ if (ret == -1)
+ __ib_verbs_destroy_qp (this);
+
+ return ret;
+}
+
+
+static void
+ib_verbs_destroy_posts (transport_t *this)
+{
+
+}
+
+
+static int32_t
+__ib_verbs_create_posts (transport_t *this,
+ int32_t count,
+ int32_t size,
+ ib_verbs_queue_t *q)
+{
+ int32_t i;
+ int32_t ret = 0;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_device_t *device = priv->device;
+
+ for (i=0 ; i<count ; i++) {
+ ib_verbs_post_t *post;
+
+ post = ib_verbs_new_post (device, size + 2048);
+ if (!post) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: post creation failed",
+ this->xl->name);
+ ret = -1;
+ break;
+ }
+
+ ib_verbs_put_post (q, post);
+ }
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_create_posts (transport_t *this)
+{
+ int32_t i, ret;
+ ib_verbs_post_t *post = NULL;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_device_t *device = priv->device;
+
+ ret = __ib_verbs_create_posts (this, options->send_count,
+ options->send_size,
+ &device->sendq);
+ if (!ret)
+ ret = __ib_verbs_create_posts (this, options->recv_count,
+ options->recv_size,
+ &device->recvq);
+
+ if (!ret) {
+ for (i=0 ; i<options->recv_count ; i++) {
+ post = ib_verbs_get_post (&device->recvq);
+ if (ib_verbs_post_recv (device->srq, post) != 0) {
+ ret = -1;
+ break;
+ }
+ }
+ }
+
+ if (ret)
+ ib_verbs_destroy_posts (this);
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_connect_qp (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ struct ibv_qp_attr attr = {
+ .qp_state = IBV_QPS_RTR,
+ .path_mtu = options->mtu,
+ .dest_qp_num = priv->peer.remote_qpn,
+ .rq_psn = priv->peer.remote_psn,
+ .max_dest_rd_atomic = 1,
+ .min_rnr_timer = 12,
+ .ah_attr = {
+ .is_global = 0,
+ .dlid = priv->peer.remote_lid,
+ .sl = 0,
+ .src_path_bits = 0,
+ .port_num = options->port
+ }
+ };
+ if (ibv_modify_qp (priv->peer.qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_AV |
+ IBV_QP_PATH_MTU |
+ IBV_QP_DEST_QPN |
+ IBV_QP_RQ_PSN |
+ IBV_QP_MAX_DEST_RD_ATOMIC |
+ IBV_QP_MIN_RNR_TIMER)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "Failed to modify QP to RTR\n");
+ return -1;
+ }
+
+ /* TODO: make timeout and retry_cnt configurable from options */
+ attr.qp_state = IBV_QPS_RTS;
+ attr.timeout = 14;
+ attr.retry_cnt = 7;
+ attr.rnr_retry = 7;
+ attr.sq_psn = priv->peer.local_psn;
+ attr.max_rd_atomic = 1;
+ if (ibv_modify_qp (priv->peer.qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_TIMEOUT |
+ IBV_QP_RETRY_CNT |
+ IBV_QP_RNR_RETRY |
+ IBV_QP_SQ_PSN |
+ IBV_QP_MAX_QP_RD_ATOMIC)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "Failed to modify QP to RTS\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int32_t
+__ib_verbs_teardown (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+
+ __ib_verbs_destroy_qp (this);
+
+ if (!list_empty (&priv->peer.ioq)) {
+ __ib_verbs_ioq_flush (&priv->peer);
+ }
+
+ /* TODO: decrement cq size */
+ return 0;
+}
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+static int
+__tcp_rwv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ int write)
+{
+ ib_verbs_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = vector;
+ int opcount = count;
+ int moved = 0;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ while (opcount)
+ {
+ if (write)
+ {
+ ret = writev (sock, opvector, opcount);
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN))
+ {
+ /* done for now */
+ break;
+ }
+ }
+ else
+ {
+ ret = readv (sock, opvector, opcount);
+
+ if (ret == -1 && errno == EAGAIN)
+ {
+ /* done for now */
+ break;
+ }
+ }
+
+ if (ret == 0)
+ {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "EOF from peer %s", this->peerinfo.identifier);
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+
+ if (ret == -1)
+ {
+ if (errno == EINTR)
+ continue;
+
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "%s failed (%s)", write ? "writev" : "readv",
+ strerror (errno));
+ if (write && !priv->connected &&
+ (errno == ECONNREFUSED))
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "possible mismatch of 'transport-type'"
+ " in protocol server and client. "
+ "check volume file");
+ opcount = -1;
+ break;
+ }
+
+ moved = 0;
+
+ while (moved < ret)
+ {
+ if ((ret - moved) >= opvector[0].iov_len)
+ {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ }
+ else
+ {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ while (opcount && !opvector[0].iov_len)
+ {
+ opvector++;
+ opcount--;
+ }
+ }
+ }
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+ return opcount;
+}
+
+
+static int
+__tcp_readv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __tcp_rwv (this, vector, count,
+ pending_vector, pending_count, 0);
+
+ return ret;
+}
+
+
+static int
+__tcp_writev (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+ ib_verbs_private_t *priv = this->private;
+
+ ret = __tcp_rwv (this, vector, count, pending_vector,
+ pending_count, 1);
+
+ if (ret > 0) {
+ /* TODO: Avoid multiple calls when socket is already
+ registered for POLLOUT */
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock, priv->idx, -1, 1);
+ } else if (ret == 0) {
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 0);
+ }
+
+ return ret;
+}
+
+
+static void *
+ib_verbs_recv_completion_proc (void *data)
+{
+ struct ibv_comp_channel *chan = data;
+ ib_verbs_private_t *priv = NULL;
+ ib_verbs_device_t *device;
+ ib_verbs_post_t *post;
+ ib_verbs_peer_t *peer;
+ struct ibv_cq *event_cq;
+ struct ibv_wc wc;
+ void *event_ctx;
+ int32_t ret = 0;
+
+
+ while (1) {
+ ret = ibv_get_cq_event (chan, &event_cq, &event_ctx);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_get_cq_event failed, terminating recv "
+ "thread %d (%d)", ret, errno);
+ continue;
+ }
+
+ device = event_ctx;
+
+ ret = ibv_req_notify_cq (event_cq, 0);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_req_notify_cq on %s failed, terminating "
+ "recv thread: %d (%d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ device = (ib_verbs_device_t *) event_ctx;
+
+ while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) {
+ post = (ib_verbs_post_t *) (long) wc.wr_id;
+
+ pthread_mutex_lock (&device->qpreg.lock);
+ {
+ peer = __ib_verbs_lookup_peer (device,
+ wc.qp_num);
+
+ /*
+ * keep a refcount on transport so that it
+ * doesnot get freed because of some error
+ * indicated by wc.status till we are done
+ * with usage of peer and thereby that of trans.
+ */
+ if (peer != NULL) {
+ transport_ref (peer->trans);
+ }
+ }
+ pthread_mutex_unlock (&device->qpreg.lock);
+
+ if (wc.status != IBV_WC_SUCCESS) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "recv work request on `%s' returned "
+ "error (%d)",
+ device->device_name,
+ wc.status);
+ if (peer) {
+ transport_unref (peer->trans);
+ transport_disconnect (peer->trans);
+ }
+
+ if (post) {
+ ib_verbs_post_recv (device->srq, post);
+ }
+ continue;
+ }
+
+ if (peer) {
+ priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->recv_mutex);
+ {
+ while (priv->data_ptr)
+ pthread_cond_wait (&priv->recv_cond,
+ &priv->recv_mutex);
+
+ priv->data_ptr = post->buf;
+ priv->data_offset = 0;
+ priv->data_len = wc.byte_len;
+
+ /*pthread_cond_broadcast (&priv->recv_cond);*/
+ }
+ pthread_mutex_unlock (&priv->recv_mutex);
+
+ if ((ret = xlator_notify (peer->trans->xl, GF_EVENT_POLLIN,
+ peer->trans, NULL)) == -1) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_DEBUG,
+ "pollin notification to %s "
+ "failed, disconnecting "
+ "transport",
+ peer->trans->xl->name);
+ transport_disconnect (peer->trans);
+ }
+
+ transport_unref (peer->trans);
+ } else {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_DEBUG,
+ "could not lookup peer for qp_num: %d",
+ wc.qp_num);
+ }
+ ib_verbs_post_recv (device->srq, post);
+ }
+
+ if (ret < 0) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "ibv_poll_cq on `%s' returned error "
+ "(ret = %d, errno = %d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+ ibv_ack_cq_events (event_cq, 1);
+ }
+ return NULL;
+}
+
+
+static void *
+ib_verbs_send_completion_proc (void *data)
+{
+ struct ibv_comp_channel *chan = data;
+ ib_verbs_post_t *post;
+ ib_verbs_peer_t *peer;
+ struct ibv_cq *event_cq;
+ void *event_ctx;
+ ib_verbs_device_t *device;
+ struct ibv_wc wc;
+ int32_t ret;
+
+ while (1) {
+ ret = ibv_get_cq_event (chan, &event_cq, &event_ctx);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_get_cq_event on failed, terminating "
+ "send thread: %d (%d)", ret, errno);
+ continue;
+ }
+
+ device = event_ctx;
+
+ ret = ibv_req_notify_cq (event_cq, 0);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_req_notify_cq on %s failed, terminating "
+ "send thread: %d (%d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) {
+ post = (ib_verbs_post_t *) (long) wc.wr_id;
+
+ pthread_mutex_lock (&device->qpreg.lock);
+ {
+ peer = __ib_verbs_lookup_peer (device,
+ wc.qp_num);
+
+ /*
+ * keep a refcount on transport so that it
+ * doesnot get freed because of some error
+ * indicated by wc.status till we are done
+ * with usage of peer and thereby that of trans.
+ */
+ if (peer != NULL) {
+ transport_ref (peer->trans);
+ }
+ }
+ pthread_mutex_unlock (&device->qpreg.lock);
+
+ if (wc.status != IBV_WC_SUCCESS) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "send work request on `%s' returned "
+ "error wc.status = %d, wc.vendor_err "
+ "= %d, post->buf = %p, wc.byte_len = "
+ "%d, post->reused = %d",
+ device->device_name, wc.status,
+ wc.vendor_err,
+ post->buf, wc.byte_len, post->reused);
+ if (wc.status == IBV_WC_RETRY_EXC_ERR)
+ gf_log ("ib-verbs", GF_LOG_ERROR,
+ "connection between client and"
+ " server not working. check by"
+ " running 'ibv_srq_pingpong'. "
+ "also make sure subnet manager"
+ " is running (eg: 'opensm'), "
+ "or check if ib-verbs port is "
+ "valid (or active) by running "
+ " 'ibv_devinfo'. contact "
+ "Gluster Support Team if "
+ "the problem persists.");
+ if (peer)
+ transport_disconnect (peer->trans);
+ }
+
+ if (post) {
+ ib_verbs_put_post (&device->sendq, post);
+ }
+
+ if (peer) {
+ int quota_ret = ib_verbs_quota_put (peer);
+ if (quota_ret < 0) {
+ gf_log ("ib-verbs", GF_LOG_DEBUG,
+ "failed to send message");
+
+ }
+
+ transport_unref (peer->trans);
+ } else {
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "could not lookup peer for qp_num: %d",
+ wc.qp_num);
+ }
+ }
+
+ if (ret < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_poll_cq on `%s' returned error (ret = %d,"
+ " errno = %d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+ ibv_ack_cq_events (event_cq, 1);
+ }
+
+ return NULL;
+}
+
+static void
+ib_verbs_options_init (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ int32_t mtu;
+ data_t *temp;
+
+ /* TODO: validate arguments from options below */
+
+ options->send_size = this->xl->ctx->page_size * 4; /* 512 KB */
+ options->recv_size = this->xl->ctx->page_size * 4; /* 512 KB */
+ options->send_count = 32;
+ options->recv_count = 32;
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.work-request-send-count");
+ if (temp)
+ options->send_count = data_to_int32 (temp);
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.work-request-recv-count");
+ if (temp)
+ options->recv_count = data_to_int32 (temp);
+
+ options->port = 0;
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.port");
+ if (temp)
+ options->port = data_to_uint64 (temp);
+
+ options->mtu = mtu = IBV_MTU_2048;
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.mtu");
+ if (temp)
+ mtu = data_to_int32 (temp);
+ switch (mtu) {
+ case 256: options->mtu = IBV_MTU_256;
+ break;
+ case 512: options->mtu = IBV_MTU_512;
+ break;
+ case 1024: options->mtu = IBV_MTU_1024;
+ break;
+ case 2048: options->mtu = IBV_MTU_2048;
+ break;
+ case 4096: options->mtu = IBV_MTU_4096;
+ break;
+ default:
+ if (temp)
+ gf_log ("transport/ib-verbs", GF_LOG_WARNING,
+ "%s: unrecognized MTU value '%s', defaulting "
+ "to '2048'", this->xl->name,
+ data_to_str (temp));
+ else
+ gf_log ("transport/ib-verbs", GF_LOG_TRACE,
+ "%s: defaulting MTU to '2048'",
+ this->xl->name);
+ options->mtu = IBV_MTU_2048;
+ break;
+ }
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.device-name");
+ if (temp)
+ options->device_name = gf_strdup (temp->data);
+
+ return;
+}
+
+static void
+ib_verbs_queue_init (ib_verbs_queue_t *queue)
+{
+ pthread_mutex_init (&queue->lock, NULL);
+
+ queue->active_posts.next = &queue->active_posts;
+ queue->active_posts.prev = &queue->active_posts;
+ queue->passive_posts.next = &queue->passive_posts;
+ queue->passive_posts.prev = &queue->passive_posts;
+}
+
+
+static ib_verbs_device_t *
+ib_verbs_get_device (transport_t *this,
+ struct ibv_context *ibctx)
+{
+ glusterfs_ctx_t *ctx = this->xl->ctx;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ char *device_name = priv->options.device_name;
+ uint32_t port = priv->options.port;
+
+ uint8_t active_port = 0;
+ int32_t ret = 0;
+ int32_t i = 0;
+
+ ib_verbs_device_t *trav;
+
+ trav = ctx->ib;
+ while (trav) {
+ if ((!strcmp (trav->device_name, device_name)) &&
+ (trav->port == port))
+ break;
+ trav = trav->next;
+ }
+
+ if (!trav) {
+
+ trav = GF_CALLOC (1, sizeof (*trav),
+ gf_ibv_mt_ib_verbs_device_t);
+ ERR_ABORT (trav);
+ priv->device = trav;
+
+ trav->context = ibctx;
+
+ ret = ib_get_active_port (trav->context);
+
+ if (ret < 0) {
+ if (!port) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "Failed to find any active ports and "
+ "none specified in volume file,"
+ " exiting");
+ return NULL;
+ }
+ }
+
+ active_port = ret;
+
+ if (port) {
+ ret = ib_check_active_port (trav->context, port);
+ if (ret < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_WARNING,
+ "On device %s: provided port:%u is "
+ "found to be offline, continuing to "
+ "use the same port", device_name, port);
+ }
+ } else {
+ priv->options.port = active_port;
+ port = active_port;
+ gf_log ("transport/ib-verbs", GF_LOG_TRACE,
+ "Port unspecified in volume file using active "
+ "port: %u", port);
+ }
+
+ trav->device_name = gf_strdup (device_name);
+ trav->port = port;
+
+ trav->next = ctx->ib;
+ ctx->ib = trav;
+
+ trav->send_chan = ibv_create_comp_channel (trav->context);
+ if (!trav->send_chan) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create send completion channel",
+ device_name);
+ /* TODO: cleanup current mess */
+ return NULL;
+ }
+
+ trav->recv_chan = ibv_create_comp_channel (trav->context);
+ if (!trav->recv_chan) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create recv completion channel");
+ /* TODO: cleanup current mess */
+ return NULL;
+ }
+
+ if (ib_verbs_create_cq (this) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create CQ",
+ this->xl->name);
+ return NULL;
+ }
+
+ /* protection domain */
+ trav->pd = ibv_alloc_pd (trav->context);
+
+ if (!trav->pd) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not allocate protection domain",
+ this->xl->name);
+ return NULL;
+ }
+
+ struct ibv_srq_init_attr attr = {
+ .attr = {
+ .max_wr = options->recv_count,
+ .max_sge = 1
+ }
+ };
+ trav->srq = ibv_create_srq (trav->pd, &attr);
+
+ if (!trav->srq) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create SRQ",
+ this->xl->name);
+ return NULL;
+ }
+
+ /* queue init */
+ ib_verbs_queue_init (&trav->sendq);
+ ib_verbs_queue_init (&trav->recvq);
+
+ if (ib_verbs_create_posts (this) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not allocate posts",
+ this->xl->name);
+ return NULL;
+ }
+
+ /* completion threads */
+ ret = pthread_create (&trav->send_thread,
+ NULL,
+ ib_verbs_send_completion_proc,
+ trav->send_chan);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create send completion thread");
+ return NULL;
+ }
+ ret = pthread_create (&trav->recv_thread,
+ NULL,
+ ib_verbs_recv_completion_proc,
+ trav->recv_chan);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create recv completion thread");
+ return NULL;
+ }
+
+ /* qpreg */
+ pthread_mutex_init (&trav->qpreg.lock, NULL);
+ for (i=0; i<42; i++) {
+ trav->qpreg.ents[i].next = &trav->qpreg.ents[i];
+ trav->qpreg.ents[i].prev = &trav->qpreg.ents[i];
+ }
+ }
+ return trav;
+}
+
+static int32_t
+ib_verbs_init (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ struct ibv_device **dev_list;
+ struct ibv_context *ib_ctx = NULL;
+ int32_t ret = 0;
+
+ ib_verbs_options_init (this);
+
+ {
+ dev_list = ibv_get_device_list (NULL);
+
+ if (!dev_list) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "Failed to get IB devices");
+ ret = -1;
+ goto cleanup;
+ }
+
+ if (!*dev_list) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "No IB devices found");
+ ret = -1;
+ goto cleanup;
+ }
+
+ if (!options->device_name) {
+ if (*dev_list) {
+ options->device_name =
+ gf_strdup (ibv_get_device_name (*dev_list));
+ } else {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "IB device list is empty. Check for "
+ "'ib_uverbs' module");
+ return -1;
+ goto cleanup;
+ }
+ }
+
+ while (*dev_list) {
+ if (!strcmp (ibv_get_device_name (*dev_list),
+ options->device_name)) {
+ ib_ctx = ibv_open_device (*dev_list);
+
+ if (!ib_ctx) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "Failed to get infiniband"
+ "device context");
+ ret = -1;
+ goto cleanup;
+ }
+ break;
+ }
+ ++dev_list;
+ }
+
+ priv->device = ib_verbs_get_device (this, ib_ctx);
+
+ if (!priv->device) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create ib_verbs device for %s",
+ priv->device->device_name);
+ ret = -1;
+ goto cleanup;
+ }
+ }
+
+ priv->peer.trans = this;
+ INIT_LIST_HEAD (&priv->peer.ioq);
+
+ pthread_mutex_init (&priv->read_mutex, NULL);
+ pthread_mutex_init (&priv->write_mutex, NULL);
+ pthread_mutex_init (&priv->recv_mutex, NULL);
+ pthread_cond_init (&priv->recv_cond, NULL);
+
+cleanup:
+ if (-1 == ret) {
+ if (ib_ctx)
+ ibv_close_device (ib_ctx);
+ }
+
+ if (dev_list)
+ ibv_free_device_list (dev_list);
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_disconnect (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int32_t ret = 0;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __ib_verbs_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+
+static int32_t
+__tcp_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR,
+ (void *)&optval, &optlen);
+
+ if (ret == 0 && optval)
+ {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+static inline void
+ib_verbs_fill_handshake_data (char *buf, struct ib_verbs_nbio *nbio,
+ ib_verbs_private_t *priv)
+{
+ sprintf (buf,
+ "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n"
+ "QP1:LID=%04x:QPN=%06x:PSN=%06x\n",
+ priv->peer.recv_size,
+ priv->peer.send_size,
+ priv->peer.local_lid,
+ priv->peer.local_qpn,
+ priv->peer.local_psn);
+
+ nbio->vector.iov_base = buf;
+ nbio->vector.iov_len = strlen (buf) + 1;
+ nbio->count = 1;
+ return;
+}
+
+static inline void
+ib_verbs_fill_handshake_ack (char *buf, struct ib_verbs_nbio *nbio)
+{
+ sprintf (buf, "DONE\n");
+ nbio->vector.iov_base = buf;
+ nbio->vector.iov_len = strlen (buf) + 1;
+ nbio->count = 1;
+ return;
+}
+
+static int
+ib_verbs_handshake_pollin (transport_t *this)
+{
+ int ret = 0;
+ ib_verbs_private_t *priv = this->private;
+ char *buf = priv->handshake.incoming.buf;
+ int32_t recv_buf_size, send_buf_size;
+ socklen_t sock_len;
+
+ if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) {
+ return -1;
+ }
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ while (priv->handshake.incoming.state != IB_VERBS_HANDSHAKE_COMPLETE)
+ {
+ switch (priv->handshake.incoming.state)
+ {
+ case IB_VERBS_HANDSHAKE_START:
+ buf = priv->handshake.incoming.buf = GF_CALLOC (1, 256, gf_ibv_mt_char);
+ ib_verbs_fill_handshake_data (buf, &priv->handshake.incoming, priv);
+ buf[0] = 0;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_DATA;
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVING_DATA:
+ ret = __tcp_readv (this,
+ &priv->handshake.incoming.vector,
+ priv->handshake.incoming.count,
+ &priv->handshake.incoming.pending_vector,
+ &priv->handshake.incoming.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "partial header read on NB socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_DATA;
+ }
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVED_DATA:
+ ret = sscanf (buf,
+ "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n"
+ "QP1:LID=%04x:QPN=%06x:PSN=%06x\n",
+ &recv_buf_size,
+ &send_buf_size,
+ &priv->peer.remote_lid,
+ &priv->peer.remote_qpn,
+ &priv->peer.remote_psn);
+
+ if ((ret != 5) && (strncmp (buf, "QP1:", 4))) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "%s: remote-host(%s)'s "
+ "transport type is different",
+ this->xl->name,
+ this->peerinfo.identifier);
+ ret = -1;
+ goto unlock;
+ }
+
+ if (recv_buf_size < priv->peer.recv_size)
+ priv->peer.recv_size = recv_buf_size;
+ if (send_buf_size < priv->peer.send_size)
+ priv->peer.send_size = send_buf_size;
+
+ gf_log ("transport/ib-verbs", GF_LOG_TRACE,
+ "%s: transacted recv_size=%d "
+ "send_size=%d",
+ this->xl->name, priv->peer.recv_size,
+ priv->peer.send_size);
+
+ priv->peer.quota = priv->peer.send_count;
+
+ if (ib_verbs_connect_qp (this)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: failed to connect with "
+ "remote QP", this->xl->name);
+ ret = -1;
+ goto unlock;
+ }
+ ib_verbs_fill_handshake_ack (buf, &priv->handshake.incoming);
+ buf[0] = 0;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_ACK;
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVING_ACK:
+ ret = __tcp_readv (this,
+ &priv->handshake.incoming.vector,
+ priv->handshake.incoming.count,
+ &priv->handshake.incoming.pending_vector,
+ &priv->handshake.incoming.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "partial header read on NB "
+ "socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_ACK;
+ }
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVED_ACK:
+ if (strncmp (buf, "DONE", 4)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_DEBUG,
+ "%s: handshake-3 did not "
+ "return 'DONE' (%s)",
+ this->xl->name, buf);
+ ret = -1;
+ goto unlock;
+ }
+ ret = 0;
+ priv->connected = 1;
+ sock_len = sizeof (struct sockaddr_storage);
+ getpeername (priv->sock,
+ (struct sockaddr *) &this->peerinfo.sockaddr,
+ &sock_len);
+
+ GF_FREE (priv->handshake.incoming.buf);
+ priv->handshake.incoming.buf = NULL;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_COMPLETE;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (ret == -1) {
+ transport_disconnect (this);
+ } else {
+ ret = 0;
+ }
+
+ if (!ret && priv->connected) {
+ ret = xlator_notify (this->xl, GF_EVENT_CHILD_UP, this);
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_handshake_pollout (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ char *buf = priv->handshake.outgoing.buf;
+ int32_t ret = 0;
+
+ if (priv->handshake.outgoing.state == IB_VERBS_HANDSHAKE_COMPLETE) {
+ return 0;
+ }
+
+ pthread_mutex_unlock (&priv->write_mutex);
+ {
+ while (priv->handshake.outgoing.state != IB_VERBS_HANDSHAKE_COMPLETE)
+ {
+ switch (priv->handshake.outgoing.state)
+ {
+ case IB_VERBS_HANDSHAKE_START:
+ buf = priv->handshake.outgoing.buf = GF_CALLOC (1, 256, gf_ibv_mt_char);
+ ib_verbs_fill_handshake_data (buf, &priv->handshake.outgoing, priv);
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_DATA;
+ break;
+
+ case IB_VERBS_HANDSHAKE_SENDING_DATA:
+ ret = __tcp_writev (this,
+ &priv->handshake.outgoing.vector,
+ priv->handshake.outgoing.count,
+ &priv->handshake.outgoing.pending_vector,
+ &priv->handshake.outgoing.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "partial header read on NB socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENT_DATA;
+ }
+ break;
+
+ case IB_VERBS_HANDSHAKE_SENT_DATA:
+ ib_verbs_fill_handshake_ack (buf, &priv->handshake.outgoing);
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_ACK;
+ break;
+
+ case IB_VERBS_HANDSHAKE_SENDING_ACK:
+ ret = __tcp_writev (this,
+ &priv->handshake.outgoing.vector,
+ priv->handshake.outgoing.count,
+ &priv->handshake.outgoing.pending_vector,
+ &priv->handshake.outgoing.pending_count);
+
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "partial header read on NB "
+ "socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ GF_FREE (priv->handshake.outgoing.buf);
+ priv->handshake.outgoing.buf = NULL;
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_COMPLETE;
+ }
+ break;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (ret == -1) {
+ transport_disconnect (this);
+ } else {
+ ret = 0;
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_handshake_pollerr (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int32_t ret = 0;
+ char need_unref = 0;
+
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "%s: peer disconnected, cleaning up",
+ this->xl->name);
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ __ib_verbs_teardown (this);
+
+ if (priv->sock != -1) {
+ event_unregister (this->xl->ctx->event_pool,
+ priv->sock, priv->idx);
+ need_unref = 1;
+
+ if (close (priv->sock) != 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "close () - error: %s",
+ strerror (errno));
+ ret = -errno;
+ }
+ priv->tcp_connected = priv->connected = 0;
+ priv->sock = -1;
+ }
+
+ if (priv->handshake.incoming.buf) {
+ GF_FREE (priv->handshake.incoming.buf);
+ priv->handshake.incoming.buf = NULL;
+ }
+
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START;
+
+ if (priv->handshake.outgoing.buf) {
+ GF_FREE (priv->handshake.outgoing.buf);
+ priv->handshake.outgoing.buf = NULL;
+ }
+
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START;
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ xlator_notify (this->xl, GF_EVENT_POLLERR, this, NULL);
+
+ if (need_unref)
+ transport_unref (this);
+
+ return 0;
+}
+
+
+static int
+tcp_connect_finish (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int error = 0, ret = 0;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __tcp_connect_finish (priv->sock);
+
+ if (!ret) {
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+ ret = getsockname (priv->sock,
+ (struct sockaddr *)&this->myinfo.sockaddr,
+ &this->myinfo.sockaddr_len);
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getsockname on new client-socket %d "
+ "failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ error = 1;
+ goto unlock;
+ }
+
+ get_transport_identifiers (this);
+ priv->tcp_connected = 1;
+ }
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "tcp connect to %s failed (%s)",
+ this->peerinfo.identifier, strerror (errno));
+ error = 1;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (error) {
+ transport_disconnect (this);
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = data;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = NULL;
+ int ret = 0;
+
+ if (!priv->tcp_connected) {
+ ret = tcp_connect_finish (this);
+ if (priv->tcp_connected) {
+ options = &priv->options;
+
+ priv->peer.send_count = options->send_count;
+ priv->peer.recv_count = options->recv_count;
+ priv->peer.send_size = options->send_size;
+ priv->peer.recv_size = options->recv_size;
+
+ if ((ret = ib_verbs_create_qp (this)) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create QP",
+ this->xl->name);
+ transport_disconnect (this);
+ }
+ }
+ }
+
+ if (!ret && poll_out && priv->tcp_connected) {
+ ret = ib_verbs_handshake_pollout (this);
+ }
+
+ if (!ret && poll_in && priv->tcp_connected) {
+ if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: pollin received on tcp socket (peer: %s) "
+ "after handshake is complete",
+ this->xl->name, this->peerinfo.identifier);
+ ib_verbs_handshake_pollerr (this);
+ return 0;
+ }
+ ret = ib_verbs_handshake_pollin (this);
+ }
+
+ if (ret < 0 || poll_err) {
+ ret = ib_verbs_handshake_pollerr (this);
+ }
+
+ return 0;
+}
+
+static int
+__tcp_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+static int32_t
+ib_verbs_connect (struct transport *this)
+{
+ dict_t *options = this->xl->options;
+
+ ib_verbs_private_t *priv = this->private;
+
+ int32_t ret = 0;
+ gf_boolean_t non_blocking = 1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len = 0;
+
+ if (priv->connected) {
+ return 0;
+ }
+
+ if (dict_get (options, "non-blocking-io")) {
+ char *nb_connect = data_to_str (dict_get (this->xl->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (nb_connect, &non_blocking) == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean "
+ "options, not taking any action");
+ non_blocking = 1;
+ }
+ }
+
+ ret = ibverbs_client_get_remote_sockaddr (this, (struct sockaddr *)&sockaddr,
+ &sockaddr_len);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "cannot get remote address to connect");
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (priv->sock != -1) {
+ ret = 0;
+ goto unlock;
+ }
+
+ priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket () - error: %s", strerror (errno));
+ ret = -errno;
+ goto unlock;
+ }
+
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "socket fd = %d", priv->sock);
+
+ memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family =
+ ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family;
+
+ if (non_blocking)
+ {
+ ret = __tcp_nonblock (priv->sock);
+
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not set socket %d to non "
+ "blocking mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = client_bind (this,
+ (struct sockaddr *)&this->myinfo.sockaddr,
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = connect (priv->sock,
+ (struct sockaddr *)&this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len);
+ if (ret == -1 && errno != EINPROGRESS)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection attempt failed (%s)",
+ strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ priv->tcp_connected = priv->connected = 0;
+
+ transport_ref (this);
+
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START;
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START;
+
+ priv->idx = event_register (this->xl->ctx->event_pool,
+ priv->sock, ib_verbs_event_handler,
+ this, 1, 1);
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+static int
+ib_verbs_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ int32_t main_sock = -1;
+ transport_t *this, *trans = data;
+ ib_verbs_private_t *priv = NULL;
+ ib_verbs_private_t *trans_priv = (ib_verbs_private_t *) trans->private;
+ ib_verbs_options_t *options = NULL;
+
+ if (!poll_in)
+ return 0;
+
+ this = GF_CALLOC (1, sizeof (transport_t),
+ gf_ibv_mt_transport_t);
+ ERR_ABORT (this);
+ priv = GF_CALLOC (1, sizeof (ib_verbs_private_t),
+ gf_ibv_mt_ib_verbs_private_t);
+ ERR_ABORT (priv);
+ this->private = priv;
+ /* Copy all the ib_verbs related values in priv, from trans_priv
+ as other than QP, all the values remain same */
+ priv->device = trans_priv->device;
+ priv->options = trans_priv->options;
+ options = &priv->options;
+
+ this->ops = trans->ops;
+ this->xl = trans->xl;
+ this->init = trans->init;
+ this->fini = trans->fini;
+
+ memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr,
+ trans->myinfo.sockaddr_len);
+ this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len;
+
+ main_sock = (trans_priv)->sock;
+ this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr);
+ priv->sock = accept (main_sock,
+ (struct sockaddr *)&this->peerinfo.sockaddr,
+ &this->peerinfo.sockaddr_len);
+ if (priv->sock == -1) {
+ gf_log ("ib-verbs/server", GF_LOG_ERROR,
+ "accept() failed: %s",
+ strerror (errno));
+ GF_FREE (this->private);
+ GF_FREE (this);
+ return -1;
+ }
+
+ priv->peer.trans = this;
+ transport_ref (this);
+
+ get_transport_identifiers (this);
+
+ priv->tcp_connected = 1;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START;
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START;
+
+ priv->peer.send_count = options->send_count;
+ priv->peer.recv_count = options->recv_count;
+ priv->peer.send_size = options->send_size;
+ priv->peer.recv_size = options->recv_size;
+ INIT_LIST_HEAD (&priv->peer.ioq);
+
+ if (ib_verbs_create_qp (this) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create QP",
+ this->xl->name);
+ transport_disconnect (this);
+ return -1;
+ }
+
+ priv->idx = event_register (this->xl->ctx->event_pool, priv->sock,
+ ib_verbs_event_handler, this, 1, 1);
+
+ pthread_mutex_init (&priv->read_mutex, NULL);
+ pthread_mutex_init (&priv->write_mutex, NULL);
+ pthread_mutex_init (&priv->recv_mutex, NULL);
+ /* pthread_cond_init (&priv->recv_cond, NULL); */
+
+ return 0;
+}
+
+static int32_t
+ib_verbs_listen (transport_t *this)
+{
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ ib_verbs_private_t *priv = this->private;
+ int opt = 1, ret = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+
+ memset (&sockaddr, 0, sizeof (sockaddr));
+ ret = ibverbs_server_get_local_sockaddr (this,
+ (struct sockaddr *)&sockaddr,
+ &sockaddr_len);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "cannot find network address of server to bind to");
+ goto err;
+ }
+
+ priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+ if (priv->sock == -1) {
+ gf_log ("ib-verbs/server", GF_LOG_CRITICAL,
+ "init: failed to create socket, error: %s",
+ strerror (errno));
+ GF_FREE (this->private);
+ ret = -1;
+ goto err;
+ }
+
+ memcpy (&this->myinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->myinfo.sockaddr_len = sockaddr_len;
+
+ ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ goto err;
+ }
+ sprintf (this->myinfo.identifier, "%s:%s", host, service);
+
+ setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
+ if (bind (priv->sock,
+ (struct sockaddr *)&sockaddr,
+ sockaddr_len) != 0) {
+ ret = -1;
+ gf_log ("ib-verbs/server", GF_LOG_ERROR,
+ "init: failed to bind to socket for %s (%s)",
+ this->myinfo.identifier, strerror (errno));
+ goto err;
+ }
+
+ if (listen (priv->sock, 10) != 0) {
+ gf_log ("ib-verbs/server", GF_LOG_ERROR,
+ "init: listen () failed on socket for %s (%s)",
+ this->myinfo.identifier, strerror (errno));
+ ret = -1;
+ goto err;
+ }
+
+ /* Register the main socket */
+ priv->idx = event_register (this->xl->ctx->event_pool, priv->sock,
+ ib_verbs_server_event_handler,
+ transport_ref (this), 1, 0);
+
+err:
+ return ret;
+}
+
+struct transport_ops tops = {
+ .receive = ib_verbs_receive,
+ .submit = ib_verbs_submit,
+ .connect = ib_verbs_connect,
+ .disconnect = ib_verbs_disconnect,
+ .listen = ib_verbs_listen,
+};
+
+int32_t
+init (transport_t *this)
+{
+ ib_verbs_private_t *priv = GF_CALLOC (1, sizeof (*priv),
+ gf_ibv_mt_ib_verbs_private_t);
+ this->private = priv;
+ priv->sock = -1;
+
+ if (ib_verbs_init (this)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "Failed to initialize IB Device");
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+fini (struct transport *this)
+{
+ /* TODO: verify this function does graceful finish */
+ ib_verbs_private_t *priv = this->private;
+ this->private = NULL;
+
+ pthread_mutex_destroy (&priv->recv_mutex);
+ pthread_mutex_destroy (&priv->write_mutex);
+ pthread_mutex_destroy (&priv->read_mutex);
+ /* pthread_cond_destroy (&priv->recv_cond); */
+
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "called fini on transport: %p",
+ this);
+ GF_FREE (priv);
+ return;
+}
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ if (!this)
+ return ret;
+
+ ret = xlator_mem_acct_init (this, gf_common_mt_end + 1);
+
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR, "Memory accounting init"
+ "failed");
+ return ret;
+ }
+
+ return ret;
+}
+
+/* TODO: expand each option */
+struct volume_options options[] = {
+ { .key = {"transport.ib-verbs.port",
+ "ib-verbs-port"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 4,
+ .description = "check the option by 'ibv_devinfo'"
+ },
+ { .key = {"transport.ib-verbs.mtu",
+ "ib-verbs-mtu"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.ib-verbs.device-name",
+ "ib-verbs-device-name"},
+ .type = GF_OPTION_TYPE_ANY,
+ .description = "check by 'ibv_devinfo'"
+ },
+ { .key = {"transport.ib-verbs.work-request-send-count",
+ "ib-verbs-work-request-send-count"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.ib-verbs.work-request-recv-count",
+ "ib-verbs-work-request-recv-count"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.ib-verbs.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.ib-verbs.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.ib-verbs.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.ib-verbs.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.ib-verbs.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.address-family",
+ "address-family"},
+ .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
+ "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {NULL} }
+};
diff --git a/xlators/protocol/transport/ib-verbs/src/ib-verbs.h b/xlators/protocol/transport/ib-verbs/src/ib-verbs.h
new file mode 100644
index 000000000..c385b62e5
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/src/ib-verbs.h
@@ -0,0 +1,220 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _XPORT_IB_VERBS_H
+#define _XPORT_IB_VERBS_H
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif /* MAX_IOVEC */
+
+#include "xlator.h"
+#include "event.h"
+#include "ib-verbs-mem-types.h"
+
+#include <stdio.h>
+#include <list.h>
+#include <arpa/inet.h>
+#include <infiniband/verbs.h>
+
+#define GF_DEFAULT_IBVERBS_LISTEN_PORT 6997
+
+/* options per transport end point */
+struct _ib_verbs_options {
+ int32_t port;
+ char *device_name;
+ enum ibv_mtu mtu;
+ int32_t send_count;
+ int32_t recv_count;
+ uint64_t recv_size;
+ uint64_t send_size;
+};
+typedef struct _ib_verbs_options ib_verbs_options_t;
+
+
+struct _ib_verbs_header {
+ char colonO[3];
+ uint32_t size1;
+ uint32_t size2;
+ char version;
+} __attribute__((packed));
+typedef struct _ib_verbs_header ib_verbs_header_t;
+
+struct _ib_verbs_ioq {
+ union {
+ struct list_head list;
+ struct {
+ struct _ib_verbs_ioq *next;
+ struct _ib_verbs_ioq *prev;
+ };
+ };
+ ib_verbs_header_t header;
+ struct iovec vector[MAX_IOVEC];
+ int count;
+ char *buf;
+ struct iobref *iobref;
+};
+typedef struct _ib_verbs_ioq ib_verbs_ioq_t;
+
+/* represents one communication peer, two per transport_t */
+struct _ib_verbs_peer {
+ transport_t *trans;
+ struct ibv_qp *qp;
+
+ int32_t recv_count;
+ int32_t send_count;
+ int32_t recv_size;
+ int32_t send_size;
+
+ int32_t quota;
+ union {
+ struct list_head ioq;
+ struct {
+ ib_verbs_ioq_t *ioq_next;
+ ib_verbs_ioq_t *ioq_prev;
+ };
+ };
+
+ /* QP attributes, needed to connect with remote QP */
+ int32_t local_lid;
+ int32_t local_psn;
+ int32_t local_qpn;
+ int32_t remote_lid;
+ int32_t remote_psn;
+ int32_t remote_qpn;
+};
+typedef struct _ib_verbs_peer ib_verbs_peer_t;
+
+
+struct _ib_verbs_post {
+ struct _ib_verbs_post *next, *prev;
+ struct ibv_mr *mr;
+ char *buf;
+ int32_t buf_size;
+ char aux;
+ int32_t reused;
+ pthread_barrier_t wait;
+};
+typedef struct _ib_verbs_post ib_verbs_post_t;
+
+
+struct _ib_verbs_queue {
+ ib_verbs_post_t active_posts, passive_posts;
+ int32_t active_count, passive_count;
+ pthread_mutex_t lock;
+};
+typedef struct _ib_verbs_queue ib_verbs_queue_t;
+
+
+struct _ib_verbs_qpreg {
+ pthread_mutex_t lock;
+ int32_t count;
+ struct _qpent {
+ struct _qpent *next, *prev;
+ int32_t qp_num;
+ ib_verbs_peer_t *peer;
+ } ents[42];
+};
+typedef struct _ib_verbs_qpreg ib_verbs_qpreg_t;
+
+/* context per device, stored in global glusterfs_ctx_t->ib */
+struct _ib_verbs_device {
+ struct _ib_verbs_device *next;
+ const char *device_name;
+ struct ibv_context *context;
+ int32_t port;
+ struct ibv_pd *pd;
+ struct ibv_srq *srq;
+ ib_verbs_qpreg_t qpreg;
+ struct ibv_comp_channel *send_chan, *recv_chan;
+ struct ibv_cq *send_cq, *recv_cq;
+ ib_verbs_queue_t sendq, recvq;
+ pthread_t send_thread, recv_thread;
+};
+typedef struct _ib_verbs_device ib_verbs_device_t;
+
+typedef enum {
+ IB_VERBS_HANDSHAKE_START = 0,
+ IB_VERBS_HANDSHAKE_SENDING_DATA,
+ IB_VERBS_HANDSHAKE_RECEIVING_DATA,
+ IB_VERBS_HANDSHAKE_SENT_DATA,
+ IB_VERBS_HANDSHAKE_RECEIVED_DATA,
+ IB_VERBS_HANDSHAKE_SENDING_ACK,
+ IB_VERBS_HANDSHAKE_RECEIVING_ACK,
+ IB_VERBS_HANDSHAKE_RECEIVED_ACK,
+ IB_VERBS_HANDSHAKE_COMPLETE,
+} ib_verbs_handshake_state_t;
+
+struct ib_verbs_nbio {
+ int state;
+ char *buf;
+ int count;
+ struct iovec vector;
+ struct iovec *pending_vector;
+ int pending_count;
+};
+
+
+struct _ib_verbs_private {
+ int32_t sock;
+ int32_t idx;
+ unsigned char connected;
+ unsigned char tcp_connected;
+ unsigned char ib_connected;
+ in_addr_t addr;
+ unsigned short port;
+
+ /* IB Verbs Driver specific variables, pointers */
+ ib_verbs_peer_t peer;
+ ib_verbs_device_t *device;
+ ib_verbs_options_t options;
+
+ /* Used by trans->op->receive */
+ char *data_ptr;
+ int32_t data_offset;
+ int32_t data_len;
+
+ /* Mutex */
+ pthread_mutex_t read_mutex;
+ pthread_mutex_t write_mutex;
+ pthread_barrier_t handshake_barrier;
+ char handshake_ret;
+
+ pthread_mutex_t recv_mutex;
+ pthread_cond_t recv_cond;
+
+ /* used during ib_verbs_handshake */
+ struct {
+ struct ib_verbs_nbio incoming;
+ struct ib_verbs_nbio outgoing;
+ int state;
+ ib_verbs_header_t header;
+ char *buf;
+ size_t size;
+ } handshake;
+};
+typedef struct _ib_verbs_private ib_verbs_private_t;
+
+#endif /* _XPORT_IB_VERBS_H */
diff --git a/xlators/protocol/transport/ib-verbs/src/name.c b/xlators/protocol/transport/ib-verbs/src/name.c
new file mode 100644
index 000000000..a3e184814
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/src/name.c
@@ -0,0 +1,712 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <netdb.h>
+#include <string.h>
+
+#ifdef CLIENT_PORT_CEILING
+#undef CLIENT_PORT_CEILING
+#endif
+
+#define CLIENT_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+#include "transport.h"
+#include "ib-verbs.h"
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static int32_t
+af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr,
+ socklen_t sockaddr_len, int ceiling)
+{
+ int32_t ret = -1;
+ /* struct sockaddr_in sin = {0, }; */
+ uint16_t port = ceiling - 1;
+
+ while (port)
+ {
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET6:
+ ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port);
+ break;
+
+ case AF_INET_SDP:
+ case AF_INET:
+ ((struct sockaddr_in *)sockaddr)->sin_port = htons (port);
+ break;
+ }
+
+ ret = bind (fd, sockaddr, sockaddr_len);
+
+ if (ret == 0)
+ break;
+
+ if (ret == -1 && errno == EACCES)
+ break;
+
+ port--;
+ }
+
+ return ret;
+}
+
+static int32_t
+af_unix_client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t sockaddr_len,
+ int sock)
+{
+ data_t *path_data = NULL;
+ struct sockaddr_un *addr = NULL;
+ int32_t ret = -1;
+
+ path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.bind-path");
+ if (path_data) {
+ char *path = data_to_str (path_data);
+ if (!path || strlen (path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "transport.ib-verbs.bind-path not specfied "
+ "for unix socket, letting connect to assign "
+ "default value");
+ goto err;
+ }
+
+ addr = (struct sockaddr_un *) sockaddr;
+ strcpy (addr->sun_path, path);
+ ret = bind (sock, (struct sockaddr *)addr, sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind to unix-domain socket %d (%s)",
+ sock, strerror (errno));
+ goto err;
+ }
+ }
+
+err:
+ return ret;
+}
+
+static int32_t
+client_fill_address_family (transport_t *this, struct sockaddr *sockaddr)
+{
+ data_t *address_family_data = NULL;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (!address_family_data) {
+ data_t *remote_host_data = NULL, *connect_path_data = NULL;
+ remote_host_data = dict_get (this->xl->options, "remote-host");
+ connect_path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.connect-path");
+
+ if (!(remote_host_data || connect_path_data) ||
+ (remote_host_data && connect_path_data)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "address-family not specified and not able to "
+ "determine the same from other options "
+ "(remote-host:%s and connect-path:%s)",
+ data_to_str (remote_host_data),
+ data_to_str (connect_path_data));
+ return -1;
+ }
+
+ if (remote_host_data) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be inet/inet6");
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be unix");
+ sockaddr->sa_family = AF_UNIX;
+ }
+
+ } else {
+ char *address_family = data_to_str (address_family_data);
+ if (!strcasecmp (address_family, "unix")) {
+ sockaddr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet")) {
+ sockaddr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ sockaddr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ sockaddr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family (%s) specified",
+ address_family);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int32_t
+af_inet_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ dict_t *options = this->xl->options;
+ data_t *remote_host_data = NULL;
+ data_t *remote_port_data = NULL;
+ char *remote_host = NULL;
+ uint16_t remote_port = 0;
+ struct addrinfo *addr_info = NULL;
+ int32_t ret = 0;
+
+ remote_host_data = dict_get (options, "remote-host");
+ if (remote_host_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host missing in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_host = data_to_str (remote_host_data);
+ if (remote_host == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host has data NULL in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_port_data = dict_get (options, "remote-port");
+ if (remote_port_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option remote-port missing in volume %s. "
+ "Defaulting to %d",
+ this->xl->name, GF_DEFAULT_IBVERBS_LISTEN_PORT);
+
+ remote_port = GF_DEFAULT_IBVERBS_LISTEN_PORT;
+ }
+ else
+ {
+ remote_port = data_to_uint16 (remote_port_data);
+ }
+
+ if (remote_port == (uint16_t)-1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-port has invalid port in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* TODO: gf_resolve is a blocking call. kick in some
+ non blocking dns techniques */
+ ret = gf_resolve_ip6 (remote_host, remote_port,
+ sockaddr->sa_family,
+ &this->dnscache, &addr_info);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "DNS resolution failed on host %s", remote_host);
+ goto err;
+ }
+
+ memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen);
+ *sockaddr_len = addr_info->ai_addrlen;
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ struct sockaddr_un *sockaddr_un = NULL;
+ char *connect_path = NULL;
+ data_t *connect_path_data = NULL;
+ int32_t ret = 0;
+
+ connect_path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.connect-path");
+ if (!connect_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.ib-verbs.connect-path not "
+ "specified for address-family unix");
+ ret = -1;
+ goto err;
+ }
+
+ connect_path = data_to_str (connect_path_data);
+ if (!connect_path) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path is null-string");
+ ret = -1;
+ goto err;
+ }
+
+ if (strlen (connect_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path value length %"GF_PRI_SIZET" > "
+ "%d octets", strlen (connect_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->xl->name,
+ GF_LOG_DEBUG,
+ "using connect-path %s", connect_path);
+ sockaddr_un = (struct sockaddr_un *)sockaddr;
+ strcpy (sockaddr_un->sun_path, connect_path);
+ *sockaddr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *listen_path_data = NULL;
+ char *listen_path = NULL;
+ int32_t ret = 0;
+ struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr;
+
+
+ listen_path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.listen-path");
+ if (!listen_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "missing option listen-path");
+ ret = -1;
+ goto err;
+ }
+
+ listen_path = data_to_str (listen_path_data);
+
+#ifndef UNIX_PATH_MAX
+#define UNIX_PATH_MAX 108
+#endif
+
+ if (strlen (listen_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option listen-path has value length %"GF_PRI_SIZET" > %d",
+ strlen (listen_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+ strcpy (sunaddr->sun_path, listen_path);
+ *addr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_inet_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ struct addrinfo hints, *res = 0;
+ data_t *listen_port_data = NULL, *listen_host_data = NULL;
+ uint16_t listen_port = -1;
+ char service[NI_MAXSERV], *listen_host = NULL;
+ dict_t *options = NULL;
+ int32_t ret = 0;
+
+ options = this->xl->options;
+
+ listen_port_data = dict_get (options, "transport.ib-verbs.listen-port");
+ listen_host_data = dict_get (options, "transport.ib-verbs.bind-address");
+
+ if (listen_port_data)
+ {
+ listen_port = data_to_uint16 (listen_port_data);
+ } else {
+ if (addr->sa_family == AF_INET6) {
+ struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr;
+ in->sin6_addr = in6addr_any;
+ in->sin6_port = htons(listen_port);
+ *addr_len = sizeof(struct sockaddr_in6);
+ goto out;
+ } else if (addr->sa_family == AF_INET) {
+ struct sockaddr_in *in = (struct sockaddr_in *) addr;
+ in->sin_addr.s_addr = htonl(INADDR_ANY);
+ in->sin_port = htons(listen_port);
+ *addr_len = sizeof(struct sockaddr_in);
+ goto out;
+ }
+ }
+
+ if (listen_port == (uint16_t) -1)
+ listen_port = GF_DEFAULT_IBVERBS_LISTEN_PORT;
+
+
+ if (listen_host_data)
+ {
+ listen_host = data_to_str (listen_host_data);
+ }
+
+ memset (service, 0, sizeof (service));
+ sprintf (service, "%d", listen_port);
+
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = addr->sa_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE;
+
+ ret = getaddrinfo(listen_host, service, &hints, &res);
+ if (ret != 0) {
+ gf_log (this->xl->name,
+ GF_LOG_ERROR,
+ "getaddrinfo failed for host %s, service %s (%s)",
+ listen_host, service, gai_strerror (ret));
+ ret = -1;
+ goto out;
+ }
+
+ memcpy (addr, res->ai_addr, res->ai_addrlen);
+ *addr_len = res->ai_addrlen;
+
+ freeaddrinfo (res);
+
+out:
+ return ret;
+}
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock)
+{
+ int ret = 0;
+
+ *sockaddr_len = sizeof (struct sockaddr_in6);
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ case AF_INET:
+ *sockaddr_len = sizeof (struct sockaddr_in);
+
+ case AF_INET6:
+ ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr,
+ *sockaddr_len,
+ CLIENT_PORT_CEILING);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "cannot bind inet socket (%d) to port "
+ "less than %d (%s)",
+ sock, CLIENT_PORT_CEILING, strerror (errno));
+ ret = 0;
+ }
+ break;
+
+ case AF_UNIX:
+ *sockaddr_len = sizeof (struct sockaddr_un);
+ ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr,
+ *sockaddr_len, sock);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family %d", sockaddr->sa_family);
+ ret = -1;
+ break;
+ }
+
+ return ret;
+}
+
+int32_t
+ibverbs_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ ret = client_fill_address_family (this, sockaddr);
+ if (ret) {
+ ret = -1;
+ goto err;
+ }
+
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ sockaddr->sa_family = AF_INET;
+ is_inet_sdp = 1;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_client_get_remote_sockaddr (this,
+ sockaddr,
+ sockaddr_len);
+
+ if (is_inet_sdp) {
+ sockaddr->sa_family = AF_INET_SDP;
+ }
+
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_client_get_remote_sockaddr (this,
+ sockaddr,
+ sockaddr_len);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family %d", sockaddr->sa_family);
+ ret = -1;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+ibverbs_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (address_family_data) {
+ char *address_family = NULL;
+ address_family = data_to_str (address_family_data);
+
+ if (!strcasecmp (address_family, "inet")) {
+ addr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ addr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ addr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "unix")) {
+ addr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ addr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%s) specified",
+ address_family);
+ ret = -1;
+ goto err;
+ }
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option address-family not specified, defaulting "
+ "to inet/inet6");
+ addr->sa_family = AF_UNSPEC;
+ }
+
+ switch (addr->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ addr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_server_get_local_sockaddr (this, addr, addr_len);
+ if (is_inet_sdp && !ret) {
+ addr->sa_family = AF_INET_SDP;
+ }
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr,
+ int32_t addr_len, char *identifier)
+{
+ int32_t ret = 0, tmpaddr_len = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+ struct sockaddr_storage tmpaddr;
+
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+ tmpaddr = *addr;
+ tmpaddr_len = addr_len;
+
+ if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) {
+ int32_t one_to_four, four_to_eight, twelve_to_sixteen;
+ int16_t eight_to_ten, ten_to_twelve;
+
+ one_to_four = four_to_eight = twelve_to_sixteen = 0;
+ eight_to_ten = ten_to_twelve = 0;
+
+ one_to_four = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr32[0];
+ four_to_eight = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr32[1];
+#ifdef GF_SOLARIS_HOST_OS
+ eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr)[4];
+#else
+ eight_to_ten = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr16[4];
+#endif
+
+#ifdef GF_SOLARIS_HOST_OS
+ ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr)[5];
+#else
+ ten_to_twelve = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr16[5];
+#endif
+ twelve_to_sixteen = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr32[3];
+
+ /* ipv4 mapped ipv6 address has
+ bits 0-80: 0
+ bits 80-96: 0xffff
+ bits 96-128: ipv4 address
+ */
+
+ if (one_to_four == 0 &&
+ four_to_eight == 0 &&
+ eight_to_ten == 0 &&
+ ten_to_twelve == -1) {
+ struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr;
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+
+ in_ptr->sin_family = AF_INET;
+ in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port;
+ in_ptr->sin_addr.s_addr = twelve_to_sixteen;
+ tmpaddr_len = sizeof (*in_ptr);
+ }
+ }
+
+ ret = getnameinfo ((struct sockaddr *) &tmpaddr,
+ tmpaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (ret != 0) {
+ gf_log (this->xl->name,
+ GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ }
+
+ sprintf (identifier, "%s:%s", host, service);
+
+ return ret;
+}
+
+int32_t
+get_transport_identifiers (transport_t *this)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ ret = fill_inet6_inet_identifiers (this,
+ &this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ this->myinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "can't fill inet/inet6 identifier for server");
+ goto err;
+ }
+
+ ret = fill_inet6_inet_identifiers (this,
+ &this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len,
+ this->peerinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "can't fill inet/inet6 identifier for client");
+ goto err;
+ }
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP;
+ }
+ }
+ break;
+
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sunaddr = NULL;
+
+ sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr;
+ strcpy (this->myinfo.identifier, sunaddr->sun_path);
+
+ sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr;
+ strcpy (this->peerinfo.identifier, sunaddr->sun_path);
+ }
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%d)",
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family);
+ ret = -1;
+ break;
+ }
+
+err:
+ return ret;
+}
diff --git a/xlators/protocol/transport/ib-verbs/src/name.h b/xlators/protocol/transport/ib-verbs/src/name.h
new file mode 100644
index 000000000..4f0f47711
--- /dev/null
+++ b/xlators/protocol/transport/ib-verbs/src/name.h
@@ -0,0 +1,47 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _IB_VERBS_NAME_H
+#define _IB_VERBS_NAME_H
+
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "compat.h"
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock);
+
+int32_t
+ibverbs_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len);
+
+int32_t
+ibverbs_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len);
+
+int32_t
+get_transport_identifiers (transport_t *this);
+
+#endif /* _IB_VERBS_NAME_H */
diff --git a/xlators/protocol/transport/socket/Makefile.am b/xlators/protocol/transport/socket/Makefile.am
new file mode 100644
index 000000000..f963effea
--- /dev/null
+++ b/xlators/protocol/transport/socket/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src \ No newline at end of file
diff --git a/xlators/protocol/transport/socket/src/Makefile.am b/xlators/protocol/transport/socket/src/Makefile.am
new file mode 100644
index 000000000..f5c46f1ac
--- /dev/null
+++ b/xlators/protocol/transport/socket/src/Makefile.am
@@ -0,0 +1,19 @@
+# TODO : change to proper transport dir
+
+transport_LTLIBRARIES = socket.la
+transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport
+
+socket_la_LDFLAGS = -module -avoidversion
+
+socket_la_SOURCES = socket.c name.c
+socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/xlators/protocol/lib/src/libgfproto.la
+
+noinst_HEADERS = socket.h name.h
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \
+ -I$(top_srcdir)/xlators/protocol/transport/socket/src \
+ -I$(top_srcdir)/xlators/protocol/lib/src
+
+CLEANFILES = *~
diff --git a/xlators/protocol/transport/socket/src/name.c b/xlators/protocol/transport/socket/src/name.c
new file mode 100644
index 000000000..120a669c8
--- /dev/null
+++ b/xlators/protocol/transport/socket/src/name.c
@@ -0,0 +1,737 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <netdb.h>
+#include <string.h>
+
+#ifdef CLIENT_PORT_CEILING
+#undef CLIENT_PORT_CEILING
+#endif
+
+#define CLIENT_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+#include "transport.h"
+#include "socket.h"
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static int32_t
+af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr,
+ socklen_t sockaddr_len, int ceiling)
+{
+ int32_t ret = -1;
+ /* struct sockaddr_in sin = {0, }; */
+ uint16_t port = ceiling - 1;
+
+ while (port)
+ {
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET6:
+ ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port);
+ break;
+
+ case AF_INET_SDP:
+ case AF_INET:
+ ((struct sockaddr_in *)sockaddr)->sin_port = htons (port);
+ break;
+ }
+
+ ret = bind (fd, sockaddr, sockaddr_len);
+
+ if (ret == 0)
+ break;
+
+ if (ret == -1 && errno == EACCES)
+ break;
+
+ port--;
+ }
+
+ return ret;
+}
+
+static int32_t
+af_unix_client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t sockaddr_len,
+ int sock)
+{
+ data_t *path_data = NULL;
+ struct sockaddr_un *addr = NULL;
+ int32_t ret = 0;
+
+ path_data = dict_get (this->xl->options, "transport.socket.bind-path");
+ if (path_data) {
+ char *path = data_to_str (path_data);
+ if (!path || strlen (path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "bind-path not specfied for unix socket, "
+ "letting connect to assign default value");
+ goto err;
+ }
+
+ addr = (struct sockaddr_un *) sockaddr;
+ strcpy (addr->sun_path, path);
+ ret = bind (sock, (struct sockaddr *)addr, sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind to unix-domain socket %d (%s)",
+ sock, strerror (errno));
+ goto err;
+ }
+ } else {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "bind-path not specfied for unix socket, "
+ "letting connect to assign default value");
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+client_fill_address_family (transport_t *this, sa_family_t *sa_family)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = -1;
+
+ if (sa_family == NULL) {
+ goto out;
+ }
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (!address_family_data) {
+ data_t *remote_host_data = NULL, *connect_path_data = NULL;
+ remote_host_data = dict_get (this->xl->options, "remote-host");
+ connect_path_data = dict_get (this->xl->options,
+ "transport.socket.connect-path");
+
+ if (!(remote_host_data || connect_path_data) ||
+ (remote_host_data && connect_path_data)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "transport.address-family not specified and "
+ "not able to determine the "
+ "same from other options (remote-host:%s and "
+ "transport.unix.connect-path:%s)",
+ data_to_str (remote_host_data),
+ data_to_str (connect_path_data));
+ goto out;
+ }
+
+ if (remote_host_data) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be inet/inet6");
+ *sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be unix");
+ *sa_family = AF_UNIX;
+ }
+
+ } else {
+ char *address_family = data_to_str (address_family_data);
+ if (!strcasecmp (address_family, "unix")) {
+ *sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet")) {
+ *sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ *sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ *sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ *sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family (%s) specified",
+ address_family);
+ goto out;
+ }
+ }
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+static int32_t
+af_inet_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ dict_t *options = this->xl->options;
+ data_t *remote_host_data = NULL;
+ data_t *remote_port_data = NULL;
+ char *remote_host = NULL;
+ uint16_t remote_port = 0;
+ struct addrinfo *addr_info = NULL;
+ int32_t ret = 0;
+
+ remote_host_data = dict_get (options, "remote-host");
+ if (remote_host_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host missing in volume %s", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_host = data_to_str (remote_host_data);
+ if (remote_host == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host has data NULL in volume %s", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_port_data = dict_get (options, "remote-port");
+ if (remote_port_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "option remote-port missing in volume %s. Defaulting to %d",
+ this->xl->name, GF_DEFAULT_SOCKET_LISTEN_PORT);
+
+ remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+ }
+ else
+ {
+ remote_port = data_to_uint16 (remote_port_data);
+ }
+
+ if (remote_port == (uint16_t)-1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-port has invalid port in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* TODO: gf_resolve is a blocking call. kick in some
+ non blocking dns techniques */
+ ret = gf_resolve_ip6 (remote_host, remote_port,
+ sockaddr->sa_family, &this->dnscache, &addr_info);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "DNS resolution failed on host %s", remote_host);
+ goto err;
+ }
+
+ memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen);
+ *sockaddr_len = addr_info->ai_addrlen;
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ struct sockaddr_un *sockaddr_un = NULL;
+ char *connect_path = NULL;
+ data_t *connect_path_data = NULL;
+ int32_t ret = 0;
+
+ connect_path_data = dict_get (this->xl->options,
+ "transport.socket.connect-path");
+ if (!connect_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.unix.connect-path not specified for "
+ "address-family unix");
+ ret = -1;
+ goto err;
+ }
+
+ connect_path = data_to_str (connect_path_data);
+ if (!connect_path) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "transport.unix.connect-path is null-string");
+ ret = -1;
+ goto err;
+ }
+
+ if (strlen (connect_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path value length %"GF_PRI_SIZET" > %d octets",
+ strlen (connect_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "using connect-path %s", connect_path);
+ sockaddr_un = (struct sockaddr_un *)sockaddr;
+ strcpy (sockaddr_un->sun_path, connect_path);
+ *sockaddr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *listen_path_data = NULL;
+ char *listen_path = NULL;
+ int32_t ret = 0;
+ struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr;
+
+
+ listen_path_data = dict_get (this->xl->options,
+ "transport.socket.listen-path");
+ if (!listen_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "missing option transport.socket.listen-path");
+ ret = -1;
+ goto err;
+ }
+
+ listen_path = data_to_str (listen_path_data);
+
+#ifndef UNIX_PATH_MAX
+#define UNIX_PATH_MAX 108
+#endif
+
+ if (strlen (listen_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.unix.listen-path has value length "
+ "%"GF_PRI_SIZET" > %d",
+ strlen (listen_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+ strcpy (sunaddr->sun_path, listen_path);
+ *addr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_inet_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ struct addrinfo hints, *res = 0;
+ data_t *listen_port_data = NULL, *listen_host_data = NULL;
+ uint16_t listen_port = -1;
+ char service[NI_MAXSERV], *listen_host = NULL;
+ dict_t *options = NULL;
+ int32_t ret = 0;
+
+ options = this->xl->options;
+
+ listen_port_data = dict_get (options, "transport.socket.listen-port");
+ listen_host_data = dict_get (options, "transport.socket.bind-address");
+
+ if (listen_port_data)
+ {
+ listen_port = data_to_uint16 (listen_port_data);
+ }
+
+ if (listen_port == (uint16_t) -1)
+ listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+
+
+ if (listen_host_data)
+ {
+ listen_host = data_to_str (listen_host_data);
+ } else {
+ if (addr->sa_family == AF_INET6) {
+ struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr;
+ in->sin6_addr = in6addr_any;
+ in->sin6_port = htons(listen_port);
+ *addr_len = sizeof(struct sockaddr_in6);
+ goto out;
+ } else if (addr->sa_family == AF_INET) {
+ struct sockaddr_in *in = (struct sockaddr_in *) addr;
+ in->sin_addr.s_addr = htonl(INADDR_ANY);
+ in->sin_port = htons(listen_port);
+ *addr_len = sizeof(struct sockaddr_in);
+ goto out;
+ }
+ }
+
+ memset (service, 0, sizeof (service));
+ sprintf (service, "%d", listen_port);
+
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = addr->sa_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE;
+
+ ret = getaddrinfo(listen_host, service, &hints, &res);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getaddrinfo failed for host %s, service %s (%s)",
+ listen_host, service, gai_strerror (ret));
+ ret = -1;
+ goto out;
+ }
+
+ memcpy (addr, res->ai_addr, res->ai_addrlen);
+ *addr_len = res->ai_addrlen;
+
+ freeaddrinfo (res);
+
+out:
+ return ret;
+}
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock)
+{
+ int ret = 0;
+
+ *sockaddr_len = sizeof (struct sockaddr_in6);
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ case AF_INET:
+ *sockaddr_len = sizeof (struct sockaddr_in);
+
+ case AF_INET6:
+ ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr,
+ *sockaddr_len, CLIENT_PORT_CEILING);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "cannot bind inet socket (%d) to port less than %d (%s)",
+ sock, CLIENT_PORT_CEILING, strerror (errno));
+ ret = 0;
+ }
+ break;
+
+ case AF_UNIX:
+ *sockaddr_len = sizeof (struct sockaddr_un);
+ ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr,
+ *sockaddr_len, sock);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family %d", sockaddr->sa_family);
+ ret = -1;
+ break;
+ }
+
+ return ret;
+}
+
+int32_t
+socket_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ sa_family_t *sa_family)
+{
+ int32_t ret = 0;
+
+ if ((sockaddr == NULL) || (sockaddr_len == NULL)
+ || (sa_family == NULL)) {
+ ret = -1;
+ goto err;
+ }
+
+
+ ret = client_fill_address_family (this, &sockaddr->sa_family);
+ if (ret) {
+ ret = -1;
+ goto err;
+ }
+
+ *sa_family = sockaddr->sa_family;
+
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ sockaddr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_client_get_remote_sockaddr (this, sockaddr,
+ sockaddr_len);
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_client_get_remote_sockaddr (this, sockaddr,
+ sockaddr_len);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family %d", sockaddr->sa_family);
+ ret = -1;
+ }
+
+ if (*sa_family == AF_UNSPEC) {
+ *sa_family = sockaddr->sa_family;
+ }
+
+err:
+ return ret;
+}
+
+
+int32_t
+server_fill_address_family (transport_t *this, sa_family_t *sa_family)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = -1;
+
+ if (sa_family == NULL) {
+ goto out;
+ }
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (address_family_data) {
+ char *address_family = NULL;
+ address_family = data_to_str (address_family_data);
+
+ if (!strcasecmp (address_family, "inet")) {
+ *sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ *sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ *sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "unix")) {
+ *sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ *sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%s) specified", address_family);
+ goto out;
+ }
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option address-family not specified, defaulting to inet/inet6");
+ *sa_family = AF_UNSPEC;
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+int32_t
+socket_server_get_local_sockaddr (transport_t *this, struct sockaddr *addr,
+ socklen_t *addr_len, sa_family_t *sa_family)
+{
+ int32_t ret = -1;
+
+ if ((addr == NULL) || (addr_len == NULL) || (sa_family == NULL)) {
+ goto err;
+ }
+
+ ret = server_fill_address_family (this, &addr->sa_family);
+ if (ret == -1) {
+ goto err;
+ }
+
+ *sa_family = addr->sa_family;
+
+ switch (addr->sa_family)
+ {
+ case AF_INET_SDP:
+ addr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+ }
+
+ if (*sa_family == AF_UNSPEC) {
+ *sa_family = addr->sa_family;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr,
+ int32_t addr_len, char *identifier)
+{
+ int32_t ret = 0, tmpaddr_len = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+ struct sockaddr_storage tmpaddr;
+
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+ tmpaddr = *addr;
+ tmpaddr_len = addr_len;
+
+ if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) {
+ int32_t one_to_four, four_to_eight, twelve_to_sixteen;
+ int16_t eight_to_ten, ten_to_twelve;
+
+ one_to_four = four_to_eight = twelve_to_sixteen = 0;
+ eight_to_ten = ten_to_twelve = 0;
+
+ one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0];
+ four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1];
+#ifdef GF_SOLARIS_HOST_OS
+ eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4];
+#else
+ eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4];
+#endif
+
+#ifdef GF_SOLARIS_HOST_OS
+ ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5];
+#else
+ ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5];
+#endif
+
+ twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3];
+
+ /* ipv4 mapped ipv6 address has
+ bits 0-80: 0
+ bits 80-96: 0xffff
+ bits 96-128: ipv4 address
+ */
+
+ if (one_to_four == 0 &&
+ four_to_eight == 0 &&
+ eight_to_ten == 0 &&
+ ten_to_twelve == -1) {
+ struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr;
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+
+ in_ptr->sin_family = AF_INET;
+ in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port;
+ in_ptr->sin_addr.s_addr = twelve_to_sixteen;
+ tmpaddr_len = sizeof (*in_ptr);
+ }
+ }
+
+ ret = getnameinfo ((struct sockaddr *) &tmpaddr,
+ tmpaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ }
+
+ sprintf (identifier, "%s:%s", host, service);
+
+ return ret;
+}
+
+int32_t
+get_transport_identifiers (transport_t *this)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ ret = fill_inet6_inet_identifiers (this,
+ &this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ this->myinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot fill inet/inet6 identifier for server");
+ goto err;
+ }
+
+ ret = fill_inet6_inet_identifiers (this,
+ &this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len,
+ this->peerinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot fill inet/inet6 identifier for client");
+ goto err;
+ }
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP;
+ }
+ }
+ break;
+
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sunaddr = NULL;
+
+ sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr;
+ strcpy (this->myinfo.identifier, sunaddr->sun_path);
+
+ sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr;
+ strcpy (this->peerinfo.identifier, sunaddr->sun_path);
+ }
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%d)",
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family);
+ ret = -1;
+ break;
+ }
+
+err:
+ return ret;
+}
diff --git a/xlators/protocol/transport/socket/src/name.h b/xlators/protocol/transport/socket/src/name.h
new file mode 100644
index 000000000..f50a7b7f4
--- /dev/null
+++ b/xlators/protocol/transport/socket/src/name.h
@@ -0,0 +1,44 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SOCKET_NAME_H
+#define _SOCKET_NAME_H
+
+#include "compat.h"
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock);
+
+int32_t
+socket_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ sa_family_t *sa_family);
+
+int32_t
+socket_server_get_local_sockaddr (transport_t *this, struct sockaddr *addr,
+ socklen_t *addr_len, sa_family_t *sa_family);
+
+int32_t
+get_transport_identifiers (transport_t *this);
+
+#endif /* _SOCKET_NAME_H */
diff --git a/xlators/protocol/transport/socket/src/socket-mem-types.h b/xlators/protocol/transport/socket/src/socket-mem-types.h
new file mode 100644
index 000000000..f50f4a75d
--- /dev/null
+++ b/xlators/protocol/transport/socket/src/socket-mem-types.h
@@ -0,0 +1,36 @@
+
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef __SOCKET_MEM_TYPES_H__
+#define __SOCKET_MEM_TYPES_H__
+
+#include "mem-types.h"
+
+enum gf_socket_mem_types_ {
+ gf_socket_mt_socket_private_t = gf_common_mt_end + 1,
+ gf_socket_mt_ioq,
+ gf_socket_mt_transport_t,
+ gf_socket_mt_socket_local_t,
+ gf_socket_mt_char,
+ gf_socket_mt_end
+};
+#endif
+
diff --git a/xlators/protocol/transport/socket/src/socket.c b/xlators/protocol/transport/socket/src/socket.c
new file mode 100644
index 000000000..7f7f8093a
--- /dev/null
+++ b/xlators/protocol/transport/socket/src/socket.c
@@ -0,0 +1,1552 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "socket.h"
+#include "name.h"
+#include "dict.h"
+#include "transport.h"
+#include "logging.h"
+#include "xlator.h"
+#include "byte-order.h"
+#include "common-utils.h"
+#include "compat-errno.h"
+
+#include <fcntl.h>
+#include <errno.h>
+#include <netinet/tcp.h>
+
+
+#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
+#define SA(ptr) ((struct sockaddr *)ptr)
+
+int socket_init (transport_t *this);
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+int
+__socket_rwv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ int write)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = NULL;
+ int opcount = 0;
+ int moved = 0;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ opvector = vector;
+ opcount = count;
+
+ while (opcount) {
+ if (write) {
+ ret = writev (sock, opvector, opcount);
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
+ /* done for now */
+ break;
+ }
+ } else {
+ ret = readv (sock, opvector, opcount);
+
+ if (ret == -1 && errno == EAGAIN) {
+ /* done for now */
+ break;
+ }
+ }
+
+ if (ret == 0) {
+ /* Mostly due to 'umount' in client */
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "EOF from peer %s", this->peerinfo.identifier);
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "%s failed (%s)", write ? "writev" : "readv",
+ strerror (errno));
+ opcount = -1;
+ break;
+ }
+
+ moved = 0;
+
+ while (moved < ret) {
+ if ((ret - moved) >= opvector[0].iov_len) {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ } else {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ while (opcount && !opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ }
+ }
+ }
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+ return opcount;
+}
+
+
+int
+__socket_readv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, 0);
+
+ return ret;
+}
+
+
+int
+__socket_writev (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, 1);
+
+ return ret;
+}
+
+
+int
+__socket_disconnect (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ if (priv->sock != -1) {
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ priv->connected = -1;
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "shutdown() returned %d. set connection state to -1",
+ ret);
+ }
+
+ return ret;
+}
+
+
+int
+__socket_server_bind (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ int opt = 1;
+
+ priv = this->private;
+
+ ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
+ &opt, sizeof (opt));
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setsockopt() for SO_REUSEADDR failed (%s)",
+ strerror (errno));
+ }
+
+ ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "binding to %s failed: %s",
+ this->myinfo.identifier, strerror (errno));
+ if (errno == EADDRINUSE) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "Port is already in use");
+ }
+ }
+
+ return ret;
+}
+
+
+int
+__socket_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+
+int
+__socket_nodelay (int fd)
+{
+ int on = 1;
+ int ret = -1;
+
+ ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
+ &on, sizeof (on));
+ if (!ret)
+ gf_log ("", GF_LOG_TRACE,
+ "NODELAY enabled for socket %d", fd);
+
+ return ret;
+}
+
+int
+__socket_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen);
+
+ if (ret == 0 && optval) {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+
+void
+__socket_reset (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* TODO: use mem-pool on incoming data */
+
+ if (priv->incoming.hdr_p)
+ GF_FREE (priv->incoming.hdr_p);
+
+ if (priv->incoming.iobuf)
+ iobuf_unref (priv->incoming.iobuf);
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+
+ event_unregister (this->xl->ctx->event_pool, priv->sock, priv->idx);
+ close (priv->sock);
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+}
+
+
+struct ioq *
+__socket_ioq_new (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count, struct iobref *iobref)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ /* TODO: use mem-pool */
+ entry = GF_CALLOC (1, sizeof (*entry),
+ gf_common_mt_ioq);
+ if (!entry)
+ return NULL;
+
+ assert (count <= (MAX_IOVEC-2));
+
+ entry->header.colonO[0] = ':';
+ entry->header.colonO[1] = 'O';
+ entry->header.colonO[2] = '\0';
+ entry->header.version = 42;
+ entry->header.size1 = hton32 (len);
+ entry->header.size2 = hton32 (iov_length (vector, count));
+
+ entry->vector[0].iov_base = &entry->header;
+ entry->vector[0].iov_len = sizeof (entry->header);
+ entry->count++;
+
+ entry->vector[1].iov_base = buf;
+ entry->vector[1].iov_len = len;
+ entry->count++;
+
+ if (vector && count) {
+ memcpy (&entry->vector[2], vector, sizeof (*vector) * count);
+ entry->count += count;
+ }
+
+ entry->pending_vector = entry->vector;
+ entry->pending_count = entry->count;
+
+ if (iobref)
+ entry->iobref = iobref_ref (iobref);
+
+ entry->buf = buf;
+
+ INIT_LIST_HEAD (&entry->list);
+
+ return entry;
+}
+
+
+void
+__socket_ioq_entry_free (struct ioq *entry)
+{
+ list_del_init (&entry->list);
+ if (entry->iobref)
+ iobref_unref (entry->iobref);
+
+ /* TODO: use mem-pool */
+ GF_FREE (entry->buf);
+
+ /* TODO: use mem-pool */
+ GF_FREE (entry);
+}
+
+
+void
+__socket_ioq_flush (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ entry = priv->ioq_next;
+ __socket_ioq_entry_free (entry);
+ }
+
+ return;
+}
+
+
+int
+__socket_ioq_churn_entry (transport_t *this, struct ioq *entry)
+{
+ int ret = -1;
+
+ ret = __socket_writev (this, entry->pending_vector,
+ entry->pending_count,
+ &entry->pending_vector,
+ &entry->pending_count);
+
+ if (ret == 0) {
+ /* current entry was completely written */
+ assert (entry->pending_count == 0);
+ __socket_ioq_entry_free (entry);
+ }
+
+ return ret;
+}
+
+
+int
+__socket_ioq_churn (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ /* pick next entry */
+ entry = priv->ioq_next;
+
+ ret = __socket_ioq_churn_entry (this, entry);
+
+ if (ret != 0)
+ break;
+ }
+
+ if (list_empty (&priv->ioq)) {
+ /* all pending writes done, not interested in POLLOUT */
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock, priv->idx, -1, 0);
+ }
+
+ return ret;
+}
+
+
+int
+socket_event_poll_err (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ xlator_notify (this->xl, GF_EVENT_POLLERR, this);
+
+ return ret;
+}
+
+
+int
+socket_event_poll_out (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected == 1) {
+ ret = __socket_ioq_churn (this);
+
+ if (ret == -1) {
+ __socket_disconnect (this);
+ }
+ }
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ xlator_notify (this->xl, GF_EVENT_POLLOUT, this);
+
+ return ret;
+}
+
+
+int
+__socket_proto_validate_header (transport_t *this,
+ struct socket_header *header,
+ size_t *size1_p, size_t *size2_p)
+{
+ size_t size1 = 0;
+ size_t size2 = 0;
+
+ if (strcmp (header->colonO, ":O")) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "socket header signature does not match :O (%x.%x.%x)",
+ header->colonO[0], header->colonO[1],
+ header->colonO[2]);
+ return -1;
+ }
+
+ if (header->version != 42) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "socket header version does not match 42 != %d",
+ header->version);
+ return -1;
+ }
+
+ size1 = ntoh32 (header->size1);
+ size2 = ntoh32 (header->size2);
+
+ if (size1 <= 0 || size1 > 1048576) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "socket header has incorrect size1=%"GF_PRI_SIZET,
+ size1);
+ return -1;
+ }
+
+ if (size2 > (131072)) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "socket header has incorrect size2=%"GF_PRI_SIZET,
+ size2);
+ return -1;
+ }
+
+ if (size1_p)
+ *size1_p = size1;
+
+ if (size2_p)
+ *size2_p = size2;
+
+ return 0;
+}
+
+
+
+/* socket protocol state machine */
+
+int
+__socket_proto_state_machine (transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ size_t size1 = 0;
+ size_t size2 = 0;
+ int previous_state = -1;
+ struct socket_header *hdr = NULL;
+ struct iobuf *iobuf = NULL;
+
+
+ priv = this->private;
+
+ while (priv->incoming.state != SOCKET_PROTO_STATE_COMPLETE) {
+ /* debug check against infinite loops */
+ if (previous_state == priv->incoming.state) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "state did not change! (%d) breaking",
+ previous_state);
+ ret = -1;
+ goto unlock;
+ }
+ previous_state = priv->incoming.state;
+
+ switch (priv->incoming.state) {
+
+ case SOCKET_PROTO_STATE_NADA:
+ priv->incoming.pending_vector =
+ priv->incoming.vector;
+
+ priv->incoming.pending_vector->iov_base =
+ &priv->incoming.header;
+
+ priv->incoming.pending_vector->iov_len =
+ sizeof (struct socket_header);
+
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_HEADER_COMING;
+ break;
+
+ case SOCKET_PROTO_STATE_HEADER_COMING:
+
+ ret = __socket_readv (this,
+ priv->incoming.pending_vector, 1,
+ &priv->incoming.pending_vector,
+ NULL);
+ if (ret == 0) {
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_HEADER_CAME;
+ break;
+ }
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "read (%s) in state %d (%s)",
+ strerror (errno),
+ SOCKET_PROTO_STATE_HEADER_COMING,
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "partial header read on NB socket.");
+ goto unlock;
+ }
+ break;
+
+ case SOCKET_PROTO_STATE_HEADER_CAME:
+ hdr = &priv->incoming.header;
+ ret = __socket_proto_validate_header (this, hdr,
+ &size1, &size2);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header validate failed (%s). "
+ "possible mismatch of transport-type "
+ "between server and client volumes, "
+ "or version mismatch",
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ priv->incoming.hdrlen = size1;
+ priv->incoming.buflen = size2;
+
+ /* TODO: use mem-pool */
+ priv->incoming.hdr_p = GF_MALLOC (size1,
+ gf_common_mt_char);
+ if (size2) {
+ /* TODO: sanity check size2 < page size
+ */
+ iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+ if (!iobuf) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unable to allocate IO buffer "
+ "for peer %s",
+ this->peerinfo.identifier);
+ ret = -ENOMEM;
+ goto unlock;
+ }
+ priv->incoming.iobuf = iobuf;
+ priv->incoming.buf_p = iobuf->ptr;
+ }
+
+ priv->incoming.vector[0].iov_base =
+ priv->incoming.hdr_p;
+
+ priv->incoming.vector[0].iov_len = size1;
+
+ priv->incoming.vector[1].iov_base =
+ priv->incoming.buf_p;
+
+ priv->incoming.vector[1].iov_len = size2;
+ priv->incoming.count = size2 ? 2 : 1;
+
+ priv->incoming.pending_vector =
+ priv->incoming.vector;
+
+ priv->incoming.pending_count =
+ priv->incoming.count;
+
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_DATA_COMING;
+ break;
+
+ case SOCKET_PROTO_STATE_DATA_COMING:
+
+ ret = __socket_readv (this,
+ priv->incoming.pending_vector,
+ priv->incoming.pending_count,
+ &priv->incoming.pending_vector,
+ &priv->incoming.pending_count);
+ if (ret == 0) {
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_DATA_CAME;
+ break;
+ }
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "read (%s) in state %d (%s)",
+ strerror (errno),
+ SOCKET_PROTO_STATE_DATA_COMING,
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "partial data read on NB socket");
+ goto unlock;
+ }
+ break;
+
+ case SOCKET_PROTO_STATE_DATA_CAME:
+ memset (&priv->incoming.vector, 0,
+ sizeof (priv->incoming.vector));
+ priv->incoming.pending_vector = NULL;
+ priv->incoming.pending_count = 0;
+ priv->incoming.state = SOCKET_PROTO_STATE_COMPLETE;
+ break;
+
+ case SOCKET_PROTO_STATE_COMPLETE:
+ /* not reached */
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "undefined state reached: %d",
+ priv->incoming.state);
+ goto unlock;
+ }
+ }
+unlock:
+
+ return ret;
+}
+
+
+int
+socket_proto_state_machine (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_proto_state_machine (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_event_poll_in (transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_proto_state_machine (this);
+
+ /* call POLLIN on xlator even if complete block is not received,
+ just to keep the last_received timestamp ticking */
+
+ if (ret == 0)
+ ret = xlator_notify (this->xl, GF_EVENT_POLLIN, this);
+
+ return ret;
+}
+
+
+int
+socket_connect_finish (transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ int event = -1;
+ char notify_xlator = 0;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected)
+ goto unlock;
+
+ ret = __socket_connect_finish (priv->sock);
+
+ if (ret == -1 && errno == EINPROGRESS)
+ ret = 1;
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ if (!priv->connect_finish_log) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection to %s failed (%s)",
+ this->peerinfo.identifier,
+ strerror (errno));
+ priv->connect_finish_log = 1;
+ }
+ __socket_disconnect (this);
+ notify_xlator = 1;
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ if (ret == 0) {
+ notify_xlator = 1;
+
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+
+ ret = getsockname (priv->sock,
+ SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "getsockname on (%d) failed (%s)",
+ priv->sock, strerror (errno));
+ __socket_disconnect (this);
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ priv->connected = 1;
+ priv->connect_finish_log = 0;
+ event = GF_EVENT_CHILD_UP;
+ get_transport_identifiers (this);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ if (notify_xlator)
+ xlator_notify (this->xl, event, this);
+
+ return 0;
+}
+
+
+int
+socket_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ this = data;
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (!priv->connected) {
+ ret = socket_connect_finish (this);
+ }
+
+ if (!ret && poll_out) {
+ ret = socket_event_poll_out (this);
+ }
+
+ if (!ret && poll_in) {
+ ret = socket_event_poll_in (this);
+ }
+
+ if (ret < 0 || poll_err) {
+ socket_event_poll_err (this);
+ transport_unref (this);
+ }
+
+ return 0;
+}
+
+
+int
+socket_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ int new_sock = -1;
+ transport_t *new_trans = NULL;
+ struct sockaddr_storage new_sockaddr = {0, };
+ socklen_t addrlen = sizeof (new_sockaddr);
+ socket_private_t *new_priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ this = data;
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+
+ if (poll_in) {
+ new_sock = accept (priv->sock, SA (&new_sockaddr),
+ &addrlen);
+
+ if (new_sock == -1)
+ goto unlock;
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
+ if (priv->nodelay) {
+ ret = __socket_nodelay (new_sock);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setsockopt() failed for "
+ "NODELAY (%s)",
+ strerror (errno));
+ }
+ }
+
+ new_trans = GF_CALLOC (1, sizeof (*new_trans),
+ gf_common_mt_transport_t);
+ new_trans->xl = this->xl;
+ new_trans->fini = this->fini;
+
+ memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
+ addrlen);
+ new_trans->peerinfo.sockaddr_len = addrlen;
+
+ new_trans->myinfo.sockaddr_len =
+ sizeof (new_trans->myinfo.sockaddr);
+
+ ret = getsockname (new_sock,
+ SA (&new_trans->myinfo.sockaddr),
+ &new_trans->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "getsockname on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
+ get_transport_identifiers (new_trans);
+ socket_init (new_trans);
+ new_trans->ops = this->ops;
+ new_trans->init = this->init;
+ new_trans->fini = this->fini;
+
+ new_priv = new_trans->private;
+
+ pthread_mutex_lock (&new_priv->lock);
+ {
+ new_priv->sock = new_sock;
+ new_priv->connected = 1;
+
+ transport_ref (new_trans);
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans, 1, 0);
+
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
+ pthread_mutex_unlock (&new_priv->lock);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_disconnect (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_connect (transport_t *this)
+{
+ int ret = -1;
+ int sock = -1;
+ socket_private_t *priv = NULL;
+ struct sockaddr_storage sockaddr = {0, };
+ socklen_t sockaddr_len = 0;
+ glusterfs_ctx_t *ctx = NULL;
+ sa_family_t sa_family = {0, };
+
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ if (!priv) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "connect() called on uninitialized transport");
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "connect () called on transport already connected");
+ ret = 0;
+ goto err;
+ }
+
+ ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len, &sa_family);
+ if (ret == -1) {
+ /* logged inside client_get_remote_sockaddr */
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "connect() -- already connected");
+ goto unlock;
+ }
+
+ memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (sa_family, SOCK_STREAM, 0);
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ /* Cant help if setting socket options fails. We can continue
+ * working nonetheless.
+ */
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setting receive window size failed: %d: %d: "
+ "%s", priv->sock, priv->windowsize,
+ strerror (errno));
+ }
+
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setting send window size failed: %d: %d: "
+ "%s", priv->sock, priv->windowsize,
+ strerror (errno));
+ }
+
+
+ if (priv->nodelay && priv->lowlat) {
+ ret = __socket_nodelay (priv->sock);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setsockopt() failed for NODELAY (%s)",
+ strerror (errno));
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ SA (&this->myinfo.sockaddr)->sa_family =
+ SA (&this->peerinfo.sockaddr)->sa_family;
+
+ ret = client_bind (this, SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
+ this->peerinfo.sockaddr_len);
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection attempt failed (%s)",
+ strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ priv->connected = 0;
+
+ transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler, this, 1, 1);
+ if (priv->idx == -1)
+ ret = -1;
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+err:
+ return ret;
+}
+
+
+int
+socket_listen (transport_t *this)
+{
+ socket_private_t * priv = NULL;
+ int ret = -1;
+ int sock = -1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ peer_info_t *myinfo = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ sa_family_t sa_family = {0, };
+
+ priv = this->private;
+ myinfo = &this->myinfo;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "alreading listening");
+ return ret;
+ }
+
+ ret = socket_server_get_local_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len, &sa_family);
+ if (ret == -1) {
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "already listening");
+ goto unlock;
+ }
+
+ memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len);
+ myinfo->sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (sa_family, SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ /* Cant help if setting socket options fails. We can continue
+ * working nonetheless.
+ */
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setting receive window size failed: %d: %d: "
+ "%s", priv->sock, priv->windowsize,
+ strerror (errno));
+ }
+
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setting send window size failed: %d: %d: "
+ "%s", priv->sock, priv->windowsize,
+ strerror (errno));
+ }
+
+ if (priv->nodelay) {
+ ret = __socket_nodelay (priv->sock);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setsockopt() failed for NODELAY (%s)",
+ strerror (errno));
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = __socket_server_bind (this);
+
+ if (ret == -1) {
+ /* logged inside __socket_server_bind() */
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = listen (priv->sock, 10);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not set socket %d to listen mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_server_event_handler,
+ this, 1, 0);
+
+ if (priv->idx == -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "could not register socket %d with events",
+ priv->sock);
+ ret = -1;
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ struct iobuf **iobuf_p)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "socket not connected to receive");
+ goto unlock;
+ }
+
+ if (!hdr_p || !hdrlen_p || !iobuf_p) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "bad parameters %p %p %p",
+ hdr_p, hdrlen_p, iobuf_p);
+ goto unlock;
+ }
+
+ if (priv->incoming.state == SOCKET_PROTO_STATE_COMPLETE) {
+ *hdr_p = priv->incoming.hdr_p;
+ *hdrlen_p = priv->incoming.hdrlen;
+ *iobuf_p = priv->incoming.iobuf;
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+ priv->incoming.state = SOCKET_PROTO_STATE_NADA;
+
+ ret = 0;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+/* TODO: implement per transfer limit */
+int
+socket_submit (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count,
+ struct iobref *iobref)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char need_poll_out = 0;
+ char need_append = 1;
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ if (!priv->submit_log && !priv->connect_finish_log) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "not connected (priv->connected = %d)",
+ priv->connected);
+ priv->submit_log = 1;
+ }
+ goto unlock;
+ }
+
+ priv->submit_log = 0;
+ entry = __socket_ioq_new (this, buf, len, vector, count, iobref);
+ if (!entry)
+ goto unlock;
+
+ if (list_empty (&priv->ioq)) {
+ ret = __socket_ioq_churn_entry (this, entry);
+
+ if (ret == 0)
+ need_append = 0;
+
+ if (ret > 0)
+ need_poll_out = 1;
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &priv->ioq);
+ ret = 0;
+ }
+
+ if (need_poll_out) {
+ /* first entry to wait. continue writing on POLLOUT */
+ priv->idx = event_select_on (ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 1);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+struct transport_ops tops = {
+ .listen = socket_listen,
+ .connect = socket_connect,
+ .disconnect = socket_disconnect,
+ .submit = socket_submit,
+ .receive = socket_receive
+};
+
+
+int
+socket_init (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ gf_boolean_t tmp_bool = 0;
+ uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
+ char *optstr = NULL;
+
+ if (this->private) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "double init attempted");
+ return -1;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv),
+ gf_common_mt_socket_private_t);
+ if (!priv) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "calloc (1, %"GF_PRI_SIZET") returned NULL",
+ sizeof (*priv));
+ return -1;
+ }
+
+ pthread_mutex_init (&priv->lock, NULL);
+
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+
+ INIT_LIST_HEAD (&priv->ioq);
+
+ if (dict_get (this->xl->options, "non-blocking-io")) {
+ optstr = data_to_str (dict_get (this->xl->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean options,"
+ " not taking any action");
+ tmp_bool = 1;
+ }
+ priv->bio = 0;
+ if (!tmp_bool) {
+ priv->bio = 1;
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "disabling non-blocking IO");
+ }
+ }
+
+ optstr = NULL;
+
+ // By default, we enable NODELAY
+ priv->nodelay = 1;
+ if (dict_get (this->xl->options, "transport.socket.nodelay")) {
+ optstr = data_to_str (dict_get (this->xl->options,
+ "transport.socket.nodelay"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "'transport.socket.nodelay' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+ if (!tmp_bool) {
+ priv->nodelay = 0;
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "disabling nodelay");
+ }
+ }
+
+
+ optstr = NULL;
+ if (dict_get_str (this->xl->options, "transport.window-size",
+ &optstr) == 0) {
+ if (gf_string2bytesize (optstr, &windowsize) != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "invalid number format: %s", optstr);
+ return -1;
+ }
+ }
+
+ optstr = NULL;
+
+ if (dict_get_str (this->xl->options, "transport.socket.lowlat",
+ &optstr) == 0) {
+ priv->lowlat = 1;
+ }
+
+ priv->windowsize = (int)windowsize;
+ this->private = priv;
+
+ return 0;
+}
+
+
+void
+fini (transport_t *this)
+{
+ socket_private_t *priv = this->private;
+
+ gf_log (this->xl->name, GF_LOG_TRACE,
+ "transport %p destroyed", this);
+
+ pthread_mutex_destroy (&priv->lock);
+ GF_FREE (priv);
+}
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ if (!this)
+ return ret;
+
+ ret = xlator_mem_acct_init (this, gf_common_mt_end + 1);
+
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR, "Memory accounting init"
+ "failed");
+ return ret;
+ }
+
+ return ret;
+}
+
+int32_t
+init (transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_init (this);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG, "socket_init() failed");
+ }
+
+ return ret;
+}
+
+struct volume_options options[] = {
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.socket.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.bind-address", "bind-address" },
+ .type = GF_OPTION_TYPE_INTERNET_ADDRESS
+ },
+ { .key = {"transport.socket.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = { "transport.address-family",
+ "address-family" },
+ .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
+ "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+
+ { .key = {"non-blocking-io"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"transport.window-size"},
+ .type = GF_OPTION_TYPE_SIZET,
+ .min = GF_MIN_SOCKET_WINDOW_SIZE,
+ .max = GF_MAX_SOCKET_WINDOW_SIZE,
+ },
+ { .key = {"transport.socket.nodelay"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"transport.socket.lowlat"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {NULL} }
+};
+
diff --git a/xlators/protocol/transport/socket/src/socket.h b/xlators/protocol/transport/socket/src/socket.h
new file mode 100644
index 000000000..bc6d3b27c
--- /dev/null
+++ b/xlators/protocol/transport/socket/src/socket.h
@@ -0,0 +1,125 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SOCKET_H
+#define _SOCKET_H
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "event.h"
+#include "transport.h"
+#include "logging.h"
+#include "dict.h"
+#include "mem-pool.h"
+#include "socket-mem-types.h"
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif /* MAX_IOVEC */
+
+#define GF_DEFAULT_SOCKET_LISTEN_PORT 6996
+
+/* This is the size set through setsockopt for
+ * both the TCP receive window size and the
+ * send buffer size.
+ * Till the time iobuf size becomes configurable, this size is set to include
+ * two iobufs + the GlusterFS protocol headers.
+ * Linux allows us to over-ride the max values for the system.
+ * Should we over-ride them? Because if we set a value larger than the default
+ * setsockopt will fail. Having larger values might be beneficial for
+ * IB links.
+ */
+#define GF_DEFAULT_SOCKET_WINDOW_SIZE (512 * GF_UNIT_KB)
+#define GF_MAX_SOCKET_WINDOW_SIZE (1 * GF_UNIT_MB)
+#define GF_MIN_SOCKET_WINDOW_SIZE (128 * GF_UNIT_KB)
+
+typedef enum {
+ SOCKET_PROTO_STATE_NADA = 0,
+ SOCKET_PROTO_STATE_HEADER_COMING,
+ SOCKET_PROTO_STATE_HEADER_CAME,
+ SOCKET_PROTO_STATE_DATA_COMING,
+ SOCKET_PROTO_STATE_DATA_CAME,
+ SOCKET_PROTO_STATE_COMPLETE,
+} socket_proto_state_t;
+
+struct socket_header {
+ char colonO[3];
+ uint32_t size1;
+ uint32_t size2;
+ char version;
+} __attribute__((packed));
+
+
+struct ioq {
+ union {
+ struct list_head list;
+ struct {
+ struct ioq *next;
+ struct ioq *prev;
+ };
+ };
+ struct socket_header header;
+ struct iovec vector[MAX_IOVEC];
+ int count;
+ struct iovec *pending_vector;
+ int pending_count;
+ char *buf;
+ struct iobref *iobref;
+};
+
+
+typedef struct {
+ int32_t sock;
+ int32_t idx;
+ unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected
+ char bio;
+ char connect_finish_log;
+ char submit_log;
+ union {
+ struct list_head ioq;
+ struct {
+ struct ioq *ioq_next;
+ struct ioq *ioq_prev;
+ };
+ };
+ struct {
+ int state;
+ struct socket_header header;
+ char *hdr_p;
+ size_t hdrlen;
+ struct iobuf *iobuf;
+ char *buf_p;
+ size_t buflen;
+ struct iovec vector[2];
+ int count;
+ struct iovec *pending_vector;
+ int pending_count;
+ } incoming;
+ pthread_mutex_t lock;
+ int windowsize;
+ char lowlat;
+ char nodelay;
+} socket_private_t;
+
+
+#endif