summaryrefslogtreecommitdiffstats
path: root/transport/socket
diff options
context:
space:
mode:
Diffstat (limited to 'transport/socket')
-rw-r--r--transport/socket/Makefile.am1
-rw-r--r--transport/socket/src/Makefile.am14
-rw-r--r--transport/socket/src/name.c677
-rw-r--r--transport/socket/src/name.h44
-rw-r--r--transport/socket/src/socket.c1370
-rw-r--r--transport/socket/src/socket.h106
6 files changed, 2212 insertions, 0 deletions
diff --git a/transport/socket/Makefile.am b/transport/socket/Makefile.am
new file mode 100644
index 00000000000..f963effea22
--- /dev/null
+++ b/transport/socket/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src \ No newline at end of file
diff --git a/transport/socket/src/Makefile.am b/transport/socket/src/Makefile.am
new file mode 100644
index 00000000000..e112921232c
--- /dev/null
+++ b/transport/socket/src/Makefile.am
@@ -0,0 +1,14 @@
+noinst_HEADERS = socket.h name.h
+
+transport_LTLIBRARIES = socket.la
+transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport/
+
+socket_la_LDFLAGS = -module -avoidversion
+
+socket_la_SOURCES = socket.c name.c
+socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS)
+
+CLEANFILES = *~
diff --git a/transport/socket/src/name.c b/transport/socket/src/name.c
new file mode 100644
index 00000000000..a599b00cced
--- /dev/null
+++ b/transport/socket/src/name.c
@@ -0,0 +1,677 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <netdb.h>
+#include <string.h>
+
+#ifdef CLIENT_PORT_CEILING
+#undef CLIENT_PORT_CEILING
+#endif
+
+#define CLIENT_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+#include "transport.h"
+#include "socket.h"
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static int32_t
+af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr,
+ socklen_t sockaddr_len, int ceiling)
+{
+ int32_t ret = -1;
+ /* struct sockaddr_in sin = {0, }; */
+ uint16_t port = ceiling - 1;
+
+ while (port)
+ {
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET6:
+ ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port);
+ break;
+
+ case AF_INET_SDP:
+ case AF_INET:
+ ((struct sockaddr_in *)sockaddr)->sin_port = htons (port);
+ break;
+ }
+
+ ret = bind (fd, sockaddr, sockaddr_len);
+
+ if (ret == 0)
+ break;
+
+ if (ret == -1 && errno == EACCES)
+ break;
+
+ port--;
+ }
+
+ return ret;
+}
+
+static int32_t
+af_unix_client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t sockaddr_len,
+ int sock)
+{
+ data_t *path_data = NULL;
+ struct sockaddr_un *addr = NULL;
+ int32_t ret = -1;
+
+ path_data = dict_get (this->xl->options, "transport.socket.bind-path");
+ if (path_data) {
+ char *path = data_to_str (path_data);
+ if (!path || strlen (path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "bind-path not specfied for unix socket, "
+ "letting connect to assign default value");
+ goto err;
+ }
+
+ addr = (struct sockaddr_un *) sockaddr;
+ strcpy (addr->sun_path, path);
+ ret = bind (sock, (struct sockaddr *)addr, sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind to unix-domain socket %d (%s)",
+ sock, strerror (errno));
+ goto err;
+ }
+ }
+
+err:
+ return ret;
+}
+
+static int32_t
+client_fill_address_family (transport_t *this, struct sockaddr *sockaddr)
+{
+ data_t *address_family_data = NULL;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (!address_family_data) {
+ data_t *remote_host_data = NULL, *connect_path_data = NULL;
+ remote_host_data = dict_get (this->xl->options, "remote-host");
+ connect_path_data = dict_get (this->xl->options,
+ "transport.socket.connect-path");
+
+ if (!(remote_host_data || connect_path_data) ||
+ (remote_host_data && connect_path_data)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "transport.address-family not specified and "
+ "not able to determine the "
+ "same from other options (remote-host:%s and "
+ "transport.unix.connect-path:%s)",
+ data_to_str (remote_host_data),
+ data_to_str (connect_path_data));
+ return -1;
+ }
+
+ if (remote_host_data) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be inet/inet6");
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be unix");
+ sockaddr->sa_family = AF_UNIX;
+ }
+
+ } else {
+ char *address_family = data_to_str (address_family_data);
+ if (!strcasecmp (address_family, "unix")) {
+ sockaddr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet")) {
+ sockaddr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ sockaddr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ sockaddr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family (%s) specified",
+ address_family);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int32_t
+af_inet_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ dict_t *options = this->xl->options;
+ data_t *remote_host_data = NULL;
+ data_t *remote_port_data = NULL;
+ char *remote_host = NULL;
+ uint16_t remote_port = 0;
+ struct addrinfo *addr_info = NULL;
+ int32_t ret = 0;
+
+ remote_host_data = dict_get (options, "remote-host");
+ if (remote_host_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host missing in volume %s", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_host = data_to_str (remote_host_data);
+ if (remote_host == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host has data NULL in volume %s", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_port_data = dict_get (options, "remote-port");
+ if (remote_port_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option remote-port missing in volume %s. Defaulting to %d",
+ this->xl->name, GF_DEFAULT_SOCKET_LISTEN_PORT);
+
+ remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+ }
+ else
+ {
+ remote_port = data_to_uint16 (remote_port_data);
+ }
+
+ if (remote_port == (uint16_t)-1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-port has invalid port in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* TODO: gf_resolve is a blocking call. kick in some
+ non blocking dns techniques */
+ ret = gf_resolve_ip6 (remote_host, remote_port,
+ sockaddr->sa_family, &this->dnscache, &addr_info);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "DNS resolution failed on host %s", remote_host);
+ goto err;
+ }
+
+ memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen);
+ *sockaddr_len = addr_info->ai_addrlen;
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ struct sockaddr_un *sockaddr_un = NULL;
+ char *connect_path = NULL;
+ data_t *connect_path_data = NULL;
+ int32_t ret = 0;
+
+ connect_path_data = dict_get (this->xl->options,
+ "transport.socket.connect-path");
+ if (!connect_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.unix.connect-path not specified for "
+ "address-family unix");
+ ret = -1;
+ goto err;
+ }
+
+ connect_path = data_to_str (connect_path_data);
+ if (!connect_path) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "transport.unix.connect-path is null-string");
+ ret = -1;
+ goto err;
+ }
+
+ if (strlen (connect_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path value length %"GF_PRI_SIZET" > %d octets",
+ strlen (connect_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "using connect-path %s", connect_path);
+ sockaddr_un = (struct sockaddr_un *)sockaddr;
+ strcpy (sockaddr_un->sun_path, connect_path);
+ *sockaddr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *listen_path_data = NULL;
+ char *listen_path = NULL;
+ int32_t ret = 0;
+ struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr;
+
+
+ listen_path_data = dict_get (this->xl->options,
+ "transport.socket.listen-path");
+ if (!listen_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "missing option transport.socket.listen-path");
+ ret = -1;
+ goto err;
+ }
+
+ listen_path = data_to_str (listen_path_data);
+
+#ifndef UNIX_PATH_MAX
+#define UNIX_PATH_MAX 108
+#endif
+
+ if (strlen (listen_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.unix.listen-path has value length "
+ "%"GF_PRI_SIZET" > %d",
+ strlen (listen_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+ strcpy (sunaddr->sun_path, listen_path);
+ *addr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_inet_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ struct addrinfo hints, *res = 0;
+ data_t *listen_port_data = NULL, *listen_host_data = NULL;
+ uint16_t listen_port = -1;
+ char service[NI_MAXSERV], *listen_host = NULL;
+ dict_t *options = NULL;
+ int32_t ret = 0;
+
+ options = this->xl->options;
+
+ listen_port_data = dict_get (options, "transport.socket.listen-port");
+ listen_host_data = dict_get (options, "transport.socket.bind-address");
+
+ if (listen_port_data)
+ {
+ listen_port = data_to_uint16 (listen_port_data);
+ }
+
+ if (listen_port == (uint16_t) -1)
+ listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+
+
+ if (listen_host_data)
+ {
+ listen_host = data_to_str (listen_host_data);
+ }
+
+ memset (service, 0, sizeof (service));
+ sprintf (service, "%d", listen_port);
+
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = addr->sa_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE;
+
+ ret = getaddrinfo(listen_host, service, &hints, &res);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getaddrinfo failed for host %s, service %s (%s)",
+ listen_host, service, gai_strerror (ret));
+ ret = -1;
+ goto err;
+ }
+
+ memcpy (addr, res->ai_addr, res->ai_addrlen);
+ *addr_len = res->ai_addrlen;
+
+ freeaddrinfo (res);
+
+err:
+ return ret;
+}
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock)
+{
+ int ret = 0;
+
+ *sockaddr_len = sizeof (struct sockaddr_in6);
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ case AF_INET:
+ *sockaddr_len = sizeof (struct sockaddr_in);
+
+ case AF_INET6:
+ ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr,
+ *sockaddr_len, CLIENT_PORT_CEILING);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind inet socket (%d) to port less than %d (%s)",
+ sock, CLIENT_PORT_CEILING, strerror (errno));
+ ret = 0;
+ }
+ break;
+
+ case AF_UNIX:
+ *sockaddr_len = sizeof (struct sockaddr_un);
+ ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr,
+ *sockaddr_len, sock);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family %d", sockaddr->sa_family);
+ ret = -1;
+ break;
+ }
+
+ return ret;
+}
+
+int32_t
+socket_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ ret = client_fill_address_family (this, sockaddr);
+ if (ret) {
+ ret = -1;
+ goto err;
+ }
+
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ sockaddr->sa_family = AF_INET;
+ is_inet_sdp = 1;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_client_get_remote_sockaddr (this, sockaddr, sockaddr_len);
+
+ if (is_inet_sdp) {
+ sockaddr->sa_family = AF_INET_SDP;
+ }
+
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_client_get_remote_sockaddr (this, sockaddr, sockaddr_len);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family %d", sockaddr->sa_family);
+ ret = -1;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+socket_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (address_family_data) {
+ char *address_family = NULL;
+ address_family = data_to_str (address_family_data);
+
+ if (!strcasecmp (address_family, "inet")) {
+ addr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ addr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ addr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "unix")) {
+ addr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ addr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%s) specified", address_family);
+ ret = -1;
+ goto err;
+ }
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option address-family not specified, defaulting to inet/inet6");
+ addr->sa_family = AF_UNSPEC;
+ }
+
+ switch (addr->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ addr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_server_get_local_sockaddr (this, addr, addr_len);
+ if (is_inet_sdp && !ret) {
+ addr->sa_family = AF_INET_SDP;
+ }
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr,
+ int32_t addr_len, char *identifier)
+{
+ int32_t ret = 0, tmpaddr_len = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+ struct sockaddr_storage tmpaddr;
+
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+ tmpaddr = *addr;
+ tmpaddr_len = addr_len;
+
+ if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) {
+ int32_t one_to_four, four_to_eight, twelve_to_sixteen;
+ int16_t eight_to_ten, ten_to_twelve;
+
+ one_to_four = four_to_eight = twelve_to_sixteen = 0;
+ eight_to_ten = ten_to_twelve = 0;
+
+ one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0];
+ four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1];
+#ifdef GF_SOLARIS_HOST_OS
+ eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4];
+#else
+ eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4];
+#endif
+
+#ifdef GF_SOLARIS_HOST_OS
+ ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5];
+#else
+ ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5];
+#endif
+
+ twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3];
+
+ /* ipv4 mapped ipv6 address has
+ bits 0-80: 0
+ bits 80-96: 0xffff
+ bits 96-128: ipv4 address
+ */
+
+ if (one_to_four == 0 &&
+ four_to_eight == 0 &&
+ eight_to_ten == 0 &&
+ ten_to_twelve == -1) {
+ struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr;
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+
+ in_ptr->sin_family = AF_INET;
+ in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port;
+ in_ptr->sin_addr.s_addr = twelve_to_sixteen;
+ tmpaddr_len = sizeof (*in_ptr);
+ }
+ }
+
+ ret = getnameinfo ((struct sockaddr *) &tmpaddr,
+ tmpaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ }
+
+ sprintf (identifier, "%s:%s", host, service);
+
+ return ret;
+}
+
+int32_t
+get_transport_identifiers (transport_t *this)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ ret = fill_inet6_inet_identifiers (this,
+ &this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ this->myinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot fill inet/inet6 identifier for server");
+ goto err;
+ }
+
+ ret = fill_inet6_inet_identifiers (this,
+ &this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len,
+ this->peerinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot fill inet/inet6 identifier for client");
+ goto err;
+ }
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP;
+ }
+ }
+ break;
+
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sunaddr = NULL;
+
+ sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr;
+ strcpy (this->myinfo.identifier, sunaddr->sun_path);
+
+ sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr;
+ strcpy (this->peerinfo.identifier, sunaddr->sun_path);
+ }
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%d)",
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family);
+ ret = -1;
+ break;
+ }
+
+err:
+ return ret;
+}
diff --git a/transport/socket/src/name.h b/transport/socket/src/name.h
new file mode 100644
index 00000000000..552037bcc89
--- /dev/null
+++ b/transport/socket/src/name.h
@@ -0,0 +1,44 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SOCKET_NAME_H
+#define _SOCKET_NAME_H
+
+#include "compat.h"
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock);
+
+int32_t
+socket_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len);
+
+int32_t
+socket_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len);
+
+int32_t
+get_transport_identifiers (transport_t *this);
+
+#endif /* _SOCKET_NAME_H */
diff --git a/transport/socket/src/socket.c b/transport/socket/src/socket.c
new file mode 100644
index 00000000000..066da782250
--- /dev/null
+++ b/transport/socket/src/socket.c
@@ -0,0 +1,1370 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "socket.h"
+#include "name.h"
+#include "dict.h"
+#include "transport.h"
+#include "logging.h"
+#include "xlator.h"
+#include "byte-order.h"
+#include "common-utils.h"
+#include "compat-errno.h"
+
+#include <fcntl.h>
+#include <errno.h>
+
+
+#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
+#define SA(ptr) ((struct sockaddr *)ptr)
+
+int socket_init (transport_t *this);
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+int
+__socket_rwv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ int write)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = NULL;
+ int opcount = 0;
+ int moved = 0;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ opvector = vector;
+ opcount = count;
+
+ while (opcount) {
+ if (write) {
+ ret = writev (sock, opvector, opcount);
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
+ /* done for now */
+ break;
+ }
+ } else {
+ ret = readv (sock, opvector, opcount);
+
+ if (ret == -1 && errno == EAGAIN) {
+ /* done for now */
+ break;
+ }
+ }
+
+ if (ret == 0) {
+ /* Mostly due to 'umount' in client */
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "EOF from peer %s", this->peerinfo.identifier);
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "%s failed (%s)", write ? "writev" : "readv",
+ strerror (errno));
+ opcount = -1;
+ break;
+ }
+
+ moved = 0;
+
+ while (moved < ret) {
+ if ((ret - moved) >= opvector[0].iov_len) {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ } else {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ while (opcount && !opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ }
+ }
+ }
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+ return opcount;
+}
+
+
+int
+__socket_readv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, 0);
+
+ return ret;
+}
+
+
+int
+__socket_writev (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, 1);
+
+ return ret;
+}
+
+
+int
+__socket_disconnect (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ if (priv->sock != -1) {
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ priv->connected = -1;
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "shutdown() returned %d. set connection state to -1",
+ ret);
+ }
+
+ return ret;
+}
+
+
+int
+__socket_server_bind (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ int opt = 1;
+
+ priv = this->private;
+
+ ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
+ &opt, sizeof (opt));
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setsockopt() for SO_REUSEADDR failed (%s)",
+ strerror (errno));
+ }
+
+ ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "binding to %s failed: %s",
+ this->myinfo.identifier, strerror (errno));
+ if (errno == EADDRINUSE) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "Port is already in use");
+ }
+ }
+
+ return ret;
+}
+
+
+int
+__socket_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+
+int
+__socket_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen);
+
+ if (ret == 0 && optval) {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+
+void
+__socket_reset (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* TODO: use mem-pool on incoming data */
+
+ if (priv->incoming.hdr_p)
+ free (priv->incoming.hdr_p);
+
+ if (priv->incoming.buf_p)
+ free (priv->incoming.buf_p);
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+
+ event_unregister (this->xl->ctx->event_pool, priv->sock, priv->idx);
+ close (priv->sock);
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+}
+
+
+struct ioq *
+__socket_ioq_new (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count, dict_t *refs)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ /* TODO: use mem-pool */
+ entry = CALLOC (1, sizeof (*entry));
+
+ assert (count <= (MAX_IOVEC-2));
+
+ entry->header.colonO[0] = ':';
+ entry->header.colonO[1] = 'O';
+ entry->header.colonO[2] = '\0';
+ entry->header.version = 42;
+ entry->header.size1 = hton32 (len);
+ entry->header.size2 = hton32 (iov_length (vector, count));
+
+ entry->vector[0].iov_base = &entry->header;
+ entry->vector[0].iov_len = sizeof (entry->header);
+ entry->count++;
+
+ entry->vector[1].iov_base = buf;
+ entry->vector[1].iov_len = len;
+ entry->count++;
+
+ if (vector && count) {
+ memcpy (&entry->vector[2], vector, sizeof (*vector) * count);
+ entry->count += count;
+ }
+
+ entry->pending_vector = entry->vector;
+ entry->pending_count = entry->count;
+
+ if (refs)
+ entry->refs = dict_ref (refs);
+
+ entry->buf = buf;
+
+ INIT_LIST_HEAD (&entry->list);
+
+ return entry;
+}
+
+
+void
+__socket_ioq_entry_free (struct ioq *entry)
+{
+ list_del_init (&entry->list);
+ if (entry->refs)
+ dict_unref (entry->refs);
+
+ /* TODO: use mem-pool */
+ free (entry->buf);
+
+ /* TODO: use mem-pool */
+ free (entry);
+}
+
+
+void
+__socket_ioq_flush (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ entry = priv->ioq_next;
+ __socket_ioq_entry_free (entry);
+ }
+
+ return;
+}
+
+
+int
+__socket_ioq_churn_entry (transport_t *this, struct ioq *entry)
+{
+ int ret = -1;
+
+ ret = __socket_writev (this, entry->pending_vector,
+ entry->pending_count,
+ &entry->pending_vector,
+ &entry->pending_count);
+
+ if (ret == 0) {
+ /* current entry was completely written */
+ assert (entry->pending_count == 0);
+ __socket_ioq_entry_free (entry);
+ }
+
+ return ret;
+}
+
+
+int
+__socket_ioq_churn (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ /* pick next entry */
+ entry = priv->ioq_next;
+
+ ret = __socket_ioq_churn_entry (this, entry);
+
+ if (ret != 0)
+ break;
+ }
+
+ if (list_empty (&priv->ioq)) {
+ /* all pending writes done, not interested in POLLOUT */
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock, priv->idx, -1, 0);
+ }
+
+ return ret;
+}
+
+
+int
+socket_event_poll_err (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ this->xl->notify (this->xl, GF_EVENT_POLLERR, this);
+
+ return ret;
+}
+
+
+int
+socket_event_poll_out (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected == 1) {
+ ret = __socket_ioq_churn (this);
+
+ if (ret == -1) {
+ __socket_disconnect (this);
+ }
+ }
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ this->xl->notify (this->xl, GF_EVENT_POLLOUT, this);
+
+ return ret;
+}
+
+
+int
+__socket_proto_validate_header (transport_t *this,
+ struct socket_header *header,
+ size_t *size1_p, size_t *size2_p)
+{
+ size_t size1 = 0;
+ size_t size2 = 0;
+
+ if (strcmp (header->colonO, ":O")) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header signature does not match :O (%x.%x.%x)",
+ header->colonO[0], header->colonO[1],
+ header->colonO[2]);
+ return -1;
+ }
+
+ if (header->version != 42) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header version does not match 42 != %d",
+ header->version);
+ return -1;
+ }
+
+ size1 = ntoh32 (header->size1);
+ size2 = ntoh32 (header->size2);
+
+ if (size1 <= 0 || size1 > 1048576) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header has incorrect size1=%"GF_PRI_SIZET,
+ size1);
+ return -1;
+ }
+
+ if (size2 > (1048576 * 4)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header has incorrect size2=%"GF_PRI_SIZET,
+ size2);
+ return -1;
+ }
+
+ if (size1_p)
+ *size1_p = size1;
+
+ if (size2_p)
+ *size2_p = size2;
+
+ return 0;
+}
+
+
+
+/* socket protocol state machine */
+
+int
+__socket_proto_state_machine (transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ size_t size1 = 0;
+ size_t size2 = 0;
+ int previous_state = -1;
+ struct socket_header *hdr = NULL;
+
+
+ priv = this->private;
+
+ while (priv->incoming.state != SOCKET_PROTO_STATE_COMPLETE) {
+ /* debug check against infinite loops */
+ if (previous_state == priv->incoming.state) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "state did not change! (%d) breaking",
+ previous_state);
+ ret = -1;
+ goto unlock;
+ }
+ previous_state = priv->incoming.state;
+
+ switch (priv->incoming.state) {
+
+ case SOCKET_PROTO_STATE_NADA:
+ priv->incoming.pending_vector =
+ priv->incoming.vector;
+
+ priv->incoming.pending_vector->iov_base =
+ &priv->incoming.header;
+
+ priv->incoming.pending_vector->iov_len =
+ sizeof (struct socket_header);
+
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_HEADER_COMING;
+ break;
+
+ case SOCKET_PROTO_STATE_HEADER_COMING:
+
+ ret = __socket_readv (this,
+ priv->incoming.pending_vector, 1,
+ &priv->incoming.pending_vector,
+ NULL);
+ if (ret == 0) {
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_HEADER_CAME;
+ break;
+ }
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERRNO (errno),
+ "read (%s) in state %d (%s)",
+ strerror (errno),
+ SOCKET_PROTO_STATE_HEADER_COMING,
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial header read on NB socket.");
+ goto unlock;
+ }
+ break;
+
+ case SOCKET_PROTO_STATE_HEADER_CAME:
+ hdr = &priv->incoming.header;
+ ret = __socket_proto_validate_header (this, hdr,
+ &size1, &size2);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header validate failed (%s). "
+ "possible mismatch of transport-type "
+ "between server and client volumes, "
+ "or version mismatch",
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ priv->incoming.hdrlen = size1;
+ priv->incoming.buflen = size2;
+
+ /* TODO: use mem-pool */
+ priv->incoming.hdr_p = MALLOC (size1);
+ if (size2)
+ priv->incoming.buf_p = MALLOC (size2);
+
+ priv->incoming.vector[0].iov_base =
+ priv->incoming.hdr_p;
+
+ priv->incoming.vector[0].iov_len = size1;
+
+ priv->incoming.vector[1].iov_base =
+ priv->incoming.buf_p;
+
+ priv->incoming.vector[1].iov_len = size2;
+ priv->incoming.count = size2 ? 2 : 1;
+
+ priv->incoming.pending_vector =
+ priv->incoming.vector;
+
+ priv->incoming.pending_count =
+ priv->incoming.count;
+
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_DATA_COMING;
+ break;
+
+ case SOCKET_PROTO_STATE_DATA_COMING:
+
+ ret = __socket_readv (this,
+ priv->incoming.pending_vector,
+ priv->incoming.pending_count,
+ &priv->incoming.pending_vector,
+ &priv->incoming.pending_count);
+ if (ret == 0) {
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_DATA_CAME;
+ break;
+ }
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "read (%s) in state %d (%s)",
+ strerror (errno),
+ SOCKET_PROTO_STATE_DATA_COMING,
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial data read on NB socket");
+ goto unlock;
+ }
+ break;
+
+ case SOCKET_PROTO_STATE_DATA_CAME:
+ memset (&priv->incoming.vector, 0,
+ sizeof (priv->incoming.vector));
+ priv->incoming.pending_vector = NULL;
+ priv->incoming.pending_count = 0;
+ priv->incoming.state = SOCKET_PROTO_STATE_COMPLETE;
+ break;
+
+ case SOCKET_PROTO_STATE_COMPLETE:
+ /* not reached */
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "undefined state reached: %d",
+ priv->incoming.state);
+ goto unlock;
+ }
+ }
+unlock:
+
+ return ret;
+}
+
+
+int
+socket_proto_state_machine (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_proto_state_machine (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_event_poll_in (transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_proto_state_machine (this);
+
+ /* call POLLIN on xlator even if complete block is not received,
+ just to keep the last_received timestamp ticking */
+
+ if (ret == 0)
+ ret = this->xl->notify (this->xl, GF_EVENT_POLLIN, this);
+
+ return ret;
+}
+
+
+int
+socket_connect_finish (transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ int event = -1;
+ char notify_xlator = 0;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected)
+ goto unlock;
+
+ ret = __socket_connect_finish (priv->sock);
+
+ if (ret == -1 && errno == EINPROGRESS)
+ ret = 1;
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ if (!priv->connect_finish_log) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection failed (%s)",
+ strerror (errno));
+ priv->connect_finish_log = 1;
+ }
+ __socket_disconnect (this);
+ notify_xlator = 1;
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ if (ret == 0) {
+ notify_xlator = 1;
+
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+
+ ret = getsockname (priv->sock,
+ SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getsockname on (%d) failed (%s)",
+ priv->sock, strerror (errno));
+ __socket_disconnect (this);
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ priv->connected = 1;
+ priv->connect_finish_log = 0;
+ event = GF_EVENT_CHILD_UP;
+ get_transport_identifiers (this);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ if (notify_xlator)
+ this->xl->notify (this->xl, event, this);
+
+ return 0;
+}
+
+
+int
+socket_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ this = data;
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (!priv->connected) {
+ ret = socket_connect_finish (this);
+ }
+
+ if (!ret && poll_out) {
+ ret = socket_event_poll_out (this);
+ }
+
+ if (!ret && poll_in) {
+ ret = socket_event_poll_in (this);
+ }
+
+ if (ret < 0 || poll_err) {
+ socket_event_poll_err (this);
+ transport_unref (this);
+ }
+
+ return 0;
+}
+
+
+int
+socket_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ int new_sock = -1;
+ transport_t *new_trans = NULL;
+ struct sockaddr_storage new_sockaddr = {0, };
+ socklen_t addrlen = sizeof (new_sockaddr);
+ socket_private_t *new_priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ this = data;
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+
+ if (poll_in) {
+ new_sock = accept (priv->sock, SA (&new_sockaddr),
+ &addrlen);
+
+ if (new_sock == -1)
+ goto unlock;
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
+ new_trans = CALLOC (1, sizeof (*new_trans));
+ new_trans->xl = this->xl;
+ new_trans->fini = this->fini;
+
+ memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
+ addrlen);
+ new_trans->peerinfo.sockaddr_len = addrlen;
+
+ new_trans->myinfo.sockaddr_len =
+ sizeof (new_trans->myinfo.sockaddr);
+
+ ret = getsockname (new_sock,
+ SA (&new_trans->myinfo.sockaddr),
+ &new_trans->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getsockname on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
+ get_transport_identifiers (new_trans);
+ socket_init (new_trans);
+ new_trans->ops = this->ops;
+ new_trans->init = this->init;
+ new_trans->fini = this->fini;
+
+ new_priv = new_trans->private;
+
+ pthread_mutex_lock (&new_priv->lock);
+ {
+ new_priv->sock = new_sock;
+ new_priv->connected = 1;
+
+ transport_ref (new_trans);
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans, 1, 0);
+
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
+ pthread_mutex_unlock (&new_priv->lock);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_disconnect (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_connect (transport_t *this)
+{
+ int ret = -1;
+ int sock = -1;
+ socket_private_t *priv = NULL;
+ struct sockaddr_storage sockaddr = {0, };
+ socklen_t sockaddr_len = 0;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ if (!priv) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect() called on uninitialized transport");
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "connect () called on transport already connected");
+ goto err;
+ }
+
+ ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len);
+ if (ret == -1) {
+ /* logged inside client_get_remote_sockaddr */
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "connect() -- already connected");
+ goto unlock;
+ }
+
+ memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (SA (&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ SA (&this->myinfo.sockaddr)->sa_family =
+ SA (&this->peerinfo.sockaddr)->sa_family;
+
+ ret = client_bind (this, SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
+ this->peerinfo.sockaddr_len);
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection attempt failed (%s)",
+ strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ priv->connected = 0;
+
+ transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler, this, 1, 1);
+ if (priv->idx == -1)
+ ret = -1;
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+err:
+ return ret;
+}
+
+
+int
+socket_listen (transport_t *this)
+{
+ socket_private_t * priv = NULL;
+ int ret = -1;
+ int sock = -1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ peer_info_t *myinfo = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ myinfo = &this->myinfo;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "alreading listening");
+ return ret;
+ }
+
+ ret = socket_server_get_local_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len);
+
+ if (ret == -1) {
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "already listening");
+ goto unlock;
+ }
+
+ memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len);
+ myinfo->sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (SA (&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = __socket_server_bind (this);
+
+ if (ret == -1) {
+ /* logged inside __socket_server_bind() */
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = listen (priv->sock, 10);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not set socket %d to listen mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_server_event_handler,
+ this, 1, 0);
+
+ if (priv->idx == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not register socket %d with events",
+ priv->sock);
+ ret = -1;
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ char **buf_p, size_t *buflen_p)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket not connected to receive");
+ goto unlock;
+ }
+
+ if (!hdr_p || !hdrlen_p || !buf_p || !buflen_p) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "bad parameters %p %p %p %p",
+ hdr_p, hdrlen_p, buf_p, buflen_p);
+ goto unlock;
+ }
+
+ if (priv->incoming.state == SOCKET_PROTO_STATE_COMPLETE) {
+ *hdr_p = priv->incoming.hdr_p;
+ *hdrlen_p = priv->incoming.hdrlen;
+ *buf_p = priv->incoming.buf_p;
+ *buflen_p = priv->incoming.buflen;
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+ priv->incoming.state = SOCKET_PROTO_STATE_NADA;
+
+ ret = 0;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+/* TODO: implement per transfer limit */
+int
+socket_submit (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count,
+ dict_t *refs)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char need_poll_out = 0;
+ char need_append = 1;
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ if (!priv->submit_log && !priv->connect_finish_log) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "not connected (priv->connected = %d)",
+ priv->connected);
+ priv->submit_log = 1;
+ }
+ goto unlock;
+ }
+
+ priv->submit_log = 0;
+ entry = __socket_ioq_new (this, buf, len, vector, count, refs);
+
+ if (list_empty (&priv->ioq)) {
+ ret = __socket_ioq_churn_entry (this, entry);
+
+ if (ret == 0)
+ need_append = 0;
+
+ if (ret > 0)
+ need_poll_out = 1;
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &priv->ioq);
+ ret = 0;
+ }
+
+ if (need_poll_out) {
+ /* first entry to wait. continue writing on POLLOUT */
+ priv->idx = event_select_on (ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 1);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+struct transport_ops tops = {
+ .listen = socket_listen,
+ .connect = socket_connect,
+ .disconnect = socket_disconnect,
+ .submit = socket_submit,
+ .receive = socket_receive
+};
+
+
+int
+socket_init (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ if (this->private) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "double init attempted");
+ return -1;
+ }
+
+ priv = CALLOC (1, sizeof (*priv));
+ if (!priv) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "calloc (1, %"GF_PRI_SIZET") returned NULL",
+ sizeof (*priv));
+ return -1;
+ }
+
+ pthread_mutex_init (&priv->lock, NULL);
+
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+
+ INIT_LIST_HEAD (&priv->ioq);
+
+ if (dict_get (this->xl->options, "non-blocking-io")) {
+ gf_boolean_t tmp_bool = 0;
+ char *nb_connect = data_to_str (dict_get (this->xl->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (nb_connect, &tmp_bool) == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean options,"
+ " not taking any action");
+ tmp_bool = 1;
+ }
+ priv->bio = 0;
+ if (!tmp_bool) {
+ priv->bio = 1;
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "disabling non-blocking IO");
+ }
+ }
+
+ this->private = priv;
+
+ return 0;
+}
+
+
+void
+fini (transport_t *this)
+{
+ socket_private_t *priv = this->private;
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "transport %p destroyed", this);
+
+ pthread_mutex_destroy (&priv->lock);
+ FREE (priv);
+}
+
+
+int32_t
+init (transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_init (this);
+
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR, "socket_init() failed");
+ }
+
+ return ret;
+}
+
+struct volume_options options[] = {
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.socket.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.bind-address", "bind-address" },
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = { "transport.address-family",
+ "address-family" },
+ .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
+ "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+
+ { .key = {NULL} }
+};
+
diff --git a/transport/socket/src/socket.h b/transport/socket/src/socket.h
new file mode 100644
index 00000000000..070e69d088b
--- /dev/null
+++ b/transport/socket/src/socket.h
@@ -0,0 +1,106 @@
+/*
+ Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SOCKET_H
+#define _SOCKET_H
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "event.h"
+#include "transport.h"
+#include "logging.h"
+#include "dict.h"
+#include "mem-pool.h"
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif /* MAX_IOVEC */
+
+#define GF_DEFAULT_SOCKET_LISTEN_PORT 6996
+
+typedef enum {
+ SOCKET_PROTO_STATE_NADA = 0,
+ SOCKET_PROTO_STATE_HEADER_COMING,
+ SOCKET_PROTO_STATE_HEADER_CAME,
+ SOCKET_PROTO_STATE_DATA_COMING,
+ SOCKET_PROTO_STATE_DATA_CAME,
+ SOCKET_PROTO_STATE_COMPLETE,
+} socket_proto_state_t;
+
+struct socket_header {
+ char colonO[3];
+ uint32_t size1;
+ uint32_t size2;
+ char version;
+} __attribute__((packed));
+
+
+struct ioq {
+ union {
+ struct list_head list;
+ struct {
+ struct ioq *next;
+ struct ioq *prev;
+ };
+ };
+ struct socket_header header;
+ struct iovec vector[MAX_IOVEC];
+ int count;
+ struct iovec *pending_vector;
+ int pending_count;
+ char *buf;
+ dict_t *refs;
+};
+
+
+typedef struct {
+ int32_t sock;
+ int32_t idx;
+ unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected
+ char bio;
+ char connect_finish_log;
+ char submit_log;
+ union {
+ struct list_head ioq;
+ struct {
+ struct ioq *ioq_next;
+ struct ioq *ioq_prev;
+ };
+ };
+ struct {
+ int state;
+ struct socket_header header;
+ char *hdr_p;
+ size_t hdrlen;
+ char *buf_p;
+ size_t buflen;
+ struct iovec vector[2];
+ int count;
+ struct iovec *pending_vector;
+ int pending_count;
+ } incoming;
+ pthread_mutex_t lock;
+} socket_private_t;
+
+
+#endif