summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/rdma
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2010-08-30 08:03:52 +0000
committerVijay Bellur <vijay@dev.gluster.com>2010-08-30 06:54:12 -0700
commit300b4fefcbb5a5ced4f0554e109679e44cdf44ea (patch)
treec77b64f94272188b8f2ce81e2f300222630ebdbe /rpc/rpc-transport/rdma
parent55bbf23b3a608f67b7a05939f4205049e92d081a (diff)
Bring in new transport rdma.
- rdma is new transport and improvement over current ib-verbs. It uses rdma-read and rdma-write for efficiently transferring large buffers. For more details please refer to rfc-5666 and rfc-5667. Signed-off-by: Raghavendra G <raghavendra@gluster.com> Signed-off-by: Vijay Bellur <vijay@dev.gluster.com> BUG: 513 (Introduce 0 copy rdma) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=513
Diffstat (limited to 'rpc/rpc-transport/rdma')
-rw-r--r--rpc/rpc-transport/rdma/Makefile.am1
-rw-r--r--rpc/rpc-transport/rdma/src/Makefile.am20
-rw-r--r--rpc/rpc-transport/rdma/src/name.c713
-rw-r--r--rpc/rpc-transport/rdma/src/name.h47
-rw-r--r--rpc/rpc-transport/rdma/src/rdma.c4795
-rw-r--r--rpc/rpc-transport/rdma/src/rdma.h395
6 files changed, 5971 insertions, 0 deletions
diff --git a/rpc/rpc-transport/rdma/Makefile.am b/rpc/rpc-transport/rdma/Makefile.am
new file mode 100644
index 000000000..f963effea
--- /dev/null
+++ b/rpc/rpc-transport/rdma/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src \ No newline at end of file
diff --git a/rpc/rpc-transport/rdma/src/Makefile.am b/rpc/rpc-transport/rdma/src/Makefile.am
new file mode 100644
index 000000000..bc888b175
--- /dev/null
+++ b/rpc/rpc-transport/rdma/src/Makefile.am
@@ -0,0 +1,20 @@
+# TODO : need to change transportdir
+
+transport_LTLIBRARIES = rdma.la
+transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport
+
+rdma_la_LDFLAGS = -module -avoidversion
+
+rdma_la_SOURCES = rdma.c name.c
+rdma_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ -libverbs
+
+noinst_HEADERS = rdma.h name.h
+ -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/rpc/rpc-lib/src/ \
+ -I$(top_srcdir)/xlators/protocol/lib/src/ -shared -nostartfiles $(GF_CFLAGS)
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \
+ -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/rpc/rpc-lib/src/ \
+ -I$(top_srcdir)/xlators/protocol/lib/src -shared -nostartfiles $(GF_CFLAGS)
+
+CLEANFILES = *~
diff --git a/rpc/rpc-transport/rdma/src/name.c b/rpc/rpc-transport/rdma/src/name.c
new file mode 100644
index 000000000..fa8ecdd15
--- /dev/null
+++ b/rpc/rpc-transport/rdma/src/name.c
@@ -0,0 +1,713 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <netdb.h>
+#include <string.h>
+
+#ifdef CLIENT_PORT_CEILING
+#undef CLIENT_PORT_CEILING
+#endif
+
+#define CLIENT_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+#include "rpc-transport.h"
+#include "rdma.h"
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static int32_t
+af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr,
+ socklen_t sockaddr_len, int ceiling)
+{
+ int32_t ret = -1;
+ /* struct sockaddr_in sin = {0, }; */
+ uint16_t port = ceiling - 1;
+
+ while (port)
+ {
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET6:
+ ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port);
+ break;
+
+ case AF_INET_SDP:
+ case AF_INET:
+ ((struct sockaddr_in *)sockaddr)->sin_port = htons (port);
+ break;
+ }
+
+ ret = bind (fd, sockaddr, sockaddr_len);
+
+ if (ret == 0)
+ break;
+
+ if (ret == -1 && errno == EACCES)
+ break;
+
+ port--;
+ }
+
+ return ret;
+}
+
+static int32_t
+af_unix_client_bind (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t sockaddr_len,
+ int sock)
+{
+ data_t *path_data = NULL;
+ struct sockaddr_un *addr = NULL;
+ int32_t ret = -1;
+
+ path_data = dict_get (this->options,
+ "transport.rdma.bind-path");
+ if (path_data) {
+ char *path = data_to_str (path_data);
+ if (!path || strlen (path) > UNIX_PATH_MAX) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "transport.rdma.bind-path not specfied "
+ "for unix socket, letting connect to assign "
+ "default value");
+ goto err;
+ }
+
+ addr = (struct sockaddr_un *) sockaddr;
+ strcpy (addr->sun_path, path);
+ ret = bind (sock, (struct sockaddr *)addr, sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot bind to unix-domain socket %d (%s)",
+ sock, strerror (errno));
+ goto err;
+ }
+ }
+
+err:
+ return ret;
+}
+
+static int32_t
+client_fill_address_family (rpc_transport_t *this, struct sockaddr *sockaddr)
+{
+ data_t *address_family_data = NULL;
+
+ address_family_data = dict_get (this->options,
+ "transport.address-family");
+ if (!address_family_data) {
+ data_t *remote_host_data = NULL, *connect_path_data = NULL;
+ remote_host_data = dict_get (this->options, "remote-host");
+ connect_path_data = dict_get (this->options,
+ "transport.rdma.connect-path");
+
+ if (!(remote_host_data || connect_path_data) ||
+ (remote_host_data && connect_path_data)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "address-family not specified and not able to "
+ "determine the same from other options "
+ "(remote-host:%s and connect-path:%s)",
+ data_to_str (remote_host_data),
+ data_to_str (connect_path_data));
+ return -1;
+ }
+
+ if (remote_host_data) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be inet/inet6");
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be unix");
+ sockaddr->sa_family = AF_UNIX;
+ }
+
+ } else {
+ char *address_family = data_to_str (address_family_data);
+ if (!strcasecmp (address_family, "unix")) {
+ sockaddr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet")) {
+ sockaddr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ sockaddr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ sockaddr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unknown address-family (%s) specified",
+ address_family);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int32_t
+af_inet_client_get_remote_sockaddr (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ dict_t *options = this->options;
+ data_t *remote_host_data = NULL;
+ data_t *remote_port_data = NULL;
+ char *remote_host = NULL;
+ uint16_t remote_port = 0;
+ struct addrinfo *addr_info = NULL;
+ int32_t ret = 0;
+
+ remote_host_data = dict_get (options, "remote-host");
+ if (remote_host_data == NULL)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "option remote-host missing in volume %s",
+ this->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_host = data_to_str (remote_host_data);
+ if (remote_host == NULL)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "option remote-host has data NULL in volume %s",
+ this->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_port_data = dict_get (options, "remote-port");
+ if (remote_port_data == NULL)
+ {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "option remote-port missing in volume %s. "
+ "Defaulting to %d",
+ this->name, GF_DEFAULT_RDMA_LISTEN_PORT);
+
+ remote_port = GF_DEFAULT_RDMA_LISTEN_PORT;
+ }
+ else
+ {
+ remote_port = data_to_uint16 (remote_port_data);
+ }
+
+ if (remote_port == (uint16_t)-1)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "option remote-port has invalid port in volume %s",
+ this->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* TODO: gf_resolve is a blocking call. kick in some
+ non blocking dns techniques */
+ ret = gf_resolve_ip6 (remote_host, remote_port,
+ sockaddr->sa_family,
+ &this->dnscache, &addr_info);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "DNS resolution failed on host %s", remote_host);
+ goto err;
+ }
+
+ memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen);
+ *sockaddr_len = addr_info->ai_addrlen;
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_client_get_remote_sockaddr (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ struct sockaddr_un *sockaddr_un = NULL;
+ char *connect_path = NULL;
+ data_t *connect_path_data = NULL;
+ int32_t ret = 0;
+
+ connect_path_data = dict_get (this->options,
+ "transport.rdma.connect-path");
+ if (!connect_path_data) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "option transport.rdma.connect-path not "
+ "specified for address-family unix");
+ ret = -1;
+ goto err;
+ }
+
+ connect_path = data_to_str (connect_path_data);
+ if (!connect_path) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "connect-path is null-string");
+ ret = -1;
+ goto err;
+ }
+
+ if (strlen (connect_path) > UNIX_PATH_MAX) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "connect-path value length %"GF_PRI_SIZET" > "
+ "%d octets", strlen (connect_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->name,
+ GF_LOG_DEBUG,
+ "using connect-path %s", connect_path);
+ sockaddr_un = (struct sockaddr_un *)sockaddr;
+ strcpy (sockaddr_un->sun_path, connect_path);
+ *sockaddr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_server_get_local_sockaddr (rpc_transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *listen_path_data = NULL;
+ char *listen_path = NULL;
+ int32_t ret = 0;
+ struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr;
+
+
+ listen_path_data = dict_get (this->options,
+ "transport.rdma.listen-path");
+ if (!listen_path_data) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "missing option listen-path");
+ ret = -1;
+ goto err;
+ }
+
+ listen_path = data_to_str (listen_path_data);
+
+#ifndef UNIX_PATH_MAX
+#define UNIX_PATH_MAX 108
+#endif
+
+ if (strlen (listen_path) > UNIX_PATH_MAX) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "option listen-path has value length %"GF_PRI_SIZET" > %d",
+ strlen (listen_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+ strcpy (sunaddr->sun_path, listen_path);
+ *addr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_inet_server_get_local_sockaddr (rpc_transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ struct addrinfo hints, *res = 0;
+ data_t *listen_port_data = NULL, *listen_host_data = NULL;
+ uint16_t listen_port = -1;
+ char service[NI_MAXSERV], *listen_host = NULL;
+ dict_t *options = NULL;
+ int32_t ret = 0;
+
+ options = this->options;
+
+ listen_port_data = dict_get (options, "transport.rdma.listen-port");
+ listen_host_data = dict_get (options,
+ "transport.rdma.bind-address");
+
+ if (listen_port_data)
+ {
+ listen_port = data_to_uint16 (listen_port_data);
+ } else {
+ if (addr->sa_family == AF_INET6) {
+ struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr;
+ in->sin6_addr = in6addr_any;
+ in->sin6_port = htons(listen_port);
+ *addr_len = sizeof(struct sockaddr_in6);
+ goto out;
+ } else if (addr->sa_family == AF_INET) {
+ struct sockaddr_in *in = (struct sockaddr_in *) addr;
+ in->sin_addr.s_addr = htonl(INADDR_ANY);
+ in->sin_port = htons(listen_port);
+ *addr_len = sizeof(struct sockaddr_in);
+ goto out;
+ }
+ }
+
+ if (listen_port == (uint16_t) -1)
+ listen_port = GF_DEFAULT_RDMA_LISTEN_PORT;
+
+
+ if (listen_host_data)
+ {
+ listen_host = data_to_str (listen_host_data);
+ }
+
+ memset (service, 0, sizeof (service));
+ sprintf (service, "%d", listen_port);
+
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = addr->sa_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE;
+
+ ret = getaddrinfo(listen_host, service, &hints, &res);
+ if (ret != 0) {
+ gf_log (this->name,
+ GF_LOG_ERROR,
+ "getaddrinfo failed for host %s, service %s (%s)",
+ listen_host, service, gai_strerror (ret));
+ ret = -1;
+ goto out;
+ }
+
+ memcpy (addr, res->ai_addr, res->ai_addrlen);
+ *addr_len = res->ai_addrlen;
+
+ freeaddrinfo (res);
+
+out:
+ return ret;
+}
+
+int32_t
+gf_rdma_client_bind (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock)
+{
+ int ret = 0;
+
+ *sockaddr_len = sizeof (struct sockaddr_in6);
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ case AF_INET:
+ *sockaddr_len = sizeof (struct sockaddr_in);
+
+ case AF_INET6:
+ ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr,
+ *sockaddr_len,
+ CLIENT_PORT_CEILING);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot bind inet socket (%d) to port "
+ "less than %d (%s)",
+ sock, CLIENT_PORT_CEILING, strerror (errno));
+ ret = 0;
+ }
+ break;
+
+ case AF_UNIX:
+ *sockaddr_len = sizeof (struct sockaddr_un);
+ ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr,
+ *sockaddr_len, sock);
+ break;
+
+ default:
+ gf_log (this->name, GF_LOG_ERROR,
+ "unknown address family %d", sockaddr->sa_family);
+ ret = -1;
+ break;
+ }
+
+ return ret;
+}
+
+int32_t
+gf_rdma_client_get_remote_sockaddr (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ ret = client_fill_address_family (this, sockaddr);
+ if (ret) {
+ ret = -1;
+ goto err;
+ }
+
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ sockaddr->sa_family = AF_INET;
+ is_inet_sdp = 1;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_client_get_remote_sockaddr (this,
+ sockaddr,
+ sockaddr_len);
+
+ if (is_inet_sdp) {
+ sockaddr->sa_family = AF_INET_SDP;
+ }
+
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_client_get_remote_sockaddr (this,
+ sockaddr,
+ sockaddr_len);
+ break;
+
+ default:
+ gf_log (this->name, GF_LOG_ERROR,
+ "unknown address-family %d", sockaddr->sa_family);
+ ret = -1;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+gf_rdma_server_get_local_sockaddr (rpc_transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ address_family_data = dict_get (this->options,
+ "transport.address-family");
+ if (address_family_data) {
+ char *address_family = NULL;
+ address_family = data_to_str (address_family_data);
+
+ if (!strcasecmp (address_family, "inet")) {
+ addr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ addr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ addr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "unix")) {
+ addr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ addr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unknown address family (%s) specified",
+ address_family);
+ ret = -1;
+ goto err;
+ }
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "option address-family not specified, defaulting "
+ "to inet/inet6");
+ addr->sa_family = AF_UNSPEC;
+ }
+
+ switch (addr->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ addr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_server_get_local_sockaddr (this, addr, addr_len);
+ if (is_inet_sdp && !ret) {
+ addr->sa_family = AF_INET_SDP;
+ }
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+fill_inet6_inet_identifiers (rpc_transport_t *this, struct sockaddr_storage *addr,
+ int32_t addr_len, char *identifier)
+{
+ int32_t ret = 0, tmpaddr_len = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+ struct sockaddr_storage tmpaddr;
+
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+ tmpaddr = *addr;
+ tmpaddr_len = addr_len;
+
+ if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) {
+ int32_t one_to_four, four_to_eight, twelve_to_sixteen;
+ int16_t eight_to_ten, ten_to_twelve;
+
+ one_to_four = four_to_eight = twelve_to_sixteen = 0;
+ eight_to_ten = ten_to_twelve = 0;
+
+ one_to_four = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr32[0];
+ four_to_eight = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr32[1];
+#ifdef GF_SOLARIS_HOST_OS
+ eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr)[4];
+#else
+ eight_to_ten = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr16[4];
+#endif
+
+#ifdef GF_SOLARIS_HOST_OS
+ ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr)[5];
+#else
+ ten_to_twelve = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr16[5];
+#endif
+ twelve_to_sixteen = ((struct sockaddr_in6 *)
+ &tmpaddr)->sin6_addr.s6_addr32[3];
+
+ /* ipv4 mapped ipv6 address has
+ bits 0-80: 0
+ bits 80-96: 0xffff
+ bits 96-128: ipv4 address
+ */
+
+ if (one_to_four == 0 &&
+ four_to_eight == 0 &&
+ eight_to_ten == 0 &&
+ ten_to_twelve == -1) {
+ struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr;
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+
+ in_ptr->sin_family = AF_INET;
+ in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port;
+ in_ptr->sin_addr.s_addr = twelve_to_sixteen;
+ tmpaddr_len = sizeof (*in_ptr);
+ }
+ }
+
+ ret = getnameinfo ((struct sockaddr *) &tmpaddr,
+ tmpaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (ret != 0) {
+ gf_log (this->name,
+ GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ }
+
+ sprintf (identifier, "%s:%s", host, service);
+
+ return ret;
+}
+
+int32_t
+gf_rdma_get_transport_identifiers (rpc_transport_t *this)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ ret = fill_inet6_inet_identifiers (this,
+ &this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ this->myinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "can't fill inet/inet6 identifier for server");
+ goto err;
+ }
+
+ ret = fill_inet6_inet_identifiers (this,
+ &this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len,
+ this->peerinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "can't fill inet/inet6 identifier for client");
+ goto err;
+ }
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP;
+ }
+ }
+ break;
+
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sunaddr = NULL;
+
+ sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr;
+ strcpy (this->myinfo.identifier, sunaddr->sun_path);
+
+ sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr;
+ strcpy (this->peerinfo.identifier, sunaddr->sun_path);
+ }
+ break;
+
+ default:
+ gf_log (this->name, GF_LOG_ERROR,
+ "unknown address family (%d)",
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family);
+ ret = -1;
+ break;
+ }
+
+err:
+ return ret;
+}
diff --git a/rpc/rpc-transport/rdma/src/name.h b/rpc/rpc-transport/rdma/src/name.h
new file mode 100644
index 000000000..bd5e3ce6f
--- /dev/null
+++ b/rpc/rpc-transport/rdma/src/name.h
@@ -0,0 +1,47 @@
+/*
+ Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _IB_VERBS_NAME_H
+#define _IB_VERBS_NAME_H
+
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "compat.h"
+
+int32_t
+gf_rdma_client_bind (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock);
+
+int32_t
+gf_rdma_client_get_remote_sockaddr (rpc_transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len);
+
+int32_t
+gf_rdma_server_get_local_sockaddr (rpc_transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len);
+
+int32_t
+gf_rdma_get_transport_identifiers (rpc_transport_t *this);
+
+#endif /* _IB_VERBS_NAME_H */
diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c
new file mode 100644
index 000000000..601c65df7
--- /dev/null
+++ b/rpc/rpc-transport/rdma/src/rdma.c
@@ -0,0 +1,4795 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dict.h"
+#include "glusterfs.h"
+#include "logging.h"
+#include "rdma.h"
+#include "name.h"
+#include "byte-order.h"
+#include "xlator.h"
+#include <signal.h>
+
+#define RDMA_LOG_NAME "rpc-transport/rdma"
+
+static int32_t
+__rdma_ioq_churn (rdma_peer_t *peer);
+
+rdma_post_t *
+rdma_post_ref (rdma_post_t *post);
+
+int
+rdma_post_unref (rdma_post_t *post);
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static uint16_t
+rdma_get_local_lid (struct ibv_context *context,
+ int32_t port)
+{
+ struct ibv_port_attr attr;
+
+ if (ibv_query_port (context, port, &attr))
+ return 0;
+
+ return attr.lid;
+}
+
+static const char *
+get_port_state_str(enum ibv_port_state pstate)
+{
+ switch (pstate) {
+ case IBV_PORT_DOWN: return "PORT_DOWN";
+ case IBV_PORT_INIT: return "PORT_INIT";
+ case IBV_PORT_ARMED: return "PORT_ARMED";
+ case IBV_PORT_ACTIVE: return "PORT_ACTIVE";
+ case IBV_PORT_ACTIVE_DEFER: return "PORT_ACTIVE_DEFER";
+ default: return "invalid state";
+ }
+}
+
+static int32_t
+ib_check_active_port (struct ibv_context *ctx, uint8_t port)
+{
+ struct ibv_port_attr port_attr;
+
+ int32_t ret = 0;
+ const char *state_str = NULL;
+
+ if (!ctx) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "Error in supplied context");
+ return -1;
+ }
+
+ ret = ibv_query_port (ctx, port, &port_attr);
+
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "Failed to query port %u properties", port);
+ return -1;
+ }
+
+ state_str = get_port_state_str (port_attr.state);
+ gf_log (RDMA_LOG_NAME, GF_LOG_TRACE,
+ "Infiniband PORT: (%u) STATE: (%s)",
+ port, state_str);
+
+ if (port_attr.state == IBV_PORT_ACTIVE)
+ return 0;
+
+ return -1;
+}
+
+static int32_t
+ib_get_active_port (struct ibv_context *ib_ctx)
+{
+ struct ibv_device_attr ib_device_attr;
+
+ int32_t ret = -1;
+ uint8_t ib_port = 0;
+
+ if (!ib_ctx) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "Error in supplied context");
+ return -1;
+ }
+ if (ibv_query_device (ib_ctx, &ib_device_attr)) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "Failed to query device properties");
+ return -1;
+ }
+
+ for (ib_port = 1; ib_port <= ib_device_attr.phys_port_cnt; ++ib_port) {
+ ret = ib_check_active_port (ib_ctx, ib_port);
+ if (ret == 0)
+ return ib_port;
+
+ gf_log (RDMA_LOG_NAME, GF_LOG_TRACE,
+ "Port:(%u) not active", ib_port);
+ continue;
+ }
+ return ret;
+}
+
+
+static void
+rdma_put_post (rdma_queue_t *queue,
+ rdma_post_t *post)
+{
+ post->ctx.is_request = 0;
+
+ pthread_mutex_lock (&queue->lock);
+ {
+ if (post->prev) {
+ queue->active_count--;
+ post->prev->next = post->next;
+ }
+
+ if (post->next) {
+ post->next->prev = post->prev;
+ }
+
+ post->prev = &queue->passive_posts;
+ post->next = post->prev->next;
+ post->prev->next = post;
+ post->next->prev = post;
+ queue->passive_count++;
+ }
+ pthread_mutex_unlock (&queue->lock);
+}
+
+
+static rdma_post_t *
+rdma_new_post (rdma_device_t *device, int32_t len, rdma_post_type_t type)
+{
+ rdma_post_t *post = NULL;
+ int ret = -1;
+
+ post = (rdma_post_t *) GF_CALLOC (1, sizeof (*post),
+ gf_common_mt_rdma_post_t);
+ if (post == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ pthread_mutex_init (&post->lock, NULL);
+
+ post->buf_size = len;
+
+ post->buf = valloc (len);
+ if (!post->buf) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ post->mr = ibv_reg_mr (device->pd,
+ post->buf,
+ post->buf_size,
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!post->mr) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "memory registration failed");
+ goto out;
+ }
+
+ post->device = device;
+ post->type = type;
+
+ ret = 0;
+out:
+ if (ret != 0) {
+ if (post->buf != NULL) {
+ free (post->buf);
+ }
+
+ GF_FREE (post);
+ post = NULL;
+ }
+
+ return post;
+}
+
+
+static rdma_post_t *
+rdma_get_post (rdma_queue_t *queue)
+{
+ rdma_post_t *post;
+
+ pthread_mutex_lock (&queue->lock);
+ {
+ post = queue->passive_posts.next;
+ if (post == &queue->passive_posts)
+ post = NULL;
+
+ if (post) {
+ if (post->prev)
+ post->prev->next = post->next;
+ if (post->next)
+ post->next->prev = post->prev;
+ post->prev = &queue->active_posts;
+ post->next = post->prev->next;
+ post->prev->next = post;
+ post->next->prev = post;
+ post->reused++;
+ queue->active_count++;
+ }
+ }
+ pthread_mutex_unlock (&queue->lock);
+
+ return post;
+}
+
+void
+rdma_destroy_post (rdma_post_t *post)
+{
+ ibv_dereg_mr (post->mr);
+ free (post->buf);
+ GF_FREE (post);
+}
+
+
+static int32_t
+__rdma_quota_get (rdma_peer_t *peer)
+{
+ int32_t ret = -1;
+ rdma_private_t *priv = peer->trans->private;
+
+ if (priv->connected && peer->quota > 0) {
+ ret = peer->quota--;
+ }
+
+ return ret;
+}
+
+/*
+ static int32_t
+ rdma_quota_get (rdma_peer_t *peer)
+ {
+ int32_t ret = -1;
+ rdma_private_t *priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __rdma_quota_get (peer);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+ }
+*/
+
+static void
+__rdma_ioq_entry_free (rdma_ioq_t *entry)
+{
+ list_del_init (&entry->list);
+
+ if (entry->iobref) {
+ iobref_unref (entry->iobref);
+ entry->iobref = NULL;
+ }
+
+ if (entry->msg.request.rsp_iobref) {
+ iobref_unref (entry->msg.request.rsp_iobref);
+ entry->msg.request.rsp_iobref = NULL;
+ }
+ /* TODO: use mem-pool */
+ mem_put (entry->pool, entry);
+}
+
+
+static void
+__rdma_ioq_flush (rdma_peer_t *peer)
+{
+ rdma_ioq_t *entry = NULL, *dummy = NULL;
+
+ list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
+ __rdma_ioq_entry_free (entry);
+ }
+}
+
+
+static int32_t
+__rdma_disconnect (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ int32_t ret = 0;
+
+ if (priv->connected || priv->tcp_connected) {
+ fcntl (priv->sock, F_SETFL, O_NONBLOCK);
+ if (shutdown (priv->sock, SHUT_RDWR) != 0) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_DEBUG,
+ "shutdown () - error: %s",
+ strerror (errno));
+ ret = -errno;
+ priv->tcp_connected = 0;
+ }
+ }
+
+ return ret;
+}
+
+
+static int32_t
+rdma_post_send (struct ibv_qp *qp,
+ rdma_post_t *post,
+ int32_t len)
+{
+ struct ibv_sge list = {
+ .addr = (unsigned long) post->buf,
+ .length = len,
+ .lkey = post->mr->lkey
+ };
+
+ struct ibv_send_wr wr = {
+ .wr_id = (unsigned long) post,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ }, *bad_wr;
+
+ if (!qp)
+ return -1;
+
+ return ibv_post_send (qp, &wr, &bad_wr);
+}
+
+int
+__rdma_encode_error(rdma_peer_t *peer, rdma_reply_info_t *reply_info,
+ struct iovec *rpchdr, uint32_t *ptr,
+ rdma_errcode_t err)
+{
+ uint32_t *startp = NULL;
+ struct rpc_msg *rpc_msg = NULL;
+
+ startp = ptr;
+ if (reply_info != NULL) {
+ *ptr++ = hton32(reply_info->rm_xid);
+ } else {
+ rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains
+ * only one vector.
+ * (which is true)
+ */
+ *ptr++ = rpc_msg->rm_xid;
+ }
+
+ *ptr++ = hton32(RDMA_VERSION);
+ *ptr++ = hton32(peer->send_count);
+ *ptr++ = hton32(RDMA_ERROR);
+ *ptr++ = hton32(err);
+ if (err == ERR_VERS) {
+ *ptr++ = hton32(RDMA_VERSION);
+ *ptr++ = hton32(RDMA_VERSION);
+ }
+
+ return (int)((unsigned long)ptr - (unsigned long)startp);
+}
+
+
+int32_t
+__rdma_send_error (rdma_peer_t *peer, rdma_ioq_t *entry, rdma_post_t *post,
+ rdma_reply_info_t *reply_info, rdma_errcode_t err)
+{
+ int32_t ret = -1, len;
+
+ len = __rdma_encode_error (peer, reply_info, entry->rpchdr,
+ (uint32_t *)post->buf, err);
+ if (len == -1) {
+ goto out;
+ }
+
+ rdma_post_ref (post);
+
+ ret = rdma_post_send (peer->qp, post, len);
+ if (!ret) {
+ ret = len;
+ } else {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "ibv_post_send failed with ret = %d", ret);
+ rdma_post_unref (post);
+ __rdma_disconnect (peer->trans);
+ ret = -1;
+ }
+
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_create_read_chunks_from_vector (rdma_peer_t *peer,
+ rdma_read_chunk_t **readch_ptr,
+ int32_t *pos, struct iovec *vector,
+ int count,
+ rdma_request_context_t *request_ctx)
+{
+ int i = 0;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+ struct ibv_mr *mr = NULL;
+ rdma_read_chunk_t *readch = NULL;
+ int32_t ret = -1;
+
+ if ((peer == NULL) || (readch_ptr == NULL) || (*readch_ptr == NULL)
+ || (request_ctx == NULL) || (vector == NULL)) {
+ goto out;
+ }
+
+ priv = peer->trans->private;
+ device = priv->device;
+ readch = *readch_ptr;
+
+ for (i = 0; i < count; i++) {
+ readch->rc_discrim = hton32 (1);
+ readch->rc_position = hton32 (*pos);
+
+ mr = ibv_reg_mr (device->pd, vector[i].iov_base,
+ vector[i].iov_len,
+ IBV_ACCESS_REMOTE_READ);
+ if (!mr) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "memory registration failed");
+ goto out;
+ }
+
+ request_ctx->mr[request_ctx->mr_count++] = mr;
+
+ readch->rc_target.rs_handle = hton32 (mr->rkey);
+ readch->rc_target.rs_length
+ = hton32 (vector[i].iov_len);
+ readch->rc_target.rs_offset
+ = hton64 ((uint64_t)(unsigned long)vector[i].iov_base);
+
+ *pos = *pos + vector[i].iov_len;
+ readch++;
+ }
+
+ *readch_ptr = readch;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_create_read_chunks (rdma_peer_t *peer, rdma_ioq_t *entry,
+ rdma_chunktype_t type, uint32_t **ptr,
+ rdma_request_context_t *request_ctx)
+{
+ int32_t ret = -1;
+ rdma_device_t *device = NULL;
+ rdma_private_t *priv = NULL;
+ int pos = 0;
+
+ if ((peer == NULL) || (entry == NULL) || (ptr == NULL)
+ || (*ptr == NULL) || (request_ctx == NULL)) {
+ goto out;
+ }
+
+ priv = peer->trans->private;
+ device = priv->device;
+
+ request_ctx->iobref = iobref_ref (entry->iobref);
+
+ if (type == rdma_areadch) {
+ pos = 0;
+ ret = __rdma_create_read_chunks_from_vector (peer,
+ (rdma_read_chunk_t **)ptr,
+ &pos,
+ entry->rpchdr,
+ entry->rpchdr_count,
+ request_ctx);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot create read chunks from vector, "
+ "entry->rpchdr");
+ goto out;
+ }
+
+ ret = __rdma_create_read_chunks_from_vector (peer,
+ (rdma_read_chunk_t **)ptr,
+ &pos,
+ entry->proghdr,
+ entry->proghdr_count,
+ request_ctx);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot create read chunks from vector, "
+ "entry->proghdr");
+ }
+
+ if (entry->prog_payload_count != 0) {
+ ret = __rdma_create_read_chunks_from_vector (peer,
+ (rdma_read_chunk_t **)ptr,
+ &pos,
+ entry->prog_payload,
+ entry->prog_payload_count,
+ request_ctx);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot create read chunks from vector,"
+ " entry->prog_payload");
+ }
+ }
+ } else {
+ pos = iov_length (entry->rpchdr, entry->rpchdr_count);
+ ret = __rdma_create_read_chunks_from_vector (peer,
+ (rdma_read_chunk_t **)ptr,
+ &pos,
+ entry->prog_payload,
+ entry->prog_payload_count,
+ request_ctx);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot create read chunks from vector, "
+ "entry->prog_payload");
+ }
+ }
+
+ /* terminate read-chunk list*/
+ **ptr = 0;
+ *ptr = *ptr + 1;
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_create_write_chunks_from_vector (rdma_peer_t *peer,
+ rdma_write_chunk_t **writech_ptr,
+ struct iovec *vector, int count,
+ rdma_request_context_t *request_ctx)
+{
+ int i = 0;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+ struct ibv_mr *mr = NULL;
+ rdma_write_chunk_t *writech = NULL;
+ int32_t ret = -1;
+
+ if ((peer == NULL) || (writech_ptr == NULL) || (*writech_ptr == NULL)
+ || (request_ctx == NULL) || (vector == NULL)) {
+ goto out;
+ }
+
+ writech = *writech_ptr;
+
+ priv = peer->trans->private;
+ device = priv->device;
+
+ for (i = 0; i < count; i++) {
+ mr = ibv_reg_mr (device->pd, vector[i].iov_base,
+ vector[i].iov_len,
+ IBV_ACCESS_REMOTE_WRITE
+ | IBV_ACCESS_LOCAL_WRITE);
+ if (!mr) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "memory registration failed");
+ goto out;
+ }
+
+ request_ctx->mr[request_ctx->mr_count++] = mr;
+
+ writech->wc_target.rs_handle = hton32 (mr->rkey);
+ writech->wc_target.rs_length = hton32 (vector[i].iov_len);
+ writech->wc_target.rs_offset
+ = hton64 (((uint64_t)(unsigned long)vector[i].iov_base));
+
+ writech++;
+ }
+
+ *writech_ptr = writech;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_create_write_chunks (rdma_peer_t *peer, rdma_ioq_t *entry,
+ rdma_chunktype_t chunk_type, uint32_t **ptr,
+ rdma_request_context_t *request_ctx)
+{
+ int32_t ret = -1;
+ rdma_write_array_t *warray = NULL;
+
+ if ((peer == NULL) || (entry == NULL) || (ptr == NULL)
+ || (*ptr == NULL) || (request_ctx == NULL)) {
+ goto out;
+ }
+
+ if ((chunk_type == rdma_replych)
+ && ((entry->msg.request.rsphdr_count != 1) ||
+ (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ (entry->msg.request.rsphdr_count == 1)
+ ? "chunktype specified as reply chunk but the vector "
+ "specifying the buffer to be used for holding reply"
+ " header is not correct" :
+ "chunktype specified as reply chunk, but more than one "
+ "buffer provided for holding reply");
+ goto out;
+ }
+
+/*
+ if ((chunk_type == rdma_writech)
+ && ((entry->msg.request.rsphdr_count == 0)
+ || (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "vector specifying buffer to hold the program's reply "
+ "header should also be provided when buffers are "
+ "provided for holding the program's payload in reply");
+ goto out;
+ }
+*/
+
+ if (chunk_type == rdma_writech) {
+ warray = (rdma_write_array_t *)*ptr;
+ warray->wc_discrim = hton32 (1);
+ warray->wc_nchunks
+ = hton32 (entry->msg.request.rsp_payload_count);
+
+ *ptr = (uint32_t *)&warray->wc_array[0];
+
+ ret = __rdma_create_write_chunks_from_vector (peer,
+ (rdma_write_chunk_t **)ptr,
+ entry->msg.request.rsp_payload,
+ entry->msg.request.rsp_payload_count,
+ request_ctx);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot create write chunks from vector "
+ "entry->rpc_payload");
+ goto out;
+ }
+
+ /* terminate write chunklist */
+ **ptr = 0;
+ *ptr = *ptr + 1;
+
+ /* no reply chunklist */
+ **ptr = 0;
+ *ptr = *ptr + 1;
+ } else {
+ /* no write chunklist */
+ **ptr = 0;
+ *ptr = *ptr + 1;
+
+ warray = (rdma_write_array_t *)*ptr;
+ warray->wc_discrim = hton32 (1);
+ warray->wc_nchunks = hton32 (entry->msg.request.rsphdr_count);
+
+ *ptr = (uint32_t *)&warray->wc_array[0];
+
+ ret = __rdma_create_write_chunks_from_vector (peer,
+ (rdma_write_chunk_t **)ptr,
+ entry->msg.request.rsphdr_vec,
+ entry->msg.request.rsphdr_count,
+ request_ctx);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot create write chunks from vector "
+ "entry->rpchdr");
+ goto out;
+ }
+
+ /* terminate reply chunklist */
+ **ptr = 0;
+ *ptr = *ptr + 1;
+ }
+
+out:
+ return ret;
+}
+
+
+inline void
+__rdma_deregister_mr (struct ibv_mr **mr, int count)
+{
+ int i = 0;
+
+ if (mr == NULL) {
+ goto out;
+ }
+
+ for (i = 0; i < count; i++) {
+ ibv_dereg_mr (mr[i]);
+ }
+
+out:
+ return;
+}
+
+
+static int32_t
+__rdma_quota_put (rdma_peer_t *peer)
+{
+ int32_t ret;
+
+ peer->quota++;
+ ret = peer->quota;
+
+ if (!list_empty (&peer->ioq)) {
+ ret = __rdma_ioq_churn (peer);
+ }
+
+ return ret;
+}
+
+
+static int32_t
+rdma_quota_put (rdma_peer_t *peer)
+{
+ int32_t ret;
+ rdma_private_t *priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __rdma_quota_put (peer);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+
+/* to be called with priv->mutex held */
+void
+__rdma_request_context_destroy (rdma_request_context_t *context)
+{
+ rdma_peer_t *peer = NULL;
+ rdma_private_t *priv = NULL;
+ int32_t ret = 0;
+
+ if (context == NULL) {
+ goto out;
+ }
+
+ peer = context->peer;
+
+ __rdma_deregister_mr (context->mr, context->mr_count);
+
+ priv = peer->trans->private;
+
+ if (priv->connected) {
+ ret = __rdma_quota_put (peer);
+ if (ret < 0) {
+ gf_log ("rdma", GF_LOG_DEBUG,
+ "failed to send "
+ "message");
+ mem_put (context->pool, context);
+ __rdma_disconnect (peer->trans);
+ goto out;
+ }
+ }
+
+ if (context->iobref != NULL) {
+ iobref_unref (context->iobref);
+ context->iobref = NULL;
+ }
+
+ if (context->rsp_iobref != NULL) {
+ iobref_unref (context->rsp_iobref);
+ context->rsp_iobref = NULL;
+ }
+
+ mem_put (context->pool, context);
+
+out:
+ return;
+}
+
+
+
+void
+rdma_post_context_destroy (rdma_post_context_t *ctx)
+{
+ if (ctx == NULL) {
+ goto out;
+ }
+
+ __rdma_deregister_mr (ctx->mr, ctx->mr_count);
+
+ if (ctx->iobref != NULL) {
+ iobref_unref (ctx->iobref);
+ }
+
+ memset (ctx, 0, sizeof (*ctx));
+out:
+ return;
+}
+
+
+static int32_t
+rdma_post_recv (struct ibv_srq *srq,
+ rdma_post_t *post)
+{
+ struct ibv_sge list = {
+ .addr = (unsigned long) post->buf,
+ .length = post->buf_size,
+ .lkey = post->mr->lkey
+ };
+
+ struct ibv_recv_wr wr = {
+ .wr_id = (unsigned long) post,
+ .sg_list = &list,
+ .num_sge = 1,
+ }, *bad_wr;
+
+ rdma_post_ref (post);
+
+ return ibv_post_srq_recv (srq, &wr, &bad_wr);
+}
+
+
+int
+rdma_post_unref (rdma_post_t *post)
+{
+ int refcount = -1;
+
+ if (post == NULL) {
+ goto out;
+ }
+
+ pthread_mutex_lock (&post->lock);
+ {
+ refcount = --post->refcount;
+ }
+ pthread_mutex_unlock (&post->lock);
+
+ if (refcount == 0) {
+ rdma_post_context_destroy (&post->ctx);
+ if (post->type == RDMA_SEND_POST) {
+ rdma_put_post (&post->device->sendq, post);
+ } else {
+ rdma_post_recv (post->device->srq, post);
+ }
+ }
+out:
+ return refcount;
+}
+
+
+int
+rdma_post_get_refcount (rdma_post_t *post)
+{
+ int refcount = -1;
+
+ if (post == NULL) {
+ goto out;
+ }
+
+ pthread_mutex_lock (&post->lock);
+ {
+ refcount = post->refcount;
+ }
+ pthread_mutex_unlock (&post->lock);
+
+out:
+ return refcount;
+}
+
+rdma_post_t *
+rdma_post_ref (rdma_post_t *post)
+{
+ if (post == NULL) {
+ goto out;
+ }
+
+ pthread_mutex_lock (&post->lock);
+ {
+ post->refcount++;
+ }
+ pthread_mutex_unlock (&post->lock);
+
+out:
+ return post;
+}
+
+
+int32_t
+__rdma_ioq_churn_request (rdma_peer_t *peer, rdma_ioq_t *entry,
+ rdma_post_t *post)
+{
+ rdma_chunktype_t rtype = rdma_noch, wtype = rdma_noch;
+ uint64_t send_size = 0;
+ rdma_header_t *hdr = NULL;
+ struct rpc_msg *rpc_msg = NULL;
+ uint32_t *chunkptr = NULL;
+ char *buf = NULL;
+ int32_t ret = 0;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+ int chunk_count = 0;
+ rdma_request_context_t *request_ctx = NULL;
+ uint32_t prog_payload_length = 0, len = 0;
+ struct rpc_req *rpc_req = NULL;
+
+ if ((peer == NULL) || (entry == NULL) || (post == NULL)) {
+ goto out;
+ }
+
+ if ((entry->msg.request.rsphdr_count != 0)
+ && (entry->msg.request.rsp_payload_count != 0)) {
+ ret = -1;
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "both write-chunklist and reply-chunk cannot be "
+ "present");
+ goto out;
+ }
+
+ post->ctx.is_request = 1;
+ priv = peer->trans->private;
+ device = priv->device;
+
+ hdr = (rdma_header_t *)post->buf;
+
+ send_size = iov_length (entry->rpchdr, entry->rpchdr_count)
+ + iov_length (entry->proghdr, entry->proghdr_count)
+ + RDMA_MAX_HEADER_SIZE;
+
+ if (entry->prog_payload_count != 0) {
+ prog_payload_length
+ = iov_length (entry->prog_payload,
+ entry->prog_payload_count);
+ }
+
+ if (send_size > RDMA_INLINE_THRESHOLD) {
+ rtype = rdma_areadch;
+ } else if ((send_size + prog_payload_length) < RDMA_INLINE_THRESHOLD) {
+ rtype = rdma_noch;
+ } else if (entry->prog_payload_count != 0) {
+ rtype = rdma_readch;
+ }
+
+ if (entry->msg.request.rsphdr_count != 0) {
+ wtype = rdma_replych;
+ } else if (entry->msg.request.rsp_payload_count != 0) {
+ wtype = rdma_writech;
+ }
+
+ if (rtype == rdma_readch) {
+ chunk_count += entry->prog_payload_count;
+ } else if (rtype == rdma_areadch) {
+ chunk_count += entry->rpchdr_count;
+ chunk_count += entry->proghdr_count;
+ }
+
+ if (wtype == rdma_writech) {
+ chunk_count += entry->msg.request.rsp_payload_count;
+ } else if (wtype == rdma_replych) {
+ chunk_count += entry->msg.request.rsphdr_count;
+ }
+
+ if (chunk_count > RDMA_MAX_SEGMENTS) {
+ ret = -1;
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "chunk count(%d) exceeding maximum allowed RDMA "
+ "segment count(%d)", chunk_count, RDMA_MAX_SEGMENTS);
+ goto out;
+ }
+
+ request_ctx = mem_get (priv->request_ctx_pool);
+ if (request_ctx == NULL) {
+ ret = -1;
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ memset (request_ctx, 0, sizeof (*request_ctx));
+
+ request_ctx->pool = priv->request_ctx_pool;
+ request_ctx->peer = peer;
+
+ entry->msg.request.rpc_req->conn_private = request_ctx;
+ if (entry->msg.request.rsp_iobref != NULL) {
+ request_ctx->rsp_iobref
+ = iobref_ref (entry->msg.request.rsp_iobref);
+ }
+
+ rpc_msg = (struct rpc_msg *) entry->rpchdr[0].iov_base;
+
+ hdr->rm_xid = rpc_msg->rm_xid; /* no need of hton32(rpc_msg->rm_xid),
+ * since rpc_msg->rm_xid is already
+ * hton32ed value of actual xid
+ */
+ hdr->rm_vers = hton32 (RDMA_VERSION);
+ hdr->rm_credit = hton32 (peer->send_count);
+ if (rtype == rdma_areadch) {
+ hdr->rm_type = hton32 (RDMA_NOMSG);
+ } else {
+ hdr->rm_type = hton32 (RDMA_MSG);
+ }
+
+ chunkptr = &hdr->rm_body.rm_chunks[0];
+ if (rtype != rdma_noch) {
+ ret = __rdma_create_read_chunks (peer, entry, rtype, &chunkptr,
+ request_ctx);
+ if (ret != 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "creation of read chunks failed");
+ goto out;
+ }
+ } else {
+ *chunkptr++ = 0; /* no read chunks */
+ }
+
+ if (wtype != rdma_noch) {
+ ret = __rdma_create_write_chunks (peer, entry, wtype, &chunkptr,
+ request_ctx);
+ if (ret != 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "creation of write/reply chunk failed");
+ goto out;
+ }
+ } else {
+ *chunkptr++ = 0; /* no write chunks */
+ *chunkptr++ = 0; /* no reply chunk */
+ }
+
+ buf = (char *)chunkptr;
+
+ if (rtype != rdma_areadch) {
+ iov_unload (buf, entry->rpchdr, entry->rpchdr_count);
+ buf += iov_length (entry->rpchdr, entry->rpchdr_count);
+
+ iov_unload (buf, entry->proghdr, entry->proghdr_count);
+ buf += iov_length (entry->proghdr, entry->proghdr_count);
+
+ if (rtype != rdma_readch) {
+ iov_unload (buf, entry->prog_payload,
+ entry->prog_payload_count);
+ buf += iov_length (entry->prog_payload,
+ entry->prog_payload_count);
+ }
+ }
+
+ len = buf - post->buf;
+
+ rdma_post_ref (post);
+
+ ret = rdma_post_send (peer->qp, post, len);
+ if (!ret) {
+ ret = len;
+ } else {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "ibv_post_send failed with ret = %d", ret);
+ rdma_post_unref (post);
+ __rdma_disconnect (peer->trans);
+ ret = -1;
+ }
+
+out:
+ if (ret == -1) {
+ rpc_req = entry->msg.request.rpc_req;
+
+ __rdma_request_context_destroy (rpc_req->conn_private);
+ rpc_req->conn_private = NULL;
+ }
+
+ return ret;
+}
+
+
+inline void
+__rdma_fill_reply_header (rdma_header_t *header, struct iovec *rpchdr,
+ rdma_reply_info_t *reply_info, int credits)
+{
+ struct rpc_msg *rpc_msg = NULL;
+
+ if (reply_info != NULL) {
+ header->rm_xid = hton32 (reply_info->rm_xid);
+ } else {
+ rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains
+ * only one vector.
+ * (which is true)
+ */
+ header->rm_xid = rpc_msg->rm_xid;
+ }
+
+ header->rm_type = hton32 (RDMA_MSG);
+ header->rm_vers = hton32 (RDMA_VERSION);
+ header->rm_credit = hton32 (credits);
+
+ header->rm_body.rm_chunks[0] = 0; /* no read chunks */
+ header->rm_body.rm_chunks[1] = 0; /* no write chunks */
+ header->rm_body.rm_chunks[2] = 0; /* no reply chunks */
+
+ return;
+}
+
+
+int32_t
+__rdma_send_reply_inline (rdma_peer_t *peer, rdma_ioq_t *entry,
+ rdma_post_t *post, rdma_reply_info_t *reply_info)
+{
+ rdma_header_t *header = NULL;
+ int32_t send_size = 0, ret = 0;
+ char *buf = NULL;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+
+ priv = peer->trans->private;
+ device = priv->device;
+
+ send_size = iov_length (entry->rpchdr, entry->rpchdr_count)
+ + iov_length (entry->proghdr, entry->proghdr_count)
+ + iov_length (entry->prog_payload, entry->prog_payload_count)
+ + sizeof (rdma_header_t); /*
+ * remember, no chunklists in the
+ * reply
+ */
+
+ if (send_size > RDMA_INLINE_THRESHOLD) {
+ ret = __rdma_send_error (peer, entry, post, reply_info,
+ ERR_CHUNK);
+ goto out;
+ }
+
+ header = (rdma_header_t *)post->buf;
+
+ __rdma_fill_reply_header (header, entry->rpchdr, reply_info,
+ peer->send_count);
+
+ buf = (char *)&header->rm_body.rm_chunks[3];
+
+ if (entry->rpchdr_count != 0) {
+ iov_unload (buf, entry->rpchdr, entry->rpchdr_count);
+ buf += iov_length (entry->rpchdr, entry->rpchdr_count);
+ }
+
+ if (entry->proghdr_count != 0) {
+ iov_unload (buf, entry->proghdr, entry->proghdr_count);
+ buf += iov_length (entry->proghdr, entry->proghdr_count);
+ }
+
+ if (entry->prog_payload_count != 0) {
+ iov_unload (buf, entry->prog_payload,
+ entry->prog_payload_count);
+ buf += iov_length (entry->prog_payload,
+ entry->prog_payload_count);
+ }
+
+ rdma_post_ref (post);
+
+ ret = rdma_post_send (peer->qp, post, (buf - post->buf));
+ if (!ret) {
+ ret = send_size;
+ } else {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "ibv_post_send failed with ret = %d", ret);
+ rdma_post_unref (post);
+ __rdma_disconnect (peer->trans);
+ ret = -1;
+ }
+
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_reply_encode_write_chunks (rdma_peer_t *peer, uint32_t payload_size,
+ rdma_post_t *post,
+ rdma_reply_info_t *reply_info,
+ uint32_t **ptr)
+{
+ uint32_t chunk_size = 0;
+ int32_t ret = -1;
+ rdma_write_array_t *target_array = NULL;
+ int i = 0;
+
+ target_array = (rdma_write_array_t *)*ptr;
+
+ for (i = 0; i < reply_info->wc_array->wc_nchunks; i++) {
+ chunk_size +=
+ reply_info->wc_array->wc_array[i].wc_target.rs_length;
+ }
+
+ if (chunk_size < payload_size) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "length of payload (%d) is exceeding the total "
+ "write chunk length (%d)", payload_size, chunk_size);
+ goto out;
+ }
+
+ target_array->wc_discrim = hton32 (1);
+ for (i = 0; (i < reply_info->wc_array->wc_nchunks)
+ && (payload_size != 0);
+ i++) {
+ target_array->wc_array[i].wc_target.rs_offset
+ = hton64 (reply_info->wc_array->wc_array[i].wc_target.rs_offset);
+
+ target_array->wc_array[i].wc_target.rs_length
+ = hton32 (min (payload_size,
+ reply_info->wc_array->wc_array[i].wc_target.rs_length));
+ }
+
+ target_array->wc_nchunks = hton32 (i);
+ target_array->wc_array[i].wc_target.rs_handle = 0; /* terminate
+ chunklist */
+
+ ret = 0;
+
+ *ptr = &target_array->wc_array[i].wc_target.rs_length;
+out:
+ return ret;
+}
+
+
+inline int32_t
+__rdma_register_local_mr_for_rdma (rdma_peer_t *peer,
+ struct iovec *vector, int count,
+ rdma_post_context_t *ctx)
+{
+ int i = 0;
+ int32_t ret = -1;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+
+ if ((ctx == NULL) || (vector == NULL)) {
+ goto out;
+ }
+
+ priv = peer->trans->private;
+ device = priv->device;
+
+ for (i = 0; i < count; i++) {
+ /* what if the memory is registered more than once?
+ * Assume that a single write buffer is passed to afr, which
+ * then passes it to its children. If more than one children
+ * happen to use rdma, then the buffer is registered more than
+ * once.
+ * Ib-verbs specification says that multiple registrations of
+ * same memory location is allowed. Refer to 10.6.3.8 of
+ * Infiniband Architecture Specification Volume 1
+ * (Release 1.2.1)
+ */
+ ctx->mr[ctx->mr_count] = ibv_reg_mr (device->pd,
+ vector[i].iov_base,
+ vector[i].iov_len,
+ IBV_ACCESS_LOCAL_WRITE);
+ if (ctx->mr[ctx->mr_count] == NULL) {
+ goto out;
+ }
+
+ ctx->mr_count++;
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/* 1. assumes xfer_len of data is pointed by vector(s) starting from vec[*idx]
+ * 2. modifies vec
+ */
+int32_t
+__rdma_write (rdma_peer_t *peer, rdma_post_t *post, struct iovec *vec,
+ uint32_t xfer_len, int *idx, rdma_write_chunk_t *writech)
+{
+ int size = 0, num_sge = 0, i = 0;
+ int32_t ret = -1;
+ struct ibv_sge *sg_list = NULL;
+ struct ibv_send_wr wr = {
+ .opcode = IBV_WR_RDMA_WRITE,
+ .send_flags = IBV_SEND_SIGNALED,
+ }, *bad_wr;
+
+ if ((peer == NULL) || (writech == NULL) || (idx == NULL)
+ || (post == NULL) || (vec == NULL) || (xfer_len == 0)) {
+ goto out;
+ }
+
+ for (i = *idx; size < xfer_len; i++) {
+ size += vec[i].iov_len;
+ }
+
+ num_sge = i - *idx;
+
+ sg_list = GF_CALLOC (num_sge, sizeof (struct ibv_sge), gf_common_mt_sge);
+ if (sg_list == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ for ((i = *idx), (num_sge = 0); (xfer_len != 0); i++, num_sge++) {
+ size = min (xfer_len, vec[i].iov_len);
+
+ sg_list [num_sge].addr = (unsigned long)vec[i].iov_base;
+ sg_list [num_sge].length = size;
+ sg_list [num_sge].lkey = post->ctx.mr[i]->lkey;
+
+ xfer_len -= size;
+ }
+
+ *idx = i;
+
+ if (size < vec[i - 1].iov_len) {
+ vec[i - 1].iov_base += size;
+ vec[i - 1].iov_len -= size;
+ *idx = i - 1;
+ }
+
+ wr.sg_list = sg_list;
+ wr.num_sge = num_sge;
+ wr.wr_id = (unsigned long) rdma_post_ref (post);
+ wr.wr.rdma.rkey = writech->wc_target.rs_handle;
+ wr.wr.rdma.remote_addr = writech->wc_target.rs_offset;
+
+ ret = ibv_post_send(peer->qp, &wr, &bad_wr);
+
+ GF_FREE (sg_list);
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_do_rdma_write (rdma_peer_t *peer, rdma_post_t *post,
+ struct iovec *vector, int count, struct iobref *iobref,
+ rdma_reply_info_t *reply_info)
+{
+ int i = 0, payload_idx = 0;
+ uint32_t payload_size = 0, xfer_len = 0;
+ int32_t ret = -1;
+
+ if (count != 0) {
+ payload_size = iov_length (vector, count);
+ }
+
+ if (payload_size == 0) {
+ ret = 0;
+ goto out;
+ }
+
+ ret = __rdma_register_local_mr_for_rdma (peer, vector, count,
+ &post->ctx);
+ if (ret == -1) {
+ goto out;
+ }
+
+ post->ctx.iobref = iobref_ref (iobref);
+
+ for (i = 0; (i < reply_info->wc_array->wc_nchunks)
+ && (payload_size != 0);
+ i++) {
+ xfer_len = min (payload_size,
+ reply_info->wc_array->wc_array[i].wc_target.rs_length);
+
+ ret = __rdma_write (peer, post, vector, xfer_len, &payload_idx,
+ &reply_info->wc_array->wc_array[i]);
+ if (ret == -1) {
+ goto out;
+ }
+
+ payload_size -= xfer_len;
+ }
+
+ ret = 0;
+out:
+
+ return ret;
+}
+
+
+int32_t
+__rdma_send_reply_type_nomsg (rdma_peer_t *peer, rdma_ioq_t *entry,
+ rdma_post_t *post, rdma_reply_info_t *reply_info)
+{
+ rdma_header_t *header = NULL;
+ char *buf = NULL;
+ uint32_t payload_size = 0;
+ int count = 0, i = 0;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+ int32_t ret = 0;
+ struct iovec vector[MAX_IOVEC];
+
+ priv = peer->trans->private;
+ device = priv->device;
+
+ header = (rdma_header_t *)post->buf;
+
+ __rdma_fill_reply_header (header, entry->rpchdr, reply_info,
+ peer->send_count);
+
+ header->rm_type = hton32 (RDMA_NOMSG);
+
+ payload_size = iov_length (entry->rpchdr, entry->rpchdr_count) +
+ iov_length (entry->proghdr, entry->proghdr_count);
+
+ /* encode reply chunklist */
+ buf = (char *)&header->rm_body.rm_chunks[2];
+ ret = __rdma_reply_encode_write_chunks (peer, payload_size, post,
+ reply_info, (uint32_t **)&buf);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "encoding write chunks failed");
+ ret = __rdma_send_error (peer, entry, post, reply_info,
+ ERR_CHUNK);
+ goto out;
+ }
+
+ rdma_post_ref (post);
+
+ for (i = 0; i < entry->rpchdr_count; i++) {
+ vector[count++] = entry->rpchdr[i];
+ }
+
+ for (i = 0; i < entry->proghdr_count; i++) {
+ vector[count++] = entry->proghdr[i];
+ }
+
+ ret = __rdma_do_rdma_write (peer, post, vector, count, entry->iobref,
+ reply_info);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "rdma write to client failed");
+ rdma_post_unref (post);
+ goto out;
+ }
+
+ ret = rdma_post_send (peer->qp, post, (buf - post->buf));
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "rdma send to client failed");
+ rdma_post_unref (post);
+ } else {
+ ret = payload_size;
+ }
+
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_send_reply_type_msg (rdma_peer_t *peer, rdma_ioq_t *entry,
+ rdma_post_t *post, rdma_reply_info_t *reply_info)
+{
+ rdma_header_t *header = NULL;
+ int32_t send_size = 0, ret = 0;
+ char *ptr = NULL;
+ uint32_t payload_size = 0;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+
+ priv = peer->trans->private;
+ device = priv->device;
+
+ send_size = iov_length (entry->rpchdr, entry->rpchdr_count)
+ + iov_length (entry->proghdr, entry->proghdr_count)
+ + RDMA_MAX_HEADER_SIZE;
+
+ if (send_size > RDMA_INLINE_THRESHOLD) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "client has provided only write chunks, but the "
+ "combined size of rpc and program header (%d) is "
+ "exceeding the size of msg that can be sent using "
+ "RDMA send (%d)", send_size, RDMA_INLINE_THRESHOLD);
+
+ ret = __rdma_send_error (peer, entry, post, reply_info,
+ ERR_CHUNK);
+ goto out;
+ }
+
+ header = (rdma_header_t *)post->buf;
+
+ __rdma_fill_reply_header (header, entry->rpchdr, reply_info,
+ peer->send_count);
+
+ payload_size = iov_length (entry->prog_payload,
+ entry->prog_payload_count);
+ ptr = (char *)&header->rm_body.rm_chunks[1];
+
+ ret = __rdma_reply_encode_write_chunks (peer, payload_size, post,
+ reply_info, (uint32_t **)&ptr);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "encoding write chunks failed");
+ ret = __rdma_send_error (peer, entry, post, reply_info,
+ ERR_CHUNK);
+ goto out;
+ }
+
+ *(uint32_t *)ptr = 0; /* terminate reply chunklist */
+ ptr += sizeof (uint32_t);
+
+ rdma_post_ref (post);
+
+ ret = __rdma_do_rdma_write (peer, post, entry->prog_payload,
+ entry->prog_payload_count, entry->iobref,
+ reply_info);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "rdma write to client failed");
+ rdma_post_unref (post);
+ goto out;
+ }
+
+ iov_unload (ptr, entry->rpchdr, entry->rpchdr_count);
+ ptr += iov_length (entry->rpchdr, entry->rpchdr_count);
+
+ iov_unload (ptr, entry->proghdr, entry->proghdr_count);
+ ptr += iov_length (entry->proghdr, entry->proghdr_count);
+
+ ret = rdma_post_send (peer->qp, post, (ptr - post->buf));
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "rdma send to client failed");
+ rdma_post_unref (post);
+ } else {
+ ret = send_size + payload_size;
+ }
+
+out:
+ return ret;
+}
+
+
+void
+rdma_reply_info_destroy (rdma_reply_info_t *reply_info)
+{
+ if (reply_info == NULL) {
+ goto out;
+ }
+
+ if (reply_info->wc_array != NULL) {
+ GF_FREE (reply_info->wc_array);
+ reply_info->wc_array = NULL;
+ }
+
+ mem_put (reply_info->pool, reply_info);
+out:
+ return;
+}
+
+
+rdma_reply_info_t *
+rdma_reply_info_alloc (rdma_peer_t *peer)
+{
+ rdma_reply_info_t *reply_info = NULL;
+ rdma_private_t *priv = NULL;
+
+ priv = peer->trans->private;
+
+ reply_info = mem_get (priv->reply_info_pool);
+ if (reply_info == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ memset (reply_info, 0, sizeof (*reply_info));
+ reply_info->pool = priv->reply_info_pool;
+
+out:
+ return reply_info;
+}
+
+
+int32_t
+__rdma_ioq_churn_reply (rdma_peer_t *peer, rdma_ioq_t *entry, rdma_post_t *post)
+{
+ rdma_reply_info_t *reply_info = NULL;
+ int32_t ret = -1;
+ rdma_chunktype_t type = rdma_noch;
+
+ if ((peer == NULL) || (entry == NULL) || (post == NULL)) {
+ goto out;
+ }
+
+ reply_info = entry->msg.reply_info;
+ if (reply_info != NULL) {
+ type = reply_info->type;
+ }
+
+ switch (type) {
+ case rdma_noch:
+ ret = __rdma_send_reply_inline (peer, entry, post, reply_info);
+ break;
+
+ case rdma_replych:
+ ret = __rdma_send_reply_type_nomsg (peer, entry, post,
+ reply_info);
+ break;
+
+ case rdma_writech:
+ ret = __rdma_send_reply_type_msg (peer, entry, post,
+ reply_info);
+ break;
+
+ default:
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "invalid chunktype (%d) specified for sending reply",
+ type);
+ break;
+ }
+
+ if (reply_info != NULL) {
+ rdma_reply_info_destroy (reply_info);
+ }
+out:
+ return ret;
+}
+
+
+int32_t
+__rdma_ioq_churn_entry (rdma_peer_t *peer, rdma_ioq_t *entry)
+{
+ int32_t ret = 0, quota = 0;
+ rdma_private_t *priv = NULL;
+ rdma_device_t *device = NULL;
+ rdma_options_t *options = NULL;
+ rdma_post_t *post = NULL;
+
+ priv = peer->trans->private;
+ options = &priv->options;
+ device = priv->device;
+
+ quota = __rdma_quota_get (peer);
+ if (quota > 0) {
+ post = rdma_get_post (&device->sendq);
+ if (post == NULL) {
+ post = rdma_new_post (device,
+ (options->send_size + 2048),
+ RDMA_SEND_POST);
+ }
+
+ if (post == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ if (entry->is_request) {
+ ret = __rdma_ioq_churn_request (peer, entry, post);
+ } else {
+ ret = __rdma_ioq_churn_reply (peer, entry, post);
+ }
+
+ if (ret > 0) {
+ __rdma_ioq_entry_free (entry);
+ }
+ }
+
+out:
+ return ret;
+}
+
+
+static int32_t
+__rdma_ioq_churn (rdma_peer_t *peer)
+{
+ rdma_ioq_t *entry = NULL;
+ int32_t ret = 0;
+
+ while (!list_empty (&peer->ioq))
+ {
+ /* pick next entry */
+ entry = peer->ioq_next;
+
+ ret = __rdma_ioq_churn_entry (peer, entry);
+
+ if (ret <= 0)
+ break;
+ }
+
+ /*
+ list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
+ ret = __rdma_ioq_churn_entry (peer, entry);
+ if (ret <= 0) {
+ break;
+ }
+ }
+ */
+
+ return ret;
+}
+
+
+static int32_t
+rdma_writev (rpc_transport_t *this,
+ rdma_ioq_t *entry)
+{
+ int32_t ret = 0, need_append = 1;
+ rdma_private_t *priv = this->private;
+ rdma_peer_t *peer = NULL;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (!priv->connected) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "rdma is not connected to post a "
+ "send request");
+ ret = -1;
+ goto unlock;
+ }
+
+ peer = &priv->peer;
+ if (list_empty (&peer->ioq)) {
+ ret = __rdma_ioq_churn_entry (peer, entry);
+ if (ret > 0) {
+ need_append = 0;
+ }
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &peer->ioq);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+ return ret;
+}
+
+
+rdma_ioq_t *
+rdma_ioq_new (rpc_transport_t *this, rpc_transport_data_t *data)
+{
+ rdma_ioq_t *entry = NULL;
+ int count = 0, i = 0;
+ rpc_transport_msg_t *msg = NULL;
+ rdma_private_t *priv = NULL;
+
+ if ((data == NULL) || (this == NULL)) {
+ goto out;
+ }
+
+ priv = this->private;
+ /* TODO: use mem-pool */
+ entry = mem_get (priv->ioq_pool);
+ if (entry == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+ memset (entry, 0, sizeof (*entry));
+ entry->pool = priv->ioq_pool;
+
+ if (data->is_request) {
+ msg = &data->data.req.msg;
+ if (data->data.req.rsp.rsphdr_count != 0) {
+ for (i = 0; i < data->data.req.rsp.rsphdr_count; i++) {
+ entry->msg.request.rsphdr_vec[i]
+ = data->data.req.rsp.rsphdr[i];
+ }
+
+ entry->msg.request.rsphdr_count =
+ data->data.req.rsp.rsphdr_count;
+ }
+
+ if (data->data.req.rsp.rsp_payload_count != 0) {
+ for (i = 0; i < data->data.req.rsp.rsp_payload_count;
+ i++) {
+ entry->msg.request.rsp_payload[i]
+ = data->data.req.rsp.rsp_payload[i];
+ }
+
+ entry->msg.request.rsp_payload_count =
+ data->data.req.rsp.rsp_payload_count;
+ }
+
+ entry->msg.request.rpc_req = data->data.req.rpc_req;
+
+ if (data->data.req.rsp.rsp_iobref != NULL) {
+ entry->msg.request.rsp_iobref
+ = iobref_ref (data->data.req.rsp.rsp_iobref);
+ }
+ } else {
+ msg = &data->data.reply.msg;
+ entry->msg.reply_info = data->data.reply.private;
+ }
+
+ entry->is_request = data->is_request;
+
+ count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
+
+ assert (count <= MAX_IOVEC);
+
+ if (msg->rpchdr != NULL) {
+ memcpy (&entry->rpchdr[0], msg->rpchdr,
+ sizeof (struct iovec) * msg->rpchdrcount);
+ entry->rpchdr_count = msg->rpchdrcount;
+ }
+
+ if (msg->proghdr != NULL) {
+ memcpy (&entry->proghdr[0], msg->proghdr,
+ sizeof (struct iovec) * msg->proghdrcount);
+ entry->proghdr_count = msg->proghdrcount;
+ }
+
+ if (msg->progpayload != NULL) {
+ memcpy (&entry->prog_payload[0], msg->progpayload,
+ sizeof (struct iovec) * msg->progpayloadcount);
+ entry->prog_payload_count = msg->progpayloadcount;
+ }
+
+ if (msg->iobref != NULL) {
+ entry->iobref = iobref_ref (msg->iobref);
+ }
+
+ INIT_LIST_HEAD (&entry->list);
+
+out:
+ return entry;
+}
+
+
+int32_t
+rdma_submit_request (rpc_transport_t *this,
+ rpc_transport_req_t *req)
+{
+ int32_t ret = 0;
+ rdma_ioq_t *entry = NULL;
+ rpc_transport_data_t data = {0, };
+
+ if (req == NULL) {
+ goto out;
+ }
+
+ data.is_request = 1;
+ data.data.req = *req;
+
+ entry = rdma_ioq_new (this, &data);
+ if (entry == NULL) {
+ goto out;
+ }
+
+ ret = rdma_writev (this, entry);
+
+ if (ret > 0) {
+ ret = 0;
+ } else if (ret < 0) {
+ rpc_transport_disconnect (this);
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
+{
+ int32_t ret = 0;
+ rdma_ioq_t *entry = NULL;
+ rpc_transport_data_t data = {0, };
+
+ if (reply == NULL) {
+ goto out;
+ }
+
+ data.data.reply = *reply;
+
+ entry = rdma_ioq_new (this, &data);
+ if (entry == NULL) {
+ goto out;
+ }
+
+ ret = rdma_writev (this, entry);
+ if (ret > 0) {
+ ret = 0;
+ } else if (ret < 0) {
+ rpc_transport_disconnect (this);
+ }
+
+out:
+ return ret;
+}
+
+#if 0
+static int
+rdma_receive (rpc_transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ struct iobuf **iobuf_p)
+{
+ rdma_private_t *priv = this->private;
+ /* TODO: return error if !priv->connected, check with locks */
+ /* TODO: boundry checks for data_ptr/offset */
+ char *copy_from = NULL;
+ rdma_header_t *header = NULL;
+ uint32_t size1, size2, data_len = 0;
+ char *hdr = NULL;
+ struct iobuf *iobuf = NULL;
+ int32_t ret = 0;
+
+ pthread_mutex_lock (&priv->recv_mutex);
+ {
+/*
+ while (!priv->data_ptr)
+ pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex);
+*/
+
+ copy_from = priv->data_ptr + priv->data_offset;
+
+ priv->data_ptr = NULL;
+ data_len = priv->data_len;
+ pthread_cond_broadcast (&priv->recv_cond);
+ }
+ pthread_mutex_unlock (&priv->recv_mutex);
+
+ header = (rdma_header_t *)copy_from;
+ if (strcmp (header->colonO, ":O")) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "%s: corrupt header received", this->name);
+ ret = -1;
+ goto err;
+ }
+
+ size1 = ntoh32 (header->size1);
+ size2 = ntoh32 (header->size2);
+
+ if (data_len != (size1 + size2 + sizeof (*header))) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "%s: sizeof data read from transport is not equal "
+ "to the size specified in the header",
+ this->name);
+ ret = -1;
+ goto err;
+ }
+
+ copy_from += sizeof (*header);
+
+ if (size1) {
+ hdr = GF_CALLOC (1, size1, gf_common_mt_char);
+ if (!hdr) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to allocate header for peer %s",
+ this->peerinfo.identifier);
+ ret = -ENOMEM;
+ goto err;
+ }
+ memcpy (hdr, copy_from, size1);
+ copy_from += size1;
+ *hdr_p = hdr;
+ }
+ *hdrlen_p = size1;
+
+ if (size2) {
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to allocate IO buffer for peer %s",
+ this->peerinfo.identifier);
+ ret = -ENOMEM;
+ goto err;
+ }
+ memcpy (iobuf->ptr, copy_from, size2);
+ *iobuf_p = iobuf;
+ }
+
+err:
+ return ret;
+}
+#endif
+
+
+static void
+rdma_destroy_cq (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ rdma_device_t *device = priv->device;
+
+ if (device->recv_cq)
+ ibv_destroy_cq (device->recv_cq);
+ device->recv_cq = NULL;
+
+ if (device->send_cq)
+ ibv_destroy_cq (device->send_cq);
+ device->send_cq = NULL;
+
+ return;
+}
+
+
+static int32_t
+rdma_create_cq (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = &priv->options;
+ rdma_device_t *device = priv->device;
+ int32_t ret = 0;
+
+ device->recv_cq = ibv_create_cq (priv->device->context,
+ options->recv_count * 2,
+ device,
+ device->recv_chan,
+ 0);
+ if (!device->recv_cq) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: creation of CQ failed",
+ this->name);
+ ret = -1;
+ } else if (ibv_req_notify_cq (device->recv_cq, 0)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: ibv_req_notify_cq on CQ failed",
+ this->name);
+ ret = -1;
+ }
+
+ do {
+ /* TODO: make send_cq size dynamically adaptive */
+ device->send_cq = ibv_create_cq (priv->device->context,
+ options->send_count * 1024,
+ device,
+ device->send_chan,
+ 0);
+ if (!device->send_cq) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: creation of send_cq failed",
+ this->name);
+ ret = -1;
+ break;
+ }
+
+ if (ibv_req_notify_cq (device->send_cq, 0)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: ibv_req_notify_cq on send_cq failed",
+ this->name);
+ ret = -1;
+ break;
+ }
+ } while (0);
+
+ if (ret != 0)
+ rdma_destroy_cq (this);
+
+ return ret;
+}
+
+
+static int
+rdma_register_peer (rdma_device_t *device,
+ int32_t qp_num,
+ rdma_peer_t *peer)
+{
+ struct _qpent *ent = NULL;
+ rdma_qpreg_t *qpreg = NULL;
+ int32_t hash = 0;
+ int ret = -1;
+
+ qpreg = &device->qpreg;
+ hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ {
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) {
+ ent = ent->next;
+ }
+
+ if (ent->qp_num == qp_num) {
+ ret = 0;
+ goto unlock;
+ }
+
+ ent = (struct _qpent *) GF_CALLOC (1, sizeof (*ent),
+ gf_common_mt_qpent);
+ if (ent == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto unlock;
+ }
+
+ /* TODO: ref reg->peer */
+ ent->peer = peer;
+ ent->next = &qpreg->ents[hash];
+ ent->prev = ent->next->prev;
+ ent->next->prev = ent;
+ ent->prev->next = ent;
+ ent->qp_num = qp_num;
+ qpreg->count++;
+ ret = 0;
+ }
+unlock:
+ pthread_mutex_unlock (&qpreg->lock);
+
+ return ret;
+}
+
+
+static void
+rdma_unregister_peer (rdma_device_t *device,
+ int32_t qp_num)
+{
+ struct _qpent *ent;
+ rdma_qpreg_t *qpreg = &device->qpreg;
+ int32_t hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+ if (ent->qp_num != qp_num) {
+ pthread_mutex_unlock (&qpreg->lock);
+ return;
+ }
+ ent->prev->next = ent->next;
+ ent->next->prev = ent->prev;
+ /* TODO: unref reg->peer */
+ GF_FREE (ent);
+ qpreg->count--;
+ pthread_mutex_unlock (&qpreg->lock);
+}
+
+
+static rdma_peer_t *
+__rdma_lookup_peer (rdma_device_t *device, int32_t qp_num)
+{
+ struct _qpent *ent = NULL;
+ rdma_peer_t *peer = NULL;
+ rdma_qpreg_t *qpreg = NULL;
+ int32_t hash = 0;
+
+ qpreg = &device->qpreg;
+ hash = qp_num % 42;
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+
+ if (ent != &qpreg->ents[hash]) {
+ peer = ent->peer;
+ }
+
+ return peer;
+}
+
+/*
+ static rdma_peer_t *
+ rdma_lookup_peer (rdma_device_t *device,
+ int32_t qp_num)
+ {
+ rdma_qpreg_t *qpreg = NULL;
+ rdma_peer_t *peer = NULL;
+
+ qpreg = &device->qpreg;
+ pthread_mutex_lock (&qpreg->lock);
+ {
+ peer = __rdma_lookup_peer (device, qp_num);
+ }
+ pthread_mutex_unlock (&qpreg->lock);
+
+ return peer;
+ }
+*/
+
+
+static void
+__rdma_destroy_qp (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+
+ if (priv->peer.qp) {
+ rdma_unregister_peer (priv->device, priv->peer.qp->qp_num);
+ ibv_destroy_qp (priv->peer.qp);
+ }
+ priv->peer.qp = NULL;
+
+ return;
+}
+
+
+static int32_t
+rdma_create_qp (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = &priv->options;
+ rdma_device_t *device = priv->device;
+ int32_t ret = 0;
+ rdma_peer_t *peer;
+
+ peer = &priv->peer;
+ struct ibv_qp_init_attr init_attr = {
+ .send_cq = device->send_cq,
+ .recv_cq = device->recv_cq,
+ .srq = device->srq,
+ .cap = {
+ .max_send_wr = peer->send_count,
+ .max_recv_wr = peer->recv_count,
+ .max_send_sge = 1,
+ .max_recv_sge = 1
+ },
+ .qp_type = IBV_QPT_RC
+ };
+
+ struct ibv_qp_attr attr = {
+ .qp_state = IBV_QPS_INIT,
+ .pkey_index = 0,
+ .port_num = options->port,
+ .qp_access_flags
+ = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE
+ };
+
+ peer->qp = ibv_create_qp (device->pd, &init_attr);
+ if (!peer->qp) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_CRITICAL,
+ "%s: could not create QP",
+ this->name);
+ ret = -1;
+ goto out;
+ } else if (ibv_modify_qp (peer->qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_PKEY_INDEX |
+ IBV_QP_PORT |
+ IBV_QP_ACCESS_FLAGS)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: failed to modify QP to INIT state",
+ this->name);
+ ret = -1;
+ goto out;
+ }
+
+ peer->local_lid = rdma_get_local_lid (device->context,
+ options->port);
+ peer->local_qpn = peer->qp->qp_num;
+ peer->local_psn = lrand48 () & 0xffffff;
+
+ ret = rdma_register_peer (device, peer->qp->qp_num, peer);
+
+out:
+ if (ret == -1)
+ __rdma_destroy_qp (this);
+
+ return ret;
+}
+
+
+static void
+rdma_destroy_posts (rpc_transport_t *this)
+{
+
+}
+
+
+static int32_t
+__rdma_create_posts (rpc_transport_t *this, int32_t count, int32_t size,
+ rdma_queue_t *q, rdma_post_type_t type)
+{
+ int32_t i;
+ int32_t ret = 0;
+ rdma_private_t *priv = this->private;
+ rdma_device_t *device = priv->device;
+
+ for (i=0 ; i<count ; i++) {
+ rdma_post_t *post;
+
+ post = rdma_new_post (device, size + 2048, type);
+ if (!post) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: post creation failed",
+ this->name);
+ ret = -1;
+ break;
+ }
+
+ rdma_put_post (q, post);
+ }
+ return ret;
+}
+
+
+static int32_t
+rdma_create_posts (rpc_transport_t *this)
+{
+ int32_t i, ret;
+ rdma_post_t *post = NULL;
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = &priv->options;
+ rdma_device_t *device = priv->device;
+
+ ret = __rdma_create_posts (this, options->send_count,
+ options->send_size,
+ &device->sendq, RDMA_SEND_POST);
+ if (!ret)
+ ret = __rdma_create_posts (this, options->recv_count,
+ options->recv_size,
+ &device->recvq, RDMA_RECV_POST);
+
+ if (!ret) {
+ for (i=0 ; i<options->recv_count ; i++) {
+ post = rdma_get_post (&device->recvq);
+ if (rdma_post_recv (device->srq, post) != 0) {
+ ret = -1;
+ break;
+ }
+ }
+ }
+
+ if (ret)
+ rdma_destroy_posts (this);
+
+ return ret;
+}
+
+
+static int32_t
+rdma_connect_qp (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = &priv->options;
+ struct ibv_qp_attr attr = {
+ .qp_state = IBV_QPS_RTR,
+ .path_mtu = options->mtu,
+ .dest_qp_num = priv->peer.remote_qpn,
+ .rq_psn = priv->peer.remote_psn,
+ .max_dest_rd_atomic = 1,
+ .min_rnr_timer = 12,
+ .qp_access_flags
+ = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE,
+ .ah_attr = {
+ .is_global = 0,
+ .dlid = priv->peer.remote_lid,
+ .sl = 0,
+ .src_path_bits = 0,
+ .port_num = options->port
+ }
+ };
+ if (ibv_modify_qp (priv->peer.qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_AV |
+ IBV_QP_PATH_MTU |
+ IBV_QP_DEST_QPN |
+ IBV_QP_RQ_PSN |
+ IBV_QP_MAX_DEST_RD_ATOMIC |
+ IBV_QP_MIN_RNR_TIMER)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_CRITICAL,
+ "Failed to modify QP to RTR\n");
+ return -1;
+ }
+
+ /* TODO: make timeout and retry_cnt configurable from options */
+ attr.qp_state = IBV_QPS_RTS;
+ attr.timeout = 14;
+ attr.retry_cnt = 7;
+ attr.rnr_retry = 7;
+ attr.sq_psn = priv->peer.local_psn;
+ attr.max_rd_atomic = 1;
+ if (ibv_modify_qp (priv->peer.qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_TIMEOUT |
+ IBV_QP_RETRY_CNT |
+ IBV_QP_RNR_RETRY |
+ IBV_QP_SQ_PSN |
+ IBV_QP_MAX_QP_RD_ATOMIC)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_CRITICAL,
+ "Failed to modify QP to RTS\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int32_t
+__rdma_teardown (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+
+ __rdma_destroy_qp (this);
+
+ if (!list_empty (&priv->peer.ioq)) {
+ __rdma_ioq_flush (&priv->peer);
+ }
+
+ /* TODO: decrement cq size */
+ return 0;
+}
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+static int
+__tcp_rwv (rpc_transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ int write)
+{
+ rdma_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = vector;
+ int opcount = count;
+ int moved = 0;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ while (opcount)
+ {
+ if (write)
+ {
+ ret = writev (sock, opvector, opcount);
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN))
+ {
+ /* done for now */
+ break;
+ }
+ }
+ else
+ {
+ ret = readv (sock, opvector, opcount);
+
+ if (ret == -1 && errno == EAGAIN)
+ {
+ /* done for now */
+ break;
+ }
+ }
+
+ if (ret == 0)
+ {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "EOF from peer %s", this->peerinfo.identifier);
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+
+ if (ret == -1)
+ {
+ if (errno == EINTR)
+ continue;
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "%s failed (%s)", write ? "writev" : "readv",
+ strerror (errno));
+ if (write && !priv->connected &&
+ (errno == ECONNREFUSED))
+ gf_log (this->name, GF_LOG_ERROR,
+ "possible mismatch of 'rpc-transport-type'"
+ " in protocol server and client. "
+ "check volume file");
+ opcount = -1;
+ break;
+ }
+
+ moved = 0;
+
+ while (moved < ret)
+ {
+ if ((ret - moved) >= opvector[0].iov_len)
+ {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ }
+ else
+ {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ while (opcount && !opvector[0].iov_len)
+ {
+ opvector++;
+ opcount--;
+ }
+ }
+ }
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+ return opcount;
+}
+
+
+static int
+__tcp_readv (rpc_transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __tcp_rwv (this, vector, count,
+ pending_vector, pending_count, 0);
+
+ return ret;
+}
+
+
+static int
+__tcp_writev (rpc_transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+ rdma_private_t *priv = this->private;
+
+ ret = __tcp_rwv (this, vector, count, pending_vector,
+ pending_count, 1);
+
+ if (ret > 0) {
+ /* TODO: Avoid multiple calls when socket is already
+ registered for POLLOUT */
+ priv->idx = event_select_on (this->ctx->event_pool,
+ priv->sock, priv->idx, -1, 1);
+ } else if (ret == 0) {
+ priv->idx = event_select_on (this->ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 0);
+ }
+
+ return ret;
+}
+
+
+/*
+ * allocates new memory to hold write-chunklist. New memory is needed since
+ * write-chunklist will be used while sending reply and the post holding initial
+ * write-chunklist sent from client will be put back to srq before a pollin
+ * event is sent to upper layers.
+ */
+int32_t
+rdma_get_write_chunklist (char **ptr, rdma_write_array_t **write_ary)
+{
+ rdma_write_array_t *from = NULL, *to = NULL;
+ int32_t ret = -1, size = 0, i = 0;
+
+ from = (rdma_write_array_t *) *ptr;
+ if (from->wc_discrim == 0) {
+ ret = 0;
+ goto out;
+ }
+
+ from->wc_nchunks = ntoh32 (from->wc_nchunks);
+
+ size = sizeof (*from)
+ + (sizeof (rdma_write_chunk_t) * from->wc_nchunks);
+
+ to = GF_CALLOC (1, size, gf_common_mt_char);
+ if (to == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ to->wc_discrim = ntoh32 (from->wc_discrim);
+ to->wc_nchunks = from->wc_nchunks;
+
+ for (i = 0; i < to->wc_nchunks; i++) {
+ to->wc_array[i].wc_target.rs_handle
+ = ntoh32 (from->wc_array[i].wc_target.rs_handle);
+ to->wc_array[i].wc_target.rs_length
+ = ntoh32 (from->wc_array[i].wc_target.rs_length);
+ to->wc_array[i].wc_target.rs_offset
+ = ntoh64 (from->wc_array[i].wc_target.rs_offset);
+ }
+
+ *write_ary = to;
+ ret = 0;
+ *ptr = (char *)&from->wc_array[i].wc_target.rs_handle;
+out:
+ return ret;
+}
+
+
+/*
+ * does not allocate new memory to hold read-chunklist. New memory is not
+ * needed, since post is not put back to srq till we've completed all the
+ * rdma-reads and hence readchunk-list can point to memory held by post.
+ */
+int32_t
+rdma_get_read_chunklist (char **ptr, rdma_read_chunk_t **readch)
+{
+ int32_t ret = -1;
+ rdma_read_chunk_t *chunk = NULL;
+ int i = 0;
+
+ chunk = (rdma_read_chunk_t *)*ptr;
+ if (chunk[0].rc_discrim == 0) {
+ ret = 0;
+ goto out;
+ }
+
+ for (i = 0; chunk[i].rc_discrim != 0; i++) {
+ chunk[i].rc_discrim = ntoh32 (chunk[i].rc_discrim);
+ chunk[i].rc_position = ntoh32 (chunk[i].rc_position);
+ chunk[i].rc_target.rs_handle
+ = ntoh32 (chunk[i].rc_target.rs_handle);
+ chunk[i].rc_target.rs_length
+ = ntoh32 (chunk[i].rc_target.rs_length);
+ chunk[i].rc_target.rs_offset
+ = ntoh64 (chunk[i].rc_target.rs_offset);
+ }
+
+ *readch = &chunk[0];
+ ret = 0;
+ *ptr = (char *)&chunk[i].rc_discrim;
+out:
+ return ret;
+}
+
+
+inline int32_t
+rdma_decode_error_msg (rdma_peer_t *peer, rdma_post_t *post,
+ size_t bytes_in_post)
+{
+ rdma_header_t *header = NULL;
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+ int32_t ret = -1;
+
+ header = (rdma_header_t *)post->buf;
+ header->rm_body.rm_error.rm_type
+ = ntoh32 (header->rm_body.rm_error.rm_type);
+ if (header->rm_body.rm_error.rm_type == ERR_VERS) {
+ header->rm_body.rm_error.rm_version.rdma_vers_low =
+ ntoh32 (header->rm_body.rm_error.rm_version.rdma_vers_low);
+ header->rm_body.rm_error.rm_version.rdma_vers_high =
+ ntoh32 (header->rm_body.rm_error.rm_version.rdma_vers_high);
+ }
+
+ iobuf = iobuf_get (peer->trans->ctx->iobuf_pool);
+ if (iobuf == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ post->ctx.iobref = iobref = iobref_new ();
+ if (iobref == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ iobref_add (iobref, iobuf);
+ iobuf_unref (iobuf);
+ /*
+ * FIXME: construct an appropriate rpc-msg here, what is being sent
+ * to rpc is not correct.
+ */
+ post->ctx.vector[0].iov_base = iobuf_ptr (iobuf);
+ post->ctx.vector[0].iov_len = bytes_in_post;
+
+ memcpy (post->ctx.vector[0].iov_base, (char *)post->buf,
+ post->ctx.vector[0].iov_len);
+ post->ctx.count = 1;
+
+ iobuf = NULL;
+ iobref = NULL;
+
+out:
+ if (ret == -1) {
+ if (iobuf != NULL) {
+ iobuf_unref (iobuf);
+ }
+
+ if (iobref != NULL) {
+ iobref_unref (iobref);
+ }
+ }
+
+ return 0;
+}
+
+
+int32_t
+rdma_decode_msg (rdma_peer_t *peer, rdma_post_t *post,
+ rdma_read_chunk_t **readch, size_t bytes_in_post)
+{
+ int32_t ret = -1;
+ rdma_header_t *header = NULL;
+ rdma_reply_info_t *reply_info = NULL;
+ char *ptr = NULL;
+ rdma_write_array_t *write_ary = NULL;
+ size_t header_len = 0;
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+
+ header = (rdma_header_t *)post->buf;
+
+ ptr = (char *)&header->rm_body.rm_chunks[0];
+
+ ret = rdma_get_read_chunklist (&ptr, readch);
+ if (ret == -1) {
+ goto out;
+ }
+
+ /* skip terminator of read-chunklist */
+ ptr = ptr + sizeof (uint32_t);
+
+ ret = rdma_get_write_chunklist (&ptr, &write_ary);
+ if (ret == -1) {
+ goto out;
+ }
+
+ /* skip terminator of write-chunklist */
+ ptr = ptr + sizeof (uint32_t);
+
+ if (write_ary != NULL) {
+ reply_info = rdma_reply_info_alloc (peer);
+ if (reply_info == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ reply_info->type = rdma_writech;
+ reply_info->wc_array = write_ary;
+ reply_info->rm_xid = header->rm_xid;
+ } else {
+ ret = rdma_get_write_chunklist (&ptr, &write_ary);
+ if (ret == -1) {
+ goto out;
+ }
+
+ if (write_ary != NULL) {
+ reply_info = rdma_reply_info_alloc (peer);
+ if (reply_info == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ reply_info->type = rdma_replych;
+ reply_info->wc_array = write_ary;
+ reply_info->rm_xid = header->rm_xid;
+ }
+ }
+
+ /* skip terminator of reply chunk */
+ ptr = ptr + sizeof (uint32_t);
+ if (header->rm_type != RDMA_NOMSG) {
+ iobuf = iobuf_get (peer->trans->ctx->iobuf_pool);
+ if (iobuf == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ post->ctx.iobref = iobref = iobref_new ();
+ if (iobref == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ iobref_add (iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ header_len = (long)ptr - (long)post->buf;
+ post->ctx.vector[0].iov_base = iobuf_ptr (iobuf);
+ post->ctx.vector[0].iov_len = bytes_in_post - header_len;
+ memcpy (post->ctx.vector[0].iov_base, ptr,
+ post->ctx.vector[0].iov_len);
+ post->ctx.count = 1;
+
+ iobuf = NULL;
+ iobref = NULL;
+ }
+
+ post->ctx.reply_info = reply_info;
+out:
+ if (ret == -1) {
+ if (*readch != NULL) {
+ GF_FREE (*readch);
+ *readch = NULL;
+ }
+
+ if (write_ary != NULL) {
+ GF_FREE (write_ary);
+ }
+
+ if (iobuf != NULL) {
+ iobuf_unref (iobuf);
+ }
+
+ if (iobref != NULL) {
+ iobref_unref (iobref);
+ }
+ }
+
+ return ret;
+}
+
+
+/* Assumes only one of either write-chunklist or a reply chunk is present */
+int32_t
+rdma_decode_header (rdma_peer_t *peer, rdma_post_t *post,
+ rdma_read_chunk_t **readch, size_t bytes_in_post)
+{
+ int32_t ret = -1;
+ rdma_header_t *header = NULL;
+
+ header = (rdma_header_t *)post->buf;
+
+ header->rm_xid = ntoh32 (header->rm_xid);
+ header->rm_vers = ntoh32 (header->rm_vers);
+ header->rm_credit = ntoh32 (header->rm_credit);
+ header->rm_type = ntoh32 (header->rm_type);
+
+ switch (header->rm_type) {
+ case RDMA_MSG:
+ case RDMA_NOMSG:
+ ret = rdma_decode_msg (peer, post, readch, bytes_in_post);
+ break;
+
+ case RDMA_MSGP:
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "rdma msg of msg-type RDMA_MSGP should not have been "
+ "recieved");
+ ret = -1;
+ break;
+
+ case RDMA_DONE:
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "rdma msg of msg-type RDMA_DONE should not have been "
+ "recieved");
+ ret = -1;
+ break;
+
+ case RDMA_ERROR:
+ ret = rdma_decode_error_msg (peer, post, bytes_in_post);
+ break;
+
+ default:
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "unknown rdma msg-type (%d)", header->rm_type);
+ }
+
+ return ret;
+}
+
+
+int32_t
+__rdma_read (rdma_peer_t *peer, rdma_post_t *post, struct iovec *to,
+ rdma_read_chunk_t *readch)
+{
+ int32_t ret = -1;
+ struct ibv_sge list = {0, };
+ struct ibv_send_wr wr = {0, }, *bad_wr = NULL;
+
+ ret = __rdma_register_local_mr_for_rdma (peer, to, 1, &post->ctx);
+ if (ret == -1) {
+ goto out;
+ }
+
+ list.addr = (unsigned long) to->iov_base;
+ list.length = to->iov_len;
+ list.lkey = post->ctx.mr[post->ctx.mr_count - 1]->lkey;
+
+ wr.wr_id = (unsigned long) rdma_post_ref (post);
+ wr.sg_list = &list;
+ wr.num_sge = 1;
+ wr.opcode = IBV_WR_RDMA_READ;
+ wr.send_flags = IBV_SEND_SIGNALED;
+ wr.wr.rdma.remote_addr = readch->rc_target.rs_offset;
+ wr.wr.rdma.rkey = readch->rc_target.rs_handle;
+
+ ret = ibv_post_send (peer->qp, &wr, &bad_wr);
+ if (ret == -1) {
+ rdma_post_unref (post);
+ }
+out:
+ return ret;
+}
+
+
+int32_t
+rdma_do_reads (rdma_peer_t *peer, rdma_post_t *post, rdma_read_chunk_t *readch)
+{
+ int32_t ret = -1, i = 0;
+ size_t size = 0;
+ char *ptr = NULL;
+ struct iobuf *iobuf = NULL;
+ rdma_private_t *priv = NULL;
+
+ priv = peer->trans->private;
+
+ for (i = 0; readch[i].rc_discrim != 0; i++) {
+ size += readch[i].rc_target.rs_length;
+ }
+
+ if (size > peer->trans->ctx->page_size) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "total size of rdma-read (%lu) is greater than "
+ "page-size (%lu). This is not supported till variable "
+ "sized iobufs are implemented", (unsigned long)size,
+ (unsigned long)peer->trans->ctx->page_size);
+ goto out;
+ }
+
+ iobuf = iobuf_get (peer->trans->ctx->iobuf_pool);
+ if (iobuf == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ if (post->ctx.iobref == NULL) {
+ post->ctx.iobref = iobref_new ();
+ if (post->ctx.iobref == NULL) {
+ iobuf_unref (iobuf);
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+ }
+
+ iobref_add (post->ctx.iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ ptr = iobuf_ptr (iobuf);
+ iobuf = NULL;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (!priv->connected) {
+ goto unlock;
+ }
+
+ for (i = 0; readch[i].rc_discrim != 0; i++) {
+ post->ctx.vector[post->ctx.count].iov_base = ptr;
+ post->ctx.vector[post->ctx.count].iov_len
+ = readch[i].rc_target.rs_length;
+
+ ret = __rdma_read (peer, post,
+ &post->ctx.vector[post->ctx.count],
+ &readch[i]);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ post->ctx.count++;
+ ptr += readch[i].rc_target.rs_length;
+ }
+
+ ret = 0;
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+out:
+
+ if (ret == -1) {
+ if (iobuf != NULL) {
+ iobuf_unref (iobuf);
+ }
+ }
+
+ return ret;
+}
+
+
+int32_t
+rdma_pollin_notify (rdma_peer_t *peer, rdma_post_t *post)
+{
+ int32_t ret = -1;
+ enum msg_type msg_type = 0;
+ struct rpc_req *rpc_req = NULL;
+ rdma_request_context_t *request_context = NULL;
+ rpc_request_info_t request_info = {0, };
+ rdma_private_t *priv = NULL;
+ uint32_t *ptr = NULL;
+ rpc_transport_pollin_t *pollin = NULL;
+
+ if ((peer == NULL) || (post == NULL)) {
+ goto out;
+ }
+
+ pollin = rpc_transport_pollin_alloc (peer->trans,
+ post->ctx.vector,
+ post->ctx.count,
+ post->ctx.iobref,
+ post->ctx.reply_info);
+ if (pollin == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ ptr = (uint32_t *)pollin->vector[0].iov_base;
+
+ request_info.xid = ntoh32 (*ptr);
+ msg_type = ntoh32 (*(ptr + 1));
+
+ if (msg_type == REPLY) {
+ ret = rpc_transport_notify (peer->trans,
+ RPC_TRANSPORT_MAP_XID_REQUEST,
+ &request_info);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot get request information from rpc "
+ "layer");
+ goto out;
+ }
+
+ rpc_req = request_info.rpc_req;
+ if (rpc_req == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "rpc request structure not found");
+ ret = -1;
+ goto out;
+ }
+
+ request_context = rpc_req->conn_private;
+ rpc_req->conn_private = NULL;
+
+ priv = peer->trans->private;
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ __rdma_request_context_destroy (request_context);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+ }
+
+ ret = rpc_transport_notify (peer->trans, RPC_TRANSPORT_MSG_RECEIVED,
+ pollin);
+
+out:
+ if (pollin != NULL) {
+ pollin->private = NULL;
+ rpc_transport_pollin_destroy (pollin);
+ }
+
+ return ret;
+}
+
+
+int32_t
+rdma_recv_reply (rdma_peer_t *peer, rdma_post_t *post)
+{
+ int32_t ret = -1;
+ rdma_header_t *header = NULL;
+ rdma_reply_info_t *reply_info = NULL;
+ rdma_write_array_t *wc_array = NULL;
+ int i = 0;
+ uint32_t *ptr = NULL;
+ rdma_request_context_t *ctx = NULL;
+ rpc_request_info_t request_info = {0, };
+ struct rpc_req *rpc_req = NULL;
+
+ header = (rdma_header_t *)post->buf;
+ reply_info = post->ctx.reply_info;
+
+ /* no write chunklist, just notify upper layers */
+ if (reply_info == NULL) {
+ ret = 0;
+ goto out;
+ }
+
+ wc_array = reply_info->wc_array;
+
+ if (header->rm_type == RDMA_NOMSG) {
+ post->ctx.vector[0].iov_base
+ = (void *)(long)wc_array->wc_array[0].wc_target.rs_offset;
+ post->ctx.vector[0].iov_len
+ = wc_array->wc_array[0].wc_target.rs_length;
+
+ post->ctx.count = 1;
+ } else {
+ for (i = 0; i < wc_array->wc_nchunks; i++) {
+ post->ctx.vector[i + 1].iov_base
+ = (void *)(long)wc_array->wc_array[i].wc_target.rs_offset;
+ post->ctx.vector[i + 1].iov_len
+ = wc_array->wc_array[i].wc_target.rs_length;
+ }
+
+ post->ctx.count += wc_array->wc_nchunks;
+ }
+
+ ptr = (uint32_t *)post->ctx.vector[0].iov_base;
+ request_info.xid = ntoh32 (*ptr);
+
+ ret = rpc_transport_notify (peer->trans,
+ RPC_TRANSPORT_MAP_XID_REQUEST,
+ &request_info);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "cannot get request information from rpc "
+ "layer");
+ goto out;
+ }
+
+ rpc_req = request_info.rpc_req;
+ if (rpc_req == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "rpc request structure not found");
+ ret = -1;
+ goto out;
+ }
+
+ ctx = rpc_req->conn_private;
+ if ((post->ctx.iobref != NULL) && (ctx->iobref != NULL)) {
+ iobref_merge (post->ctx.iobref, ctx->iobref);
+ } else if (post->ctx.iobref == NULL) {
+ post->ctx.iobref = iobref_ref (ctx->iobref);
+ }
+
+ ret = 0;
+
+ rdma_reply_info_destroy (reply_info);
+
+out:
+ if (ret == 0) {
+ ret = rdma_pollin_notify (peer, post);
+ }
+
+ return ret;
+}
+
+
+inline int32_t
+rdma_recv_request (rdma_peer_t *peer, rdma_post_t *post,
+ rdma_read_chunk_t *readch)
+{
+ int32_t ret = -1;
+
+ if (readch != NULL) {
+ ret = rdma_do_reads (peer, post, readch);
+ } else {
+ ret = rdma_pollin_notify (peer, post);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "pollin notification failed");
+ }
+ }
+
+ return ret;
+}
+
+void
+rdma_process_recv (rdma_peer_t *peer, struct ibv_wc *wc)
+{
+ rdma_post_t *post = NULL;
+ rdma_read_chunk_t *readch = NULL;
+ int ret = -1;
+ uint32_t *ptr = NULL;
+ enum msg_type msg_type = 0;
+ rdma_header_t *header = NULL;
+
+ post = (rdma_post_t *) (long) wc->wr_id;
+ if (post == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "no post found in successful work completion element");
+ goto out;
+ }
+
+ ret = rdma_decode_header (peer, post, &readch, wc->byte_len);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "decoding of header failed");
+ goto out;
+ }
+
+ header = (rdma_header_t *)post->buf;
+
+ switch (header->rm_type) {
+ case RDMA_MSG:
+ ptr = (uint32_t *)post->ctx.vector[0].iov_base;
+ msg_type = ntoh32 (*(ptr + 1));
+ break;
+
+ case RDMA_NOMSG:
+ if (readch != NULL) {
+ msg_type = CALL;
+ } else {
+ msg_type = REPLY;
+ }
+ break;
+
+ case RDMA_ERROR:
+ ret = rdma_pollin_notify (peer, post);
+ if (ret == -1) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "pollin notification failed");
+ }
+ goto out;
+
+ default:
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "invalid rdma msg-type (%d)", header->rm_type);
+ break;
+ }
+
+ if (msg_type == CALL) {
+ ret = rdma_recv_request (peer, post, readch);
+ } else {
+ ret = rdma_recv_reply (peer, post);
+ }
+
+out:
+ if (ret == -1) {
+ rpc_transport_disconnect (peer->trans);
+ }
+
+ return;
+}
+
+
+static void *
+rdma_recv_completion_proc (void *data)
+{
+ struct ibv_comp_channel *chan = NULL;
+ rdma_device_t *device = NULL;;
+ rdma_post_t *post = NULL;
+ rdma_peer_t *peer = NULL;
+ struct ibv_cq *event_cq = NULL;
+ struct ibv_wc wc = {0, };
+ void *event_ctx = NULL;
+ int32_t ret = 0;
+
+ chan = data;
+
+ while (1) {
+ ret = ibv_get_cq_event (chan, &event_cq, &event_ctx);
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "ibv_get_cq_event failed, terminating recv "
+ "thread %d (%d)", ret, errno);
+ continue;
+ }
+
+ device = event_ctx;
+
+ ret = ibv_req_notify_cq (event_cq, 0);
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "ibv_req_notify_cq on %s failed, terminating "
+ "recv thread: %d (%d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ device = (rdma_device_t *) event_ctx;
+
+ while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) {
+ post = (rdma_post_t *) (long) wc.wr_id;
+
+ pthread_mutex_lock (&device->qpreg.lock);
+ {
+ peer = __rdma_lookup_peer (device,
+ wc.qp_num);
+
+ /*
+ * keep a refcount on transport so that it
+ * doesnot get freed because of some error
+ * indicated by wc.status till we are done
+ * with usage of peer and thereby that of trans.
+ */
+ if (peer != NULL) {
+ rpc_transport_ref (peer->trans);
+ }
+ }
+ pthread_mutex_unlock (&device->qpreg.lock);
+
+ if (wc.status != IBV_WC_SUCCESS) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "recv work request on `%s' returned "
+ "error (%d)", device->device_name,
+ wc.status);
+ if (peer) {
+ rpc_transport_unref (peer->trans);
+ rpc_transport_disconnect (peer->trans);
+ }
+
+ if (post) {
+ rdma_post_unref (post);
+ }
+ continue;
+ }
+
+ if (peer) {
+ rdma_process_recv (peer, &wc);
+ rpc_transport_unref (peer->trans);
+ } else {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_DEBUG,
+ "could not lookup peer for qp_num: %d",
+ wc.qp_num);
+ }
+
+ rdma_post_unref (post);
+ }
+
+ if (ret < 0) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "ibv_poll_cq on `%s' returned error "
+ "(ret = %d, errno = %d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+ ibv_ack_cq_events (event_cq, 1);
+ }
+
+ return NULL;
+}
+
+
+void
+rdma_handle_failed_send_completion (rdma_peer_t *peer, struct ibv_wc *wc)
+{
+ rdma_post_t *post = NULL;
+ rdma_device_t *device = NULL;
+ rdma_private_t *priv = NULL;
+
+ if (peer != NULL) {
+ priv = peer->trans->private;
+ if (priv != NULL) {
+ device = priv->device;
+ }
+ }
+
+
+ post = (rdma_post_t *) (long) wc->wr_id;
+
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "send work request on `%s' returned error "
+ "wc.status = %d, wc.vendor_err = %d, post->buf = %p, "
+ "wc.byte_len = %d, post->reused = %d",
+ (device != NULL) ? device->device_name : NULL, wc->status,
+ wc->vendor_err, post->buf, wc->byte_len, post->reused);
+
+ if (wc->status == IBV_WC_RETRY_EXC_ERR) {
+ gf_log ("rdma", GF_LOG_ERROR, "connection between client and"
+ " server not working. check by running "
+ "'ibv_srq_pingpong'. also make sure subnet manager"
+ " is running (eg: 'opensm'), or check if rdma port is "
+ "valid (or active) by running 'ibv_devinfo'. contact "
+ "Gluster Support Team if the problem persists.");
+ }
+
+ if (peer) {
+ rpc_transport_disconnect (peer->trans);
+ }
+
+ return;
+}
+
+
+void
+rdma_handle_successful_send_completion (rdma_peer_t *peer, struct ibv_wc *wc)
+{
+ rdma_post_t *post = NULL;
+ int refcount = 0, ret = 0;
+
+ if (wc->opcode != IBV_WC_RDMA_READ) {
+ goto out;
+ }
+
+ post = (rdma_post_t *)(long) wc->wr_id;
+
+ refcount = rdma_post_get_refcount (post);
+ if (refcount != 1) {
+ /* if it is not the last rdma read, we've got nothing to do */
+ goto out;
+ }
+
+ ret = rdma_pollin_notify (peer, post);
+ if (ret == -1) {
+ rpc_transport_disconnect (peer->trans);
+ }
+
+out:
+ return;
+}
+
+
+static void *
+rdma_send_completion_proc (void *data)
+{
+ struct ibv_comp_channel *chan = NULL;
+ rdma_post_t *post = NULL;
+ rdma_peer_t *peer = NULL;
+ struct ibv_cq *event_cq = NULL;
+ void *event_ctx = NULL;
+ rdma_device_t *device = NULL;
+ struct ibv_wc wc = {0, };
+ char is_request = 0;
+ int32_t ret = 0, quota_ret = 0;
+
+ chan = data;
+ while (1) {
+ ret = ibv_get_cq_event (chan, &event_cq, &event_ctx);
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "ibv_get_cq_event on failed, terminating "
+ "send thread: %d (%d)", ret, errno);
+ continue;
+ }
+
+ device = event_ctx;
+
+ ret = ibv_req_notify_cq (event_cq, 0);
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "ibv_req_notify_cq on %s failed, terminating "
+ "send thread: %d (%d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) {
+ post = (rdma_post_t *) (long) wc.wr_id;
+
+ pthread_mutex_lock (&device->qpreg.lock);
+ {
+ peer = __rdma_lookup_peer (device, wc.qp_num);
+
+ /*
+ * keep a refcount on transport so that it
+ * does not get freed because of some error
+ * indicated by wc.status, till we are done
+ * with usage of peer and thereby that of trans.
+ */
+ if (peer != NULL) {
+ rpc_transport_ref (peer->trans);
+ }
+ }
+ pthread_mutex_unlock (&device->qpreg.lock);
+
+ if (wc.status != IBV_WC_SUCCESS) {
+ rdma_handle_failed_send_completion (peer, &wc);
+ } else {
+ rdma_handle_successful_send_completion (peer,
+ &wc);
+ }
+
+ if (post) {
+ is_request = post->ctx.is_request;
+
+ ret = rdma_post_unref (post);
+ if ((ret == 0)
+ && (wc.status == IBV_WC_SUCCESS)
+ && !is_request
+ && (post->type == RDMA_SEND_POST)) {
+ /* An RDMA_RECV_POST can end up in
+ * rdma_send_completion_proc for
+ * rdma-reads, and we do not take
+ * quota for getting an RDMA_RECV_POST.
+ */
+
+ /*
+ * if it is request, quota is returned
+ * after reply has come.
+ */
+ quota_ret = rdma_quota_put (peer);
+ if (quota_ret < 0) {
+ gf_log ("rdma", GF_LOG_DEBUG,
+ "failed to send "
+ "message");
+ }
+ }
+ }
+
+ if (peer) {
+ rpc_transport_unref (peer->trans);
+ } else {
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "could not lookup peer for qp_num: %d",
+ wc.qp_num);
+ }
+ }
+
+ if (ret < 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "ibv_poll_cq on `%s' returned error (ret = %d,"
+ " errno = %d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ ibv_ack_cq_events (event_cq, 1);
+ }
+
+ return NULL;
+}
+
+
+static void
+rdma_options_init (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = &priv->options;
+ int32_t mtu;
+ data_t *temp;
+
+ /* TODO: validate arguments from options below */
+
+ options->send_size = this->ctx->page_size * 4; /* 512 KB */
+ options->recv_size = this->ctx->page_size * 4; /* 512 KB */
+ options->send_count = 32;
+ options->recv_count = 32;
+
+ temp = dict_get (this->options,
+ "transport.rdma.work-request-send-count");
+ if (temp)
+ options->send_count = data_to_int32 (temp);
+
+ temp = dict_get (this->options,
+ "transport.rdma.work-request-recv-count");
+ if (temp)
+ options->recv_count = data_to_int32 (temp);
+
+ options->port = 0;
+ temp = dict_get (this->options,
+ "transport.rdma.port");
+ if (temp)
+ options->port = data_to_uint64 (temp);
+
+ options->mtu = mtu = IBV_MTU_2048;
+ temp = dict_get (this->options,
+ "transport.rdma.mtu");
+ if (temp)
+ mtu = data_to_int32 (temp);
+ switch (mtu) {
+ case 256: options->mtu = IBV_MTU_256;
+ break;
+ case 512: options->mtu = IBV_MTU_512;
+ break;
+ case 1024: options->mtu = IBV_MTU_1024;
+ break;
+ case 2048: options->mtu = IBV_MTU_2048;
+ break;
+ case 4096: options->mtu = IBV_MTU_4096;
+ break;
+ default:
+ if (temp)
+ gf_log (RDMA_LOG_NAME, GF_LOG_WARNING,
+ "%s: unrecognized MTU value '%s', defaulting "
+ "to '2048'", this->name,
+ data_to_str (temp));
+ else
+ gf_log (RDMA_LOG_NAME, GF_LOG_TRACE,
+ "%s: defaulting MTU to '2048'",
+ this->name);
+ options->mtu = IBV_MTU_2048;
+ break;
+ }
+
+ temp = dict_get (this->options,
+ "transport.rdma.device-name");
+ if (temp)
+ options->device_name = gf_strdup (temp->data);
+
+ return;
+}
+
+static void
+rdma_queue_init (rdma_queue_t *queue)
+{
+ pthread_mutex_init (&queue->lock, NULL);
+
+ queue->active_posts.next = &queue->active_posts;
+ queue->active_posts.prev = &queue->active_posts;
+ queue->passive_posts.next = &queue->passive_posts;
+ queue->passive_posts.prev = &queue->passive_posts;
+}
+
+
+static rdma_device_t *
+rdma_get_device (rpc_transport_t *this,
+ struct ibv_context *ibctx)
+{
+ glusterfs_ctx_t *ctx = NULL;
+ rdma_private_t *priv = NULL;
+ rdma_options_t *options = NULL;
+ char *device_name = NULL;
+ uint32_t port = 0;
+ uint8_t active_port = 0;
+ int32_t ret = 0;
+ int32_t i = 0;
+
+ rdma_device_t *trav = NULL;
+
+ priv = this->private;
+ options = &priv->options;
+ device_name = priv->options.device_name;
+ ctx = this->ctx;
+ trav = ctx->ib;
+ port = priv->options.port;
+
+ while (trav) {
+ if ((!strcmp (trav->device_name, device_name)) &&
+ (trav->port == port))
+ break;
+ trav = trav->next;
+ }
+
+ if (!trav) {
+
+ trav = GF_CALLOC (1, sizeof (*trav),
+ gf_common_mt_rdma_device_t);
+ if (trav == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ return NULL;
+ }
+
+ priv->device = trav;
+
+ trav->context = ibctx;
+
+ ret = ib_get_active_port (trav->context);
+
+ if (ret < 0) {
+ if (!port) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "Failed to find any active ports and "
+ "none specified in volume file,"
+ " exiting");
+ return NULL;
+ }
+ }
+
+ active_port = ret;
+
+ if (port) {
+ ret = ib_check_active_port (trav->context, port);
+ if (ret < 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_WARNING,
+ "On device %s: provided port:%u is "
+ "found to be offline, continuing to "
+ "use the same port", device_name, port);
+ }
+ } else {
+ priv->options.port = active_port;
+ port = active_port;
+ gf_log (RDMA_LOG_NAME, GF_LOG_TRACE,
+ "Port unspecified in volume file using active "
+ "port: %u", port);
+ }
+
+ trav->device_name = gf_strdup (device_name);
+ trav->port = port;
+
+ trav->next = ctx->ib;
+ ctx->ib = trav;
+
+ trav->send_chan = ibv_create_comp_channel (trav->context);
+ if (!trav->send_chan) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not create send completion channel",
+ device_name);
+ /* TODO: cleanup current mess */
+ return NULL;
+ }
+
+ trav->recv_chan = ibv_create_comp_channel (trav->context);
+ if (!trav->recv_chan) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "could not create recv completion channel");
+ /* TODO: cleanup current mess */
+ return NULL;
+ }
+
+ if (rdma_create_cq (this) < 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not create CQ",
+ this->name);
+ return NULL;
+ }
+
+ /* protection domain */
+ trav->pd = ibv_alloc_pd (trav->context);
+
+ if (!trav->pd) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not allocate protection domain",
+ this->name);
+ return NULL;
+ }
+
+ struct ibv_srq_init_attr attr = {
+ .attr = {
+ .max_wr = options->recv_count,
+ .max_sge = 1
+ }
+ };
+ trav->srq = ibv_create_srq (trav->pd, &attr);
+
+ if (!trav->srq) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not create SRQ",
+ this->name);
+ return NULL;
+ }
+
+ /* queue init */
+ rdma_queue_init (&trav->sendq);
+ rdma_queue_init (&trav->recvq);
+
+ if (rdma_create_posts (this) < 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not allocate posts",
+ this->name);
+ return NULL;
+ }
+
+ /* completion threads */
+ ret = pthread_create (&trav->send_thread,
+ NULL,
+ rdma_send_completion_proc,
+ trav->send_chan);
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "could not create send completion thread");
+ return NULL;
+ }
+ ret = pthread_create (&trav->recv_thread,
+ NULL,
+ rdma_recv_completion_proc,
+ trav->recv_chan);
+ if (ret) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "could not create recv completion thread");
+ return NULL;
+ }
+
+ /* qpreg */
+ pthread_mutex_init (&trav->qpreg.lock, NULL);
+ for (i=0; i<42; i++) {
+ trav->qpreg.ents[i].next = &trav->qpreg.ents[i];
+ trav->qpreg.ents[i].prev = &trav->qpreg.ents[i];
+ }
+ }
+ return trav;
+}
+
+static int32_t
+rdma_init (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = &priv->options;
+ struct ibv_device **dev_list;
+ struct ibv_context *ib_ctx = NULL;
+ int32_t ret = 0;
+
+ rdma_options_init (this);
+
+ {
+ dev_list = ibv_get_device_list (NULL);
+
+ if (!dev_list) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_CRITICAL,
+ "Failed to get IB devices");
+ ret = -1;
+ goto cleanup;
+ }
+
+ if (!*dev_list) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_CRITICAL,
+ "No IB devices found");
+ ret = -1;
+ goto cleanup;
+ }
+
+ if (!options->device_name) {
+ if (*dev_list) {
+ options->device_name =
+ gf_strdup (ibv_get_device_name (*dev_list));
+ } else {
+ gf_log (RDMA_LOG_NAME, GF_LOG_CRITICAL,
+ "IB device list is empty. Check for "
+ "'ib_uverbs' module");
+ return -1;
+ goto cleanup;
+ }
+ }
+
+ while (*dev_list) {
+ if (!strcmp (ibv_get_device_name (*dev_list),
+ options->device_name)) {
+ ib_ctx = ibv_open_device (*dev_list);
+
+ if (!ib_ctx) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "Failed to get infiniband"
+ "device context");
+ ret = -1;
+ goto cleanup;
+ }
+ break;
+ }
+ ++dev_list;
+ }
+
+ priv->device = rdma_get_device (this, ib_ctx);
+
+ if (!priv->device) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "could not create rdma device for %s",
+ options->device_name);
+ ret = -1;
+ goto cleanup;
+ }
+ }
+
+ priv->peer.trans = this;
+ INIT_LIST_HEAD (&priv->peer.ioq);
+
+ pthread_mutex_init (&priv->read_mutex, NULL);
+ pthread_mutex_init (&priv->write_mutex, NULL);
+ pthread_mutex_init (&priv->recv_mutex, NULL);
+ pthread_cond_init (&priv->recv_cond, NULL);
+
+ priv->request_ctx_pool = mem_pool_new (rdma_request_context_t,
+ RDMA_POOL_SIZE);
+ if (priv->request_ctx_pool == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ GF_FREE (priv);
+ return -1;
+ }
+
+ priv->ioq_pool = mem_pool_new (rdma_ioq_t, RDMA_POOL_SIZE);
+ if (priv->ioq_pool == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ mem_pool_destroy (priv->request_ctx_pool);
+ GF_FREE (priv);
+ return -1;
+ }
+
+ priv->reply_info_pool = mem_pool_new (rdma_reply_info_t,
+ RDMA_POOL_SIZE);
+ if (priv->reply_info_pool == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ mem_pool_destroy (priv->request_ctx_pool);
+ mem_pool_destroy (priv->ioq_pool);
+ GF_FREE (priv);
+ return -1;
+ }
+
+
+cleanup:
+ if (-1 == ret) {
+ if (ib_ctx)
+ ibv_close_device (ib_ctx);
+ }
+
+ if (dev_list)
+ ibv_free_device_list (dev_list);
+
+ return ret;
+}
+
+
+static int32_t
+rdma_disconnect (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ int32_t ret = 0;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __rdma_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+
+static int32_t
+__tcp_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR,
+ (void *)&optval, &optlen);
+
+ if (ret == 0 && optval)
+ {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+static inline void
+rdma_fill_handshake_data (char *buf, struct rdma_nbio *nbio,
+ rdma_private_t *priv)
+{
+ sprintf (buf,
+ "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n"
+ "QP1:LID=%04x:QPN=%06x:PSN=%06x\n",
+ priv->peer.recv_size,
+ priv->peer.send_size,
+ priv->peer.local_lid,
+ priv->peer.local_qpn,
+ priv->peer.local_psn);
+
+ nbio->vector.iov_base = buf;
+ nbio->vector.iov_len = strlen (buf) + 1;
+ nbio->count = 1;
+ return;
+}
+
+static inline void
+rdma_fill_handshake_ack (char *buf, struct rdma_nbio *nbio)
+{
+ sprintf (buf, "DONE\n");
+ nbio->vector.iov_base = buf;
+ nbio->vector.iov_len = strlen (buf) + 1;
+ nbio->count = 1;
+ return;
+}
+
+static int
+rdma_handshake_pollin (rpc_transport_t *this)
+{
+ int ret = 0;
+ rdma_private_t *priv = this->private;
+ char *buf = priv->handshake.incoming.buf;
+ int32_t recv_buf_size, send_buf_size;
+ socklen_t sock_len;
+
+ if (priv->handshake.incoming.state == RDMA_HANDSHAKE_COMPLETE) {
+ return -1;
+ }
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ while (priv->handshake.incoming.state != RDMA_HANDSHAKE_COMPLETE)
+ {
+ switch (priv->handshake.incoming.state)
+ {
+ case RDMA_HANDSHAKE_START:
+ buf = priv->handshake.incoming.buf = GF_CALLOC (1, 256, gf_common_mt_char);
+ rdma_fill_handshake_data (buf, &priv->handshake.incoming, priv);
+ buf[0] = 0;
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_RECEIVING_DATA;
+ break;
+
+ case RDMA_HANDSHAKE_RECEIVING_DATA:
+ ret = __tcp_readv (this,
+ &priv->handshake.incoming.vector,
+ priv->handshake.incoming.count,
+ &priv->handshake.incoming.pending_vector,
+ &priv->handshake.incoming.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "partial header read on NB socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_RECEIVED_DATA;
+ }
+ break;
+
+ case RDMA_HANDSHAKE_RECEIVED_DATA:
+ ret = sscanf (buf,
+ "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n"
+ "QP1:LID=%04x:QPN=%06x:PSN=%06x\n",
+ &recv_buf_size,
+ &send_buf_size,
+ &priv->peer.remote_lid,
+ &priv->peer.remote_qpn,
+ &priv->peer.remote_psn);
+
+ if ((ret != 5) && (strncmp (buf, "QP1:", 4))) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_CRITICAL,
+ "%s: remote-host(%s)'s "
+ "transport type is different",
+ this->name,
+ this->peerinfo.identifier);
+ ret = -1;
+ goto unlock;
+ }
+
+ if (recv_buf_size < priv->peer.recv_size)
+ priv->peer.recv_size = recv_buf_size;
+ if (send_buf_size < priv->peer.send_size)
+ priv->peer.send_size = send_buf_size;
+
+ gf_log (RDMA_LOG_NAME, GF_LOG_TRACE,
+ "%s: transacted recv_size=%d "
+ "send_size=%d",
+ this->name, priv->peer.recv_size,
+ priv->peer.send_size);
+
+ priv->peer.quota = priv->peer.send_count;
+
+ if (rdma_connect_qp (this)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_ERROR,
+ "%s: failed to connect with "
+ "remote QP", this->name);
+ ret = -1;
+ goto unlock;
+ }
+ rdma_fill_handshake_ack (buf, &priv->handshake.incoming);
+ buf[0] = 0;
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_RECEIVING_ACK;
+ break;
+
+ case RDMA_HANDSHAKE_RECEIVING_ACK:
+ ret = __tcp_readv (this,
+ &priv->handshake.incoming.vector,
+ priv->handshake.incoming.count,
+ &priv->handshake.incoming.pending_vector,
+ &priv->handshake.incoming.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "partial header read on NB "
+ "socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_RECEIVED_ACK;
+ }
+ break;
+
+ case RDMA_HANDSHAKE_RECEIVED_ACK:
+ if (strncmp (buf, "DONE", 4)) {
+ gf_log (RDMA_LOG_NAME,
+ GF_LOG_DEBUG,
+ "%s: handshake-3 did not "
+ "return 'DONE' (%s)",
+ this->name, buf);
+ ret = -1;
+ goto unlock;
+ }
+ ret = 0;
+ priv->connected = 1;
+ sock_len = sizeof (struct sockaddr_storage);
+ getpeername (priv->sock,
+ (struct sockaddr *) &this->peerinfo.sockaddr,
+ &sock_len);
+
+ GF_FREE (priv->handshake.incoming.buf);
+ priv->handshake.incoming.buf = NULL;
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_COMPLETE;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (ret == -1) {
+ rpc_transport_disconnect (this);
+ } else {
+ ret = 0;
+ }
+
+
+ if (!ret && priv->connected) {
+ if (priv->is_server) {
+ ret = rpc_transport_notify (priv->listener,
+ RPC_TRANSPORT_ACCEPT,
+ this);
+ } else {
+ ret = rpc_transport_notify (this, RPC_TRANSPORT_CONNECT,
+ this);
+ }
+ }
+
+ return ret;
+}
+
+static int
+rdma_handshake_pollout (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ char *buf = priv->handshake.outgoing.buf;
+ int32_t ret = 0;
+
+ if (priv->handshake.outgoing.state == RDMA_HANDSHAKE_COMPLETE) {
+ return 0;
+ }
+
+ pthread_mutex_unlock (&priv->write_mutex);
+ {
+ while (priv->handshake.outgoing.state != RDMA_HANDSHAKE_COMPLETE)
+ {
+ switch (priv->handshake.outgoing.state)
+ {
+ case RDMA_HANDSHAKE_START:
+ buf = priv->handshake.outgoing.buf = GF_CALLOC (1, 256, gf_common_mt_char);
+ rdma_fill_handshake_data (buf, &priv->handshake.outgoing, priv);
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_SENDING_DATA;
+ break;
+
+ case RDMA_HANDSHAKE_SENDING_DATA:
+ ret = __tcp_writev (this,
+ &priv->handshake.outgoing.vector,
+ priv->handshake.outgoing.count,
+ &priv->handshake.outgoing.pending_vector,
+ &priv->handshake.outgoing.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "partial header read on NB socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_SENT_DATA;
+ }
+ break;
+
+ case RDMA_HANDSHAKE_SENT_DATA:
+ rdma_fill_handshake_ack (buf, &priv->handshake.outgoing);
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_SENDING_ACK;
+ break;
+
+ case RDMA_HANDSHAKE_SENDING_ACK:
+ ret = __tcp_writev (this,
+ &priv->handshake.outgoing.vector,
+ priv->handshake.outgoing.count,
+ &priv->handshake.outgoing.pending_vector,
+ &priv->handshake.outgoing.pending_count);
+
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "partial header read on NB "
+ "socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ GF_FREE (priv->handshake.outgoing.buf);
+ priv->handshake.outgoing.buf = NULL;
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_COMPLETE;
+ }
+ break;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (ret == -1) {
+ rpc_transport_disconnect (this);
+ } else {
+ ret = 0;
+ }
+
+ return ret;
+}
+
+static int
+rdma_handshake_pollerr (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ int32_t ret = 0;
+ char need_unref = 0;
+
+ gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG,
+ "%s: peer disconnected, cleaning up",
+ this->name);
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ __rdma_teardown (this);
+
+ if (priv->sock != -1) {
+ event_unregister (this->ctx->event_pool,
+ priv->sock, priv->idx);
+ need_unref = 1;
+
+ if (close (priv->sock) != 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "close () - error: %s",
+ strerror (errno));
+ ret = -errno;
+ }
+ priv->tcp_connected = priv->connected = 0;
+ priv->sock = -1;
+ }
+
+ if (priv->handshake.incoming.buf) {
+ GF_FREE (priv->handshake.incoming.buf);
+ priv->handshake.incoming.buf = NULL;
+ }
+
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_START;
+
+ if (priv->handshake.outgoing.buf) {
+ GF_FREE (priv->handshake.outgoing.buf);
+ priv->handshake.outgoing.buf = NULL;
+ }
+
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_START;
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+/* xlator_notify (this->xl, GF_EVENT_POLLERR, this, NULL); */
+
+ if (need_unref)
+ rpc_transport_unref (this);
+
+ return 0;
+}
+
+
+static int
+tcp_connect_finish (rpc_transport_t *this)
+{
+ rdma_private_t *priv = this->private;
+ int error = 0, ret = 0;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __tcp_connect_finish (priv->sock);
+
+ if (!ret) {
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+ ret = getsockname (priv->sock,
+ (struct sockaddr *)&this->myinfo.sockaddr,
+ &this->myinfo.sockaddr_len);
+ if (ret == -1)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "getsockname on new client-socket %d "
+ "failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ error = 1;
+ goto unlock;
+ }
+
+ gf_rdma_get_transport_identifiers (this);
+ priv->tcp_connected = 1;
+ }
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "tcp connect to %s failed (%s)",
+ this->peerinfo.identifier, strerror (errno));
+ error = 1;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (error) {
+ rpc_transport_disconnect (this);
+ }
+
+ return ret;
+}
+
+static int
+rdma_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ rpc_transport_t *this = data;
+ rdma_private_t *priv = this->private;
+ rdma_options_t *options = NULL;
+ int ret = 0;
+
+ if (!priv->tcp_connected) {
+ ret = tcp_connect_finish (this);
+ if (priv->tcp_connected) {
+ options = &priv->options;
+
+ priv->peer.send_count = options->send_count;
+ priv->peer.recv_count = options->recv_count;
+ priv->peer.send_size = options->send_size;
+ priv->peer.recv_size = options->recv_size;
+
+ if ((ret = rdma_create_qp (this)) < 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not create QP",
+ this->name);
+ rpc_transport_disconnect (this);
+ }
+ }
+ }
+
+ if (!ret && poll_out && priv->tcp_connected) {
+ ret = rdma_handshake_pollout (this);
+ }
+
+ if (!ret && poll_in && priv->tcp_connected) {
+ if (priv->handshake.incoming.state == RDMA_HANDSHAKE_COMPLETE) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: pollin received on tcp socket (peer: %s) "
+ "after handshake is complete",
+ this->name, this->peerinfo.identifier);
+ rdma_handshake_pollerr (this);
+ return 0;
+ }
+ ret = rdma_handshake_pollin (this);
+ }
+
+ if (ret < 0 || poll_err) {
+ ret = rdma_handshake_pollerr (this);
+ }
+
+ return 0;
+}
+
+static int
+__tcp_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+static int32_t
+rdma_connect (struct rpc_transport *this, int port)
+{
+ dict_t *options = this->options;
+
+ rdma_private_t *priv = this->private;
+
+ int32_t ret = 0;
+ gf_boolean_t non_blocking = 1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len = 0;
+
+ if (priv->connected) {
+ return 0;
+ }
+
+ if (dict_get (options, "non-blocking-io")) {
+ char *nb_connect = data_to_str (dict_get (this->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (nb_connect, &non_blocking) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean "
+ "options, not taking any action");
+ non_blocking = 1;
+ }
+ }
+
+ ret = gf_rdma_client_get_remote_sockaddr (this,
+ (struct sockaddr *)&sockaddr,
+ &sockaddr_len);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "cannot get remote address to connect");
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (priv->sock != -1) {
+ ret = 0;
+ goto unlock;
+ }
+
+ priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "socket () - error: %s", strerror (errno));
+ ret = -errno;
+ goto unlock;
+ }
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "socket fd = %d", priv->sock);
+
+ memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ if (port > 0)
+ ((struct sockaddr_in *) (&sockaddr))->sin_port
+ = htons (port);
+
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family =
+ ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family;
+
+ if (non_blocking)
+ {
+ ret = __tcp_nonblock (priv->sock);
+
+ if (ret == -1)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not set socket %d to non "
+ "blocking mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = gf_rdma_client_bind (this,
+ (struct sockaddr *)&this->myinfo.sockaddr,
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1)
+ {
+ gf_log (this->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = connect (priv->sock,
+ (struct sockaddr *)&this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len);
+ if (ret == -1 && errno != EINPROGRESS)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "connection attempt failed (%s)",
+ strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ priv->tcp_connected = priv->connected = 0;
+
+ rpc_transport_ref (this);
+
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_START;
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_START;
+
+ priv->idx = event_register (this->ctx->event_pool,
+ priv->sock, rdma_event_handler,
+ this, 1, 1);
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+static int
+rdma_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ int32_t main_sock = -1;
+ rpc_transport_t *this, *trans = data;
+ rdma_private_t *priv = NULL;
+ rdma_private_t *trans_priv = (rdma_private_t *) trans->private;
+ rdma_options_t *options = NULL;
+
+ if (!poll_in) {
+ return 0;
+ }
+
+ this = GF_CALLOC (1, sizeof (rpc_transport_t),
+ gf_common_mt_rpc_transport_t);
+ if (this == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ return -1;
+ }
+
+ priv = GF_CALLOC (1, sizeof (rdma_private_t),
+ gf_common_mt_rdma_private_t);
+ if (priv == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ GF_FREE (priv);
+ return -1;
+ }
+ this->private = priv;
+ /* Copy all the rdma related values in priv, from trans_priv
+ as other than QP, all the values remain same */
+ priv->device = trans_priv->device;
+ priv->options = trans_priv->options;
+ priv->request_ctx_pool = mem_pool_new (rdma_request_context_t,
+ RDMA_POOL_SIZE);
+ priv->is_server = 1;
+ priv->listener = trans;
+ if (priv->request_ctx_pool == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ GF_FREE (priv);
+ return -1;
+ }
+
+ priv->ioq_pool = mem_pool_new (rdma_ioq_t, RDMA_POOL_SIZE);
+ if (priv->ioq_pool == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ mem_pool_destroy (priv->request_ctx_pool);
+ GF_FREE (priv);
+ return -1;
+ }
+
+ priv->reply_info_pool = mem_pool_new (rdma_reply_info_t,
+ RDMA_POOL_SIZE);
+ if (priv->reply_info_pool == NULL) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR, "out of memory");
+ mem_pool_destroy (priv->request_ctx_pool);
+ mem_pool_destroy (priv->ioq_pool);
+ GF_FREE (priv);
+ return -1;
+ }
+
+ options = &priv->options;
+
+ this->ops = trans->ops;
+ this->init = trans->init;
+ this->fini = trans->fini;
+ this->ctx = trans->ctx;
+ this->name = gf_strdup (trans->name);
+ this->notify = trans->notify;
+ this->mydata = trans->mydata;
+
+ memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr,
+ trans->myinfo.sockaddr_len);
+ this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len;
+
+ main_sock = (trans_priv)->sock;
+ this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr);
+ priv->sock = accept (main_sock,
+ (struct sockaddr *)&this->peerinfo.sockaddr,
+ &this->peerinfo.sockaddr_len);
+ if (priv->sock == -1) {
+ gf_log ("rdma/server", GF_LOG_ERROR,
+ "accept() failed: %s",
+ strerror (errno));
+ mem_pool_destroy (priv->request_ctx_pool);
+ mem_pool_destroy (priv->ioq_pool);
+ GF_FREE (this->private);
+ GF_FREE (this);
+ return -1;
+ }
+
+ priv->peer.trans = this;
+ rpc_transport_ref (this);
+
+ gf_rdma_get_transport_identifiers (this);
+
+ priv->tcp_connected = 1;
+ priv->handshake.incoming.state = RDMA_HANDSHAKE_START;
+ priv->handshake.outgoing.state = RDMA_HANDSHAKE_START;
+
+ priv->peer.send_count = options->send_count;
+ priv->peer.recv_count = options->recv_count;
+ priv->peer.send_size = options->send_size;
+ priv->peer.recv_size = options->recv_size;
+ INIT_LIST_HEAD (&priv->peer.ioq);
+
+ if (rdma_create_qp (this) < 0) {
+ gf_log (RDMA_LOG_NAME, GF_LOG_ERROR,
+ "%s: could not create QP",
+ this->name);
+ rpc_transport_disconnect (this);
+ return -1;
+ }
+
+ priv->idx = event_register (this->ctx->event_pool, priv->sock,
+ rdma_event_handler, this, 1, 1);
+
+ pthread_mutex_init (&priv->read_mutex, NULL);
+ pthread_mutex_init (&priv->write_mutex, NULL);
+ pthread_mutex_init (&priv->recv_mutex, NULL);
+ /* pthread_cond_init (&priv->recv_cond, NULL); */
+ return 0;
+}
+
+static int32_t
+rdma_listen (rpc_transport_t *this)
+{
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ rdma_private_t *priv = this->private;
+ int opt = 1, ret = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+
+ memset (&sockaddr, 0, sizeof (sockaddr));
+ ret = gf_rdma_server_get_local_sockaddr (this,
+ (struct sockaddr *)&sockaddr,
+ &sockaddr_len);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "cannot find network address of server to bind to");
+ goto err;
+ }
+
+ priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+ if (priv->sock == -1) {
+ gf_log ("rdma/server", GF_LOG_CRITICAL,
+ "init: failed to create socket, error: %s",
+ strerror (errno));
+ GF_FREE (this->private);
+ ret = -1;
+ goto err;
+ }
+
+ memcpy (&this->myinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->myinfo.sockaddr_len = sockaddr_len;
+
+ ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ goto err;
+ }
+ sprintf (this->myinfo.identifier, "%s:%s", host, service);
+
+ setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
+ if (bind (priv->sock,
+ (struct sockaddr *)&sockaddr,
+ sockaddr_len) != 0) {
+ ret = -1;
+ gf_log ("rdma/server", GF_LOG_ERROR,
+ "init: failed to bind to socket for %s (%s)",
+ this->myinfo.identifier, strerror (errno));
+ goto err;
+ }
+
+ if (listen (priv->sock, 10) != 0) {
+ gf_log ("rdma/server", GF_LOG_ERROR,
+ "init: listen () failed on socket for %s (%s)",
+ this->myinfo.identifier, strerror (errno));
+ ret = -1;
+ goto err;
+ }
+
+ /* Register the main socket */
+ priv->idx = event_register (this->ctx->event_pool, priv->sock,
+ rdma_server_event_handler,
+ rpc_transport_ref (this), 1, 0);
+
+err:
+ return ret;
+}
+
+struct rpc_transport_ops tops = {
+ .submit_request = rdma_submit_request,
+ .submit_reply = rdma_submit_reply,
+ .connect = rdma_connect,
+ .disconnect = rdma_disconnect,
+ .listen = rdma_listen,
+};
+
+int32_t
+init (rpc_transport_t *this)
+{
+ rdma_private_t *priv = NULL;
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_rdma_private_t);
+
+ this->private = priv;
+ priv->sock = -1;
+
+ if (rdma_init (this)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to initialize IB Device");
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+fini (struct rpc_transport *this)
+{
+ /* TODO: verify this function does graceful finish */
+ rdma_private_t *priv = this->private;
+ this->private = NULL;
+
+ pthread_mutex_destroy (&priv->recv_mutex);
+ pthread_mutex_destroy (&priv->write_mutex);
+ pthread_mutex_destroy (&priv->read_mutex);
+
+ mem_pool_destroy (priv->request_ctx_pool);
+ mem_pool_destroy (priv->ioq_pool);
+ mem_pool_destroy (priv->reply_info_pool);
+
+ /* pthread_cond_destroy (&priv->recv_cond); */
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "called fini on transport: %p",
+ this);
+ GF_FREE (priv);
+ return;
+}
+
+/* TODO: expand each option */
+struct volume_options options[] = {
+ { .key = {"transport.rdma.port",
+ "rdma-port"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 4,
+ .description = "check the option by 'ibv_devinfo'"
+ },
+ { .key = {"transport.rdma.mtu",
+ "rdma-mtu"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.rdma.device-name",
+ "rdma-device-name"},
+ .type = GF_OPTION_TYPE_ANY,
+ .description = "check by 'ibv_devinfo'"
+ },
+ { .key = {"transport.rdma.work-request-send-count",
+ "rdma-work-request-send-count"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.rdma.work-request-recv-count",
+ "rdma-work-request-recv-count"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.rdma.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.rdma.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.rdma.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.rdma.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.rdma.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.address-family",
+ "address-family"},
+ .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
+ "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {"transport.socket.lowlat"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {NULL} }
+};
diff --git a/rpc/rpc-transport/rdma/src/rdma.h b/rpc/rpc-transport/rdma/src/rdma.h
new file mode 100644
index 000000000..470ee7206
--- /dev/null
+++ b/rpc/rpc-transport/rdma/src/rdma.h
@@ -0,0 +1,395 @@
+/*
+ Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _XPORT_RDMA_H
+#define _XPORT_RDMA_H
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif /* MAX_IOVEC */
+
+#include "rpc-clnt.h"
+#include "rpc-transport.h"
+#include "xlator.h"
+#include "event.h"
+#include <stdio.h>
+#include <list.h>
+#include <arpa/inet.h>
+#include <infiniband/verbs.h>
+
+/* FIXME: give appropriate values to these macros */
+#define GF_DEFAULT_RDMA_LISTEN_PORT 6997
+#define RDMA_MAX_SEGMENTS 8
+#define RDMA_MAX_HEADER_SIZE (sizeof (rdma_header_t) \
+ + RDMA_MAX_SEGMENTS \
+ * sizeof (rdma_read_chunk_t))
+#define RDMA_INLINE_THRESHOLD (1024 * 128)
+#define RDMA_VERSION 1
+#define RDMA_POOL_SIZE 512
+
+typedef enum rdma_errcode {
+ ERR_VERS = 1,
+ ERR_CHUNK = 2
+}rdma_errcode_t;
+
+struct rdma_err_vers {
+ uint32_t rdma_vers_low; /* Version range supported by peer */
+ uint32_t rdma_vers_high;
+}__attribute__ ((packed));
+typedef struct rdma_err_vers rdma_err_vers_t;
+
+typedef enum rdma_proc {
+ RDMA_MSG = 0, /* An RPC call or reply msg */
+ RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
+ RDMA_MSGP = 2, /* An RPC call or reply msg with padding */
+ RDMA_DONE = 3, /* Client signals reply completion */
+ RDMA_ERROR = 4 /* An RPC RDMA encoding error */
+}rdma_proc_t;
+
+typedef enum rdma_chunktype {
+ rdma_noch = 0, /* no chunk */
+ rdma_readch, /* some argument through rdma read */
+ rdma_areadch, /* entire request through rdma read */
+ rdma_writech, /* some result through rdma write */
+ rdma_replych /* entire reply through rdma write */
+}rdma_chunktype_t;
+
+struct __rdma_header {
+ uint32_t rm_xid; /* Mirrors the RPC header xid */
+ uint32_t rm_vers; /* Version of this protocol */
+ uint32_t rm_credit; /* Buffers requested/granted */
+ uint32_t rm_type; /* Type of message (enum rdma_proc) */
+ union {
+ struct { /* no chunks */
+ uint32_t rm_empty[3]; /* 3 empty chunk lists */
+ }__attribute__((packed)) rm_nochunks;
+
+ struct { /* no chunks and padded */
+ uint32_t rm_align; /* Padding alignment */
+ uint32_t rm_thresh; /* Padding threshold */
+ uint32_t rm_pempty[3]; /* 3 empty chunk lists */
+ }__attribute__((packed)) rm_padded;
+
+ struct {
+ uint32_t rm_type;
+ rdma_err_vers_t rm_version;
+ }__attribute__ ((packed)) rm_error;
+
+ uint32_t rm_chunks[0]; /* read, write and reply chunks */
+ }__attribute__ ((packed)) rm_body;
+} __attribute__((packed));
+typedef struct __rdma_header rdma_header_t;
+
+struct __rdma_segment {
+ uint32_t rs_handle; /* Registered memory handle */
+ uint32_t rs_length; /* Length of the chunk in bytes */
+ uint64_t rs_offset; /* Chunk virtual address or offset */
+} __attribute__((packed));
+typedef struct __rdma_segment rdma_segment_t;
+
+/* read chunk(s), encoded as a linked list. */
+struct __rdma_read_chunk {
+ uint32_t rc_discrim; /* 1 indicates presence */
+ uint32_t rc_position; /* Position in XDR stream */
+ rdma_segment_t rc_target;
+} __attribute__((packed));
+typedef struct __rdma_read_chunk rdma_read_chunk_t;
+
+/* write chunk, and reply chunk. */
+struct __rdma_write_chunk {
+ rdma_segment_t wc_target;
+} __attribute__((packed));
+typedef struct __rdma_write_chunk rdma_write_chunk_t;
+
+/* write chunk(s), encoded as a counted array. */
+struct __rdma_write_array {
+ uint32_t wc_discrim; /* 1 indicates presence */
+ uint32_t wc_nchunks; /* Array count */
+ struct __rdma_write_chunk wc_array[0];
+} __attribute__((packed));
+typedef struct __rdma_write_array rdma_write_array_t;
+
+/* options per transport end point */
+struct __rdma_options {
+ int32_t port;
+ char *device_name;
+ enum ibv_mtu mtu;
+ int32_t send_count;
+ int32_t recv_count;
+ uint64_t recv_size;
+ uint64_t send_size;
+};
+typedef struct __rdma_options rdma_options_t;
+
+struct __rdma_reply_info {
+ uint32_t rm_xid; /* xid in network endian */
+ rdma_chunktype_t type; /*
+ * can be either rdma_replych
+ * or rdma_writech.
+ */
+ rdma_write_array_t *wc_array;
+ struct mem_pool *pool;
+};
+typedef struct __rdma_reply_info rdma_reply_info_t;
+
+struct __rdma_ioq {
+ union {
+ struct list_head list;
+ struct {
+ struct __rdma_ioq *next;
+ struct __rdma_ioq *prev;
+ };
+ };
+
+ char is_request;
+ struct iovec rpchdr[MAX_IOVEC];
+ int rpchdr_count;
+ struct iovec proghdr[MAX_IOVEC];
+ int proghdr_count;
+ struct iovec prog_payload[MAX_IOVEC];
+ int prog_payload_count;
+
+ struct iobref *iobref;
+
+ union {
+ struct __rdma_ioq_request {
+ /* used to build reply_chunk for RDMA_NOMSG type msgs */
+ struct iovec rsphdr_vec[MAX_IOVEC];
+ int rsphdr_count;
+
+ /*
+ * used to build write_array during operations like
+ * read.
+ */
+ struct iovec rsp_payload[MAX_IOVEC];
+ int rsp_payload_count;
+
+ struct rpc_req *rpc_req; /* FIXME: hack! hack! should be
+ * cleaned up later
+ */
+ struct iobref *rsp_iobref;
+ }request;
+
+ rdma_reply_info_t *reply_info;
+ }msg;
+
+ struct mem_pool *pool;
+};
+typedef struct __rdma_ioq rdma_ioq_t;
+
+typedef enum __rdma_send_post_type {
+ RDMA_SEND_POST_NO_CHUNKLIST, /* post which is sent using rdma-send
+ * and the msg carries no
+ * chunklists.
+ */
+ RDMA_SEND_POST_READ_CHUNKLIST, /* post which is sent using rdma-send
+ * and the msg carries only read
+ * chunklist.
+ */
+ RDMA_SEND_POST_WRITE_CHUNKLIST, /* post which is sent using
+ * rdma-send and the msg carries
+ * only write chunklist.
+ */
+ RDMA_SEND_POST_READ_WRITE_CHUNKLIST, /* post which is sent using
+ * rdma-send and the msg
+ * carries both read and
+ * write chunklists.
+ */
+ RDMA_SEND_POST_RDMA_READ, /* RDMA read */
+ RDMA_SEND_POST_RDMA_WRITE, /* RDMA write */
+}rdma_send_post_type_t;
+
+/* represents one communication peer, two per transport_t */
+struct __rdma_peer {
+ rpc_transport_t *trans;
+ struct ibv_qp *qp;
+
+ int32_t recv_count;
+ int32_t send_count;
+ int32_t recv_size;
+ int32_t send_size;
+
+ int32_t quota;
+ union {
+ struct list_head ioq;
+ struct {
+ rdma_ioq_t *ioq_next;
+ rdma_ioq_t *ioq_prev;
+ };
+ };
+
+ /* QP attributes, needed to connect with remote QP */
+ int32_t local_lid;
+ int32_t local_psn;
+ int32_t local_qpn;
+ int32_t remote_lid;
+ int32_t remote_psn;
+ int32_t remote_qpn;
+};
+typedef struct __rdma_peer rdma_peer_t;
+
+struct __rdma_post_context {
+ struct ibv_mr *mr[RDMA_MAX_SEGMENTS];
+ int mr_count;
+ struct iovec vector[MAX_IOVEC];
+ int count;
+ struct iobref *iobref;
+ char is_request;
+ rdma_reply_info_t *reply_info;
+};
+typedef struct __rdma_post_context rdma_post_context_t;
+
+typedef enum {
+ RDMA_SEND_POST,
+ RDMA_RECV_POST
+} rdma_post_type_t;
+
+struct __rdma_post {
+ struct __rdma_post *next, *prev;
+ struct ibv_mr *mr;
+ char *buf;
+ int32_t buf_size;
+ char aux;
+ int32_t reused;
+ struct __rdma_device *device;
+ rdma_post_type_t type;
+ rdma_post_context_t ctx;
+ int refcount;
+ pthread_mutex_t lock;
+};
+typedef struct __rdma_post rdma_post_t;
+
+struct __rdma_queue {
+ rdma_post_t active_posts, passive_posts;
+ int32_t active_count, passive_count;
+ pthread_mutex_t lock;
+};
+typedef struct __rdma_queue rdma_queue_t;
+
+struct __rdma_qpreg {
+ pthread_mutex_t lock;
+ int32_t count;
+ struct _qpent {
+ struct _qpent *next, *prev;
+ int32_t qp_num;
+ rdma_peer_t *peer;
+ } ents[42];
+};
+typedef struct __rdma_qpreg rdma_qpreg_t;
+
+/* context per device, stored in global glusterfs_ctx_t->ib */
+struct __rdma_device {
+ struct __rdma_device *next;
+ const char *device_name;
+ struct ibv_context *context;
+ int32_t port;
+ struct ibv_pd *pd;
+ struct ibv_srq *srq;
+ rdma_qpreg_t qpreg;
+ struct ibv_comp_channel *send_chan, *recv_chan;
+ struct ibv_cq *send_cq, *recv_cq;
+ rdma_queue_t sendq, recvq;
+ pthread_t send_thread, recv_thread;
+};
+typedef struct __rdma_device rdma_device_t;
+
+typedef enum {
+ RDMA_HANDSHAKE_START = 0,
+ RDMA_HANDSHAKE_SENDING_DATA,
+ RDMA_HANDSHAKE_RECEIVING_DATA,
+ RDMA_HANDSHAKE_SENT_DATA,
+ RDMA_HANDSHAKE_RECEIVED_DATA,
+ RDMA_HANDSHAKE_SENDING_ACK,
+ RDMA_HANDSHAKE_RECEIVING_ACK,
+ RDMA_HANDSHAKE_RECEIVED_ACK,
+ RDMA_HANDSHAKE_COMPLETE,
+} rdma_handshake_state_t;
+
+struct rdma_nbio {
+ int state;
+ char *buf;
+ int count;
+ struct iovec vector;
+ struct iovec *pending_vector;
+ int pending_count;
+};
+
+struct __rdma_request_context {
+ struct ibv_mr *mr[RDMA_MAX_SEGMENTS];
+ int mr_count;
+ struct mem_pool *pool;
+ rdma_peer_t *peer;
+ struct iobref *iobref;
+ struct iobref *rsp_iobref;
+};
+typedef struct __rdma_request_context rdma_request_context_t;
+
+struct __rdma_private {
+ int32_t sock;
+ int32_t idx;
+ unsigned char connected;
+ unsigned char tcp_connected;
+ unsigned char ib_connected;
+ in_addr_t addr;
+ unsigned short port;
+
+ /* IB Verbs Driver specific variables, pointers */
+ rdma_peer_t peer;
+ struct __rdma_device *device;
+ rdma_options_t options;
+
+ /* Used by trans->op->receive */
+ char *data_ptr;
+ int32_t data_offset;
+ int32_t data_len;
+
+ /* Mutex */
+ pthread_mutex_t read_mutex;
+ pthread_mutex_t write_mutex;
+ pthread_barrier_t handshake_barrier;
+ char handshake_ret;
+ char is_server;
+ rpc_transport_t *listener;
+
+ pthread_mutex_t recv_mutex;
+ pthread_cond_t recv_cond;
+
+ struct mem_pool *request_ctx_pool;
+ struct mem_pool *ioq_pool;
+ struct mem_pool *reply_info_pool;
+
+ /* used during rdma_handshake */
+ struct {
+ struct rdma_nbio incoming;
+ struct rdma_nbio outgoing;
+ int state;
+ rdma_header_t header;
+ char *buf;
+ size_t size;
+ } handshake;
+};
+typedef struct __rdma_private rdma_private_t;
+
+#endif /* _XPORT_RDMA_H */