diff options
author | Vijay Bellur <vijay@gluster.com> | 2010-06-28 02:49:46 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-06-28 22:20:45 -0700 |
commit | 0da2a6d08cc8fc2315742d010c8a8cab38ef75bb (patch) | |
tree | eed93284ad3826ac7401c9f9540b6afe9a68c2cc /rpc/rpc-transport | |
parent | 2c5c3cedc096c36ef9d004ffa0cdb5324d2915e6 (diff) |
Move rpc to top-level
Signed-off-by: Vijay Bellur <vijay@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r-- | rpc/rpc-transport/Makefile.am | 1 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/Makefile.am | 1 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/Makefile.am | 15 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/name.c | 737 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/name.h | 44 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 2308 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 190 |
7 files changed, 3296 insertions, 0 deletions
diff --git a/rpc/rpc-transport/Makefile.am b/rpc/rpc-transport/Makefile.am new file mode 100644 index 000000000..7dd9f026c --- /dev/null +++ b/rpc/rpc-transport/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = socket diff --git a/rpc/rpc-transport/socket/Makefile.am b/rpc/rpc-transport/socket/Makefile.am new file mode 100644 index 000000000..f963effea --- /dev/null +++ b/rpc/rpc-transport/socket/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = src
\ No newline at end of file diff --git a/rpc/rpc-transport/socket/src/Makefile.am b/rpc/rpc-transport/socket/src/Makefile.am new file mode 100644 index 000000000..46a3e1237 --- /dev/null +++ b/rpc/rpc-transport/socket/src/Makefile.am @@ -0,0 +1,15 @@ +noinst_HEADERS = socket.h name.h + +rpctransport_LTLIBRARIES = socket.la +rpctransportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport + +socket_la_LDFLAGS = -module -avoidversion + +socket_la_SOURCES = socket.c name.c +socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ + -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/rpc/rpc-lib/src/ \ + -I$(top_srcdir)/xlators/protocol/lib/src/ -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES = *~ diff --git a/rpc/rpc-transport/socket/src/name.c b/rpc/rpc-transport/socket/src/name.c new file mode 100644 index 000000000..763fa3dd0 --- /dev/null +++ b/rpc/rpc-transport/socket/src/name.c @@ -0,0 +1,737 @@ +/* + Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <errno.h> +#include <netdb.h> +#include <string.h> + +#ifdef CLIENT_PORT_CEILING +#undef CLIENT_PORT_CEILING +#endif + +#define CLIENT_PORT_CEILING 1024 + +#ifndef AF_INET_SDP +#define AF_INET_SDP 27 +#endif + +#include "rpc-transport.h" +#include "socket.h" + +int32_t +gf_resolve_ip6 (const char *hostname, + uint16_t port, + int family, + void **dnscache, + struct addrinfo **addr_info); + +static int32_t +af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, + socklen_t sockaddr_len, int ceiling) +{ + int32_t ret = -1; + /* struct sockaddr_in sin = {0, }; */ + uint16_t port = ceiling - 1; + + while (port) + { + switch (sockaddr->sa_family) + { + case AF_INET6: + ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); + break; + + case AF_INET_SDP: + case AF_INET: + ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); + break; + } + + ret = bind (fd, sockaddr, sockaddr_len); + + if (ret == 0) + break; + + if (ret == -1 && errno == EACCES) + break; + + port--; + } + + return ret; +} + +static int32_t +af_unix_client_bind (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t sockaddr_len, + int sock) +{ + data_t *path_data = NULL; + struct sockaddr_un *addr = NULL; + int32_t ret = 0; + + path_data = dict_get (this->options, "transport.socket.bind-path"); + if (path_data) { + char *path = data_to_str (path_data); + if (!path || strlen (path) > UNIX_PATH_MAX) { + gf_log (this->name, GF_LOG_TRACE, + "bind-path not specfied for unix socket, " + "letting connect to assign default value"); + goto err; + } + + addr = (struct sockaddr_un *) sockaddr; + strcpy (addr->sun_path, path); + ret = bind (sock, (struct sockaddr *)addr, sockaddr_len); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "cannot bind to unix-domain socket %d (%s)", + sock, strerror (errno)); + goto err; + } + } else { + gf_log (this->name, GF_LOG_TRACE, + "bind-path not specfied for unix socket, " + "letting connect to assign default value"); + } + +err: + return ret; +} + +int32_t +client_fill_address_family (rpc_transport_t *this, sa_family_t *sa_family) +{ + data_t *address_family_data = NULL; + int32_t ret = -1; + + if (sa_family == NULL) { + goto out; + } + + address_family_data = dict_get (this->options, + "transport.address-family"); + if (!address_family_data) { + data_t *remote_host_data = NULL, *connect_path_data = NULL; + remote_host_data = dict_get (this->options, "remote-host"); + connect_path_data = dict_get (this->options, + "transport.socket.connect-path"); + + if (!(remote_host_data || connect_path_data) || + (remote_host_data && connect_path_data)) { + gf_log (this->name, GF_LOG_ERROR, + "transport.address-family not specified and " + "not able to determine the " + "same from other options (remote-host:%s and " + "transport.unix.connect-path:%s)", + data_to_str (remote_host_data), + data_to_str (connect_path_data)); + goto out; + } + + if (remote_host_data) { + gf_log (this->name, GF_LOG_DEBUG, + "address-family not specified, guessing it " + "to be inet/inet6"); + *sa_family = AF_UNSPEC; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "address-family not specified, guessing it " + "to be unix"); + *sa_family = AF_UNIX; + } + + } else { + char *address_family = data_to_str (address_family_data); + if (!strcasecmp (address_family, "unix")) { + *sa_family = AF_UNIX; + } else if (!strcasecmp (address_family, "inet")) { + *sa_family = AF_INET; + } else if (!strcasecmp (address_family, "inet6")) { + *sa_family = AF_INET6; + } else if (!strcasecmp (address_family, "inet-sdp")) { + *sa_family = AF_INET_SDP; + } else if (!strcasecmp (address_family, "inet/inet6") + || !strcasecmp (address_family, "inet6/inet")) { + *sa_family = AF_UNSPEC; + } else { + gf_log (this->name, GF_LOG_ERROR, + "unknown address-family (%s) specified", + address_family); + goto out; + } + } + + ret = 0; + +out: + return ret; +} + +static int32_t +af_inet_client_get_remote_sockaddr (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + dict_t *options = this->options; + data_t *remote_host_data = NULL; + data_t *remote_port_data = NULL; + char *remote_host = NULL; + uint16_t remote_port = 0; + struct addrinfo *addr_info = NULL; + int32_t ret = 0; + + remote_host_data = dict_get (options, "remote-host"); + if (remote_host_data == NULL) + { + gf_log (this->name, GF_LOG_ERROR, + "option remote-host missing in volume %s", this->name); + ret = -1; + goto err; + } + + remote_host = data_to_str (remote_host_data); + if (remote_host == NULL) + { + gf_log (this->name, GF_LOG_ERROR, + "option remote-host has data NULL in volume %s", this->name); + ret = -1; + goto err; + } + + remote_port_data = dict_get (options, "remote-port"); + if (remote_port_data == NULL) + { + gf_log (this->name, GF_LOG_TRACE, + "option remote-port missing in volume %s. Defaulting to %d", + this->name, GF_DEFAULT_SOCKET_LISTEN_PORT); + + remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT; + } + else + { + remote_port = data_to_uint16 (remote_port_data); + } + + if (remote_port == (uint16_t)-1) + { + gf_log (this->name, GF_LOG_ERROR, + "option remote-port has invalid port in volume %s", + this->name); + ret = -1; + goto err; + } + + /* TODO: gf_resolve is a blocking call. kick in some + non blocking dns techniques */ + ret = gf_resolve_ip6 (remote_host, remote_port, + sockaddr->sa_family, &this->dnscache, &addr_info); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "DNS resolution failed on host %s", remote_host); + goto err; + } + + memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen); + *sockaddr_len = addr_info->ai_addrlen; + +err: + return ret; +} + +static int32_t +af_unix_client_get_remote_sockaddr (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + struct sockaddr_un *sockaddr_un = NULL; + char *connect_path = NULL; + data_t *connect_path_data = NULL; + int32_t ret = 0; + + connect_path_data = dict_get (this->options, + "transport.socket.connect-path"); + if (!connect_path_data) { + gf_log (this->name, GF_LOG_ERROR, + "option transport.unix.connect-path not specified for " + "address-family unix"); + ret = -1; + goto err; + } + + connect_path = data_to_str (connect_path_data); + if (!connect_path) { + gf_log (this->name, GF_LOG_ERROR, + "transport.unix.connect-path is null-string"); + ret = -1; + goto err; + } + + if (strlen (connect_path) > UNIX_PATH_MAX) { + gf_log (this->name, GF_LOG_ERROR, + "connect-path value length %"GF_PRI_SIZET" > %d octets", + strlen (connect_path), UNIX_PATH_MAX); + ret = -1; + goto err; + } + + gf_log (this->name, GF_LOG_TRACE, + "using connect-path %s", connect_path); + sockaddr_un = (struct sockaddr_un *)sockaddr; + strcpy (sockaddr_un->sun_path, connect_path); + *sockaddr_len = sizeof (struct sockaddr_un); + +err: + return ret; +} + +static int32_t +af_unix_server_get_local_sockaddr (rpc_transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + data_t *listen_path_data = NULL; + char *listen_path = NULL; + int32_t ret = 0; + struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr; + + + listen_path_data = dict_get (this->options, + "transport.socket.listen-path"); + if (!listen_path_data) { + gf_log (this->name, GF_LOG_ERROR, + "missing option transport.socket.listen-path"); + ret = -1; + goto err; + } + + listen_path = data_to_str (listen_path_data); + +#ifndef UNIX_PATH_MAX +#define UNIX_PATH_MAX 108 +#endif + + if (strlen (listen_path) > UNIX_PATH_MAX) { + gf_log (this->name, GF_LOG_ERROR, + "option transport.unix.listen-path has value length " + "%"GF_PRI_SIZET" > %d", + strlen (listen_path), UNIX_PATH_MAX); + ret = -1; + goto err; + } + + sunaddr->sun_family = AF_UNIX; + strcpy (sunaddr->sun_path, listen_path); + *addr_len = sizeof (struct sockaddr_un); + +err: + return ret; +} + +static int32_t +af_inet_server_get_local_sockaddr (rpc_transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + struct addrinfo hints, *res = 0; + data_t *listen_port_data = NULL, *listen_host_data = NULL; + uint16_t listen_port = -1; + char service[NI_MAXSERV], *listen_host = NULL; + dict_t *options = NULL; + int32_t ret = 0; + + options = this->options; + + listen_port_data = dict_get (options, "transport.socket.listen-port"); + listen_host_data = dict_get (options, "transport.socket.bind-address"); + + if (listen_port_data) + { + listen_port = data_to_uint16 (listen_port_data); + } + + if (listen_port == (uint16_t) -1) + listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT; + + + if (listen_host_data) + { + listen_host = data_to_str (listen_host_data); + } else { + if (addr->sa_family == AF_INET6) { + struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr; + in->sin6_addr = in6addr_any; + in->sin6_port = htons(listen_port); + *addr_len = sizeof(struct sockaddr_in6); + goto out; + } else if (addr->sa_family == AF_INET) { + struct sockaddr_in *in = (struct sockaddr_in *) addr; + in->sin_addr.s_addr = htonl(INADDR_ANY); + in->sin_port = htons(listen_port); + *addr_len = sizeof(struct sockaddr_in); + goto out; + } + } + + memset (service, 0, sizeof (service)); + sprintf (service, "%d", listen_port); + + memset (&hints, 0, sizeof (hints)); + hints.ai_family = addr->sa_family; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; + + ret = getaddrinfo(listen_host, service, &hints, &res); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "getaddrinfo failed for host %s, service %s (%s)", + listen_host, service, gai_strerror (ret)); + ret = -1; + goto out; + } + + memcpy (addr, res->ai_addr, res->ai_addrlen); + *addr_len = res->ai_addrlen; + + freeaddrinfo (res); + +out: + return ret; +} + +int32_t +client_bind (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + int sock) +{ + int ret = 0; + + *sockaddr_len = sizeof (struct sockaddr_in6); + switch (sockaddr->sa_family) + { + case AF_INET_SDP: + case AF_INET: + *sockaddr_len = sizeof (struct sockaddr_in); + + case AF_INET6: + ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, + *sockaddr_len, CLIENT_PORT_CEILING); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "cannot bind inet socket (%d) to port less than %d (%s)", + sock, CLIENT_PORT_CEILING, strerror (errno)); + ret = 0; + } + break; + + case AF_UNIX: + *sockaddr_len = sizeof (struct sockaddr_un); + ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, + *sockaddr_len, sock); + break; + + default: + gf_log (this->name, GF_LOG_ERROR, + "unknown address family %d", sockaddr->sa_family); + ret = -1; + break; + } + + return ret; +} + +int32_t +socket_client_get_remote_sockaddr (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + sa_family_t *sa_family) +{ + int32_t ret = 0; + + if ((sockaddr == NULL) || (sockaddr_len == NULL) + || (sa_family == NULL)) { + ret = -1; + goto err; + } + + + ret = client_fill_address_family (this, &sockaddr->sa_family); + if (ret) { + ret = -1; + goto err; + } + + *sa_family = sockaddr->sa_family; + + switch (sockaddr->sa_family) + { + case AF_INET_SDP: + sockaddr->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + ret = af_inet_client_get_remote_sockaddr (this, sockaddr, + sockaddr_len); + break; + + case AF_UNIX: + ret = af_unix_client_get_remote_sockaddr (this, sockaddr, + sockaddr_len); + break; + + default: + gf_log (this->name, GF_LOG_ERROR, + "unknown address-family %d", sockaddr->sa_family); + ret = -1; + } + + if (*sa_family == AF_UNSPEC) { + *sa_family = sockaddr->sa_family; + } + +err: + return ret; +} + + +int32_t +server_fill_address_family (rpc_transport_t *this, sa_family_t *sa_family) +{ + data_t *address_family_data = NULL; + int32_t ret = -1; + + if (sa_family == NULL) { + goto out; + } + + address_family_data = dict_get (this->options, + "transport.address-family"); + if (address_family_data) { + char *address_family = NULL; + address_family = data_to_str (address_family_data); + + if (!strcasecmp (address_family, "inet")) { + *sa_family = AF_INET; + } else if (!strcasecmp (address_family, "inet6")) { + *sa_family = AF_INET6; + } else if (!strcasecmp (address_family, "inet-sdp")) { + *sa_family = AF_INET_SDP; + } else if (!strcasecmp (address_family, "unix")) { + *sa_family = AF_UNIX; + } else if (!strcasecmp (address_family, "inet/inet6") + || !strcasecmp (address_family, "inet6/inet")) { + *sa_family = AF_UNSPEC; + } else { + gf_log (this->name, GF_LOG_ERROR, + "unknown address family (%s) specified", address_family); + goto out; + } + } else { + gf_log (this->name, GF_LOG_DEBUG, + "option address-family not specified, defaulting to inet/inet6"); + *sa_family = AF_UNSPEC; + } + + ret = 0; +out: + return ret; +} + + +int32_t +socket_server_get_local_sockaddr (rpc_transport_t *this, struct sockaddr *addr, + socklen_t *addr_len, sa_family_t *sa_family) +{ + int32_t ret = -1; + + if ((addr == NULL) || (addr_len == NULL) || (sa_family == NULL)) { + goto err; + } + + ret = server_fill_address_family (this, &addr->sa_family); + if (ret == -1) { + goto err; + } + + *sa_family = addr->sa_family; + + switch (addr->sa_family) + { + case AF_INET_SDP: + addr->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + ret = af_inet_server_get_local_sockaddr (this, addr, addr_len); + break; + + case AF_UNIX: + ret = af_unix_server_get_local_sockaddr (this, addr, addr_len); + break; + } + + if (*sa_family == AF_UNSPEC) { + *sa_family = addr->sa_family; + } + +err: + return ret; +} + +int32_t +fill_inet6_inet_identifiers (rpc_transport_t *this, struct sockaddr_storage *addr, + int32_t addr_len, char *identifier) +{ + int32_t ret = 0, tmpaddr_len = 0; + char service[NI_MAXSERV], host[NI_MAXHOST]; + struct sockaddr_storage tmpaddr; + + memset (&tmpaddr, 0, sizeof (tmpaddr)); + tmpaddr = *addr; + tmpaddr_len = addr_len; + + if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) { + int32_t one_to_four, four_to_eight, twelve_to_sixteen; + int16_t eight_to_ten, ten_to_twelve; + + one_to_four = four_to_eight = twelve_to_sixteen = 0; + eight_to_ten = ten_to_twelve = 0; + + one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0]; + four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1]; +#ifdef GF_SOLARIS_HOST_OS + eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4]; +#else + eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4]; +#endif + +#ifdef GF_SOLARIS_HOST_OS + ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5]; +#else + ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5]; +#endif + + twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3]; + + /* ipv4 mapped ipv6 address has + bits 0-80: 0 + bits 80-96: 0xffff + bits 96-128: ipv4 address + */ + + if (one_to_four == 0 && + four_to_eight == 0 && + eight_to_ten == 0 && + ten_to_twelve == -1) { + struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr; + memset (&tmpaddr, 0, sizeof (tmpaddr)); + + in_ptr->sin_family = AF_INET; + in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port; + in_ptr->sin_addr.s_addr = twelve_to_sixteen; + tmpaddr_len = sizeof (*in_ptr); + } + } + + ret = getnameinfo ((struct sockaddr *) &tmpaddr, + tmpaddr_len, + host, sizeof (host), + service, sizeof (service), + NI_NUMERICHOST | NI_NUMERICSERV); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "getnameinfo failed (%s)", gai_strerror (ret)); + } + + sprintf (identifier, "%s:%s", host, service); + + return ret; +} + +int32_t +get_transport_identifiers (rpc_transport_t *this) +{ + int32_t ret = 0; + char is_inet_sdp = 0; + + switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family) + { + case AF_INET_SDP: + is_inet_sdp = 1; + ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + { + ret = fill_inet6_inet_identifiers (this, + &this->myinfo.sockaddr, + this->myinfo.sockaddr_len, + this->myinfo.identifier); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "cannot fill inet/inet6 identifier for server"); + goto err; + } + + ret = fill_inet6_inet_identifiers (this, + &this->peerinfo.sockaddr, + this->peerinfo.sockaddr_len, + this->peerinfo.identifier); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "cannot fill inet/inet6 identifier for client"); + goto err; + } + + if (is_inet_sdp) { + ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP; + } + } + break; + + case AF_UNIX: + { + struct sockaddr_un *sunaddr = NULL; + + sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr; + strcpy (this->myinfo.identifier, sunaddr->sun_path); + + sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr; + strcpy (this->peerinfo.identifier, sunaddr->sun_path); + } + break; + + default: + gf_log (this->name, GF_LOG_ERROR, + "unknown address family (%d)", + ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family); + ret = -1; + break; + } + +err: + return ret; +} diff --git a/rpc/rpc-transport/socket/src/name.h b/rpc/rpc-transport/socket/src/name.h new file mode 100644 index 000000000..6a89d383b --- /dev/null +++ b/rpc/rpc-transport/socket/src/name.h @@ -0,0 +1,44 @@ +/* + Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SOCKET_NAME_H +#define _SOCKET_NAME_H + +#include "compat.h" + +int32_t +client_bind (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + int sock); + +int32_t +socket_client_get_remote_sockaddr (rpc_transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + sa_family_t *sa_family); + +int32_t +socket_server_get_local_sockaddr (rpc_transport_t *this, struct sockaddr *addr, + socklen_t *addr_len, sa_family_t *sa_family); + +int32_t +get_transport_identifiers (rpc_transport_t *this); + +#endif /* _SOCKET_NAME_H */ diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c new file mode 100644 index 000000000..4ca7121e8 --- /dev/null +++ b/rpc/rpc-transport/socket/src/socket.c @@ -0,0 +1,2308 @@ +/* + Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "socket.h" +#include "name.h" +#include "dict.h" +#include "rpc-transport.h" +#include "logging.h" +#include "xlator.h" +#include "byte-order.h" +#include "common-utils.h" +#include "compat-errno.h" +#include "protocol-common.h" + +#include "glusterfs-xdr.h" +#include <fcntl.h> +#include <errno.h> +#include <netinet/tcp.h> + +#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) +#define SA(ptr) ((struct sockaddr *)ptr) + +#define __socket_proto_reset_pending(priv) do { \ + memset (&priv->incoming.frag.vector, 0, \ + sizeof (priv->incoming.frag.vector)); \ + priv->incoming.frag.pending_vector = \ + &priv->incoming.frag.vector; \ + priv->incoming.frag.pending_vector->iov_base = \ + priv->incoming.frag.fragcurrent; \ + priv->incoming.pending_vector = \ + priv->incoming.frag.pending_vector; \ + } while (0); + + +#define __socket_proto_update_pending(priv) \ + do { \ + uint32_t remaining_fragsize = 0; \ + if (priv->incoming.frag.pending_vector->iov_len == 0) { \ + remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ + - priv->incoming.frag.bytes_read; \ + \ + priv->incoming.frag.pending_vector->iov_len = \ + remaining_fragsize > priv->incoming.frag.remaining_size \ + ? priv->incoming.frag.remaining_size : remaining_fragsize; \ + \ + priv->incoming.frag.remaining_size -= \ + priv->incoming.frag.pending_vector->iov_len; \ + } \ + } while (0); + +#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \ + { \ + priv->incoming.frag.fragcurrent += bytes_read; \ + priv->incoming.frag.bytes_read += bytes_read; \ + \ + if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \ + if (priv->incoming.frag.remaining_size != 0) { \ + __socket_proto_reset_pending (priv); \ + } \ + \ + gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \ + \ + break; \ + } \ + } + +#define __socket_proto_init_pending(priv, size) \ + do { \ + uint32_t remaining_fragsize = 0; \ + remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ + - priv->incoming.frag.bytes_read; \ + \ + __socket_proto_reset_pending (priv); \ + \ + priv->incoming.frag.pending_vector->iov_len = \ + remaining_fragsize > size ? size : remaining_fragsize; \ + \ + priv->incoming.frag.remaining_size = \ + size - priv->incoming.frag.pending_vector->iov_len; \ + \ +} while (0); + + +/* This will be used in a switch case and breaks from the switch case if all + * the pending data is not read. + */ +#define __socket_proto_read(priv, ret) \ + { \ + size_t bytes_read = 0; \ + \ + __socket_proto_update_pending (priv); \ + \ + ret = __socket_readv (this, \ + priv->incoming.pending_vector, 1, \ + &priv->incoming.pending_vector, \ + &priv->incoming.pending_count, \ + &bytes_read); \ + if (ret == -1) { \ + gf_log (this->name, GF_LOG_TRACE, \ + "reading from socket failed. Error (%s), " \ + "peer (%s)", strerror (errno), \ + this->peerinfo.identifier); \ + break; \ + } \ + __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ + } + + +int socket_init (rpc_transport_t *this); + +/* + * return value: + * 0 = success (completed) + * -1 = error + * > 0 = incomplete + */ + +int +__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count, size_t *bytes, + int write) +{ + socket_private_t *priv = NULL; + int sock = -1; + int ret = -1; + struct iovec *opvector = NULL; + int opcount = 0; + int moved = 0; + + priv = this->private; + sock = priv->sock; + + opvector = vector; + opcount = count; + + if (bytes != NULL) { + *bytes = 0; + } + + while (opcount) { + if (write) { + ret = writev (sock, opvector, opcount); + + if (ret == 0 || (ret == -1 && errno == EAGAIN)) { + /* done for now */ + break; + } + } else { + ret = readv (sock, opvector, opcount); + if (ret == -1 && errno == EAGAIN) { + /* done for now */ + break; + } + } + + if (ret == 0) { + /* Mostly due to 'umount' in client */ + + gf_log (this->name, GF_LOG_TRACE, + "EOF from peer %s", this->peerinfo.identifier); + opcount = -1; + errno = ENOTCONN; + break; + } + if (ret == -1) { + if (errno == EINTR) + continue; + + gf_log (this->name, GF_LOG_TRACE, + "%s failed (%s)", write ? "writev" : "readv", + strerror (errno)); + opcount = -1; + break; + } + + if (bytes != NULL) { + *bytes += ret; + } + + moved = 0; + + while (moved < ret) { + if ((ret - moved) >= opvector[0].iov_len) { + moved += opvector[0].iov_len; + opvector++; + opcount--; + } else { + opvector[0].iov_len -= (ret - moved); + opvector[0].iov_base += (ret - moved); + moved += (ret - moved); + } + while (opcount && !opvector[0].iov_len) { + opvector++; + opcount--; + } + } + } + + if (pending_vector) + *pending_vector = opvector; + + if (pending_count) + *pending_count = opcount; + + return opcount; +} + + +int +__socket_readv (rpc_transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count, + size_t *bytes) +{ + int ret = -1; + + ret = __socket_rwv (this, vector, count, + pending_vector, pending_count, bytes, 0); + + return ret; +} + + +int +__socket_writev (rpc_transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count) +{ + int ret = -1; + + ret = __socket_rwv (this, vector, count, + pending_vector, pending_count, NULL, 1); + + return ret; +} + + +int +__socket_disconnect (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + if (priv->sock != -1) { + ret = shutdown (priv->sock, SHUT_RDWR); + priv->connected = -1; + gf_log (this->name, GF_LOG_TRACE, + "shutdown() returned %d. set connection state to -1", + ret); + } + + return ret; +} + + +int +__socket_server_bind (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + int opt = 1; + + priv = this->private; + + ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, + &opt, sizeof (opt)); + + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "setsockopt() for SO_REUSEADDR failed (%s)", + strerror (errno)); + } + + ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, + this->myinfo.sockaddr_len); + + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "binding to %s failed: %s", + this->myinfo.identifier, strerror (errno)); + if (errno == EADDRINUSE) { + gf_log (this->name, GF_LOG_ERROR, + "Port is already in use"); + } + } + + return ret; +} + + +int +__socket_nonblock (int fd) +{ + int flags = 0; + int ret = -1; + + flags = fcntl (fd, F_GETFL); + + if (flags != -1) + ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); + + return ret; +} + + +int +__socket_nodelay (int fd) +{ + int on = 1; + int ret = -1; + + ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, + &on, sizeof (on)); + if (!ret) + gf_log ("", GF_LOG_TRACE, + "NODELAY enabled for socket %d", fd); + + return ret; +} + +int +__socket_connect_finish (int fd) +{ + int ret = -1; + int optval = 0; + socklen_t optlen = sizeof (int); + + ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen); + + if (ret == 0 && optval) { + errno = optval; + ret = -1; + } + + return ret; +} + + +void +__socket_reset (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + + priv = this->private; + + /* TODO: use mem-pool on incoming data */ + + if (priv->incoming.iobuf) { + iobuf_unref (priv->incoming.iobuf); + } + + if (priv->incoming.vectoriob) { + iobuf_unref (priv->incoming.vectoriob); + } + + memset (&priv->incoming, 0, sizeof (priv->incoming)); + + event_unregister (this->ctx->event_pool, priv->sock, priv->idx); + + close (priv->sock); + priv->sock = -1; + priv->idx = -1; + priv->connected = -1; +} + + +struct ioq * +__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) +{ + socket_private_t *priv = NULL; + struct ioq *entry = NULL; + int count = 0; + + priv = this->private; + + /* TODO: use mem-pool */ + entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq); + if (!entry) + return NULL; + + count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; + + assert (count <= MAX_IOVEC); + + if (msg->rpchdr != NULL) { + memcpy (&entry->vector[0], msg->rpchdr, + sizeof (struct iovec) * msg->rpchdrcount); + entry->count += msg->rpchdrcount; + } + + if (msg->proghdr != NULL) { + memcpy (&entry->vector[entry->count], msg->proghdr, + sizeof (struct iovec) * msg->proghdrcount); + entry->count += msg->proghdrcount; + } + + if (msg->progpayload != NULL) { + memcpy (&entry->vector[entry->count], msg->progpayload, + sizeof (struct iovec) * msg->progpayloadcount); + entry->count += msg->progpayloadcount; + } + + entry->pending_vector = entry->vector; + entry->pending_count = entry->count; + + if (msg->iobref != NULL) + entry->iobref = iobref_ref (msg->iobref); + + INIT_LIST_HEAD (&entry->list); + + return entry; +} + + +void +__socket_ioq_entry_free (struct ioq *entry) +{ + list_del_init (&entry->list); + if (entry->iobref) + iobref_unref (entry->iobref); + + /* TODO: use mem-pool */ + GF_FREE (entry); +} + + +void +__socket_ioq_flush (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + struct ioq *entry = NULL; + + priv = this->private; + + while (!list_empty (&priv->ioq)) { + entry = priv->ioq_next; + __socket_ioq_entry_free (entry); + } + + return; +} + + +int +__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry) +{ + int ret = -1; + + ret = __socket_writev (this, entry->pending_vector, + entry->pending_count, + &entry->pending_vector, + &entry->pending_count); + + if (ret == 0) { + /* current entry was completely written */ + assert (entry->pending_count == 0); + __socket_ioq_entry_free (entry); + } + + return ret; +} + + +int +__socket_ioq_churn (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + struct ioq *entry = NULL; + + priv = this->private; + + while (!list_empty (&priv->ioq)) { + /* pick next entry */ + entry = priv->ioq_next; + + ret = __socket_ioq_churn_entry (this, entry); + + if (ret != 0) + break; + } + + if (list_empty (&priv->ioq)) { + /* all pending writes done, not interested in POLLOUT */ + priv->idx = event_select_on (this->ctx->event_pool, + priv->sock, priv->idx, -1, 0); + } + + return ret; +} + + +int +socket_event_poll_err (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + __socket_ioq_flush (this); + __socket_reset (this); + } + pthread_mutex_unlock (&priv->lock); + + rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + + return ret; +} + + +int +socket_event_poll_out (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected == 1) { + ret = __socket_ioq_churn (this); + + if (ret == -1) { + __socket_disconnect (this); + } + } + } + pthread_mutex_unlock (&priv->lock); + + ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL); + + return ret; +} + + +inline int +__socket_read_simple_msg (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + uint32_t remaining_size = 0; + size_t bytes_read = 0; + + priv = this->private; + + switch (priv->incoming.frag.simple_state) { + + case SP_STATE_SIMPLE_MSG_INIT: + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + __socket_proto_init_pending (priv, remaining_size); + + priv->incoming.frag.simple_state = + SP_STATE_READING_SIMPLE_MSG; + + /* fall through */ + + case SP_STATE_READING_SIMPLE_MSG: + ret = 0; + + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + if (remaining_size > 0) { + ret = __socket_readv (this, + priv->incoming.pending_vector, 1, + &priv->incoming.pending_vector, + &priv->incoming.pending_count, + &bytes_read); + } + + if (ret == -1) { + gf_log (this->name, GF_LOG_TRACE, + "reading from socket failed. Error (%s), " + "peer (%s)", strerror (errno), + this->peerinfo.identifier); + break; + } + + priv->incoming.frag.bytes_read += bytes_read; + priv->incoming.frag.fragcurrent += bytes_read; + + if (ret > 0) { + gf_log (this->name, GF_LOG_TRACE, + "partial read on non-blocking socket."); + break; + } + + if (ret == 0) { + priv->incoming.frag.simple_state + = SP_STATE_SIMPLE_MSG_INIT; + } + } + + return ret; +} + + +inline int +__socket_read_simple_request (rpc_transport_t *this) +{ + return __socket_read_simple_msg (this); +} + + +#define rpc_cred_addr(buf) (buf + RPC_MSGTYPE_SIZE + RPC_CALL_BODY_SIZE - 4) + +#define rpc_verf_addr(fragcurrent) (fragcurrent - 4) + + +inline int +__socket_read_vectored_request (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + uint32_t credlen = 0, verflen = 0; + char *addr = NULL; + struct iobuf *iobuf = NULL; + uint32_t remaining_size = 0; + uint32_t gluster_write_proc_len = 0; + + priv = this->private; + + switch (priv->incoming.frag.call_body.request.vector_state) { + case SP_STATE_VECTORED_REQUEST_INIT: + addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf)); + + /* also read verf flavour and verflen */ + credlen = ntoh32 (*((uint32_t *)addr)) + + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE; + + __socket_proto_init_pending (priv, credlen); + + priv->incoming.frag.call_body.request.vector_state = + SP_STATE_READING_CREDBYTES; + + /* fall through */ + + case SP_STATE_READING_CREDBYTES: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.request.vector_state = + SP_STATE_READ_CREDBYTES; + + /* fall through */ + + case SP_STATE_READ_CREDBYTES: + addr = rpc_verf_addr (priv->incoming.frag.fragcurrent); + + /* FIXME: Also handle procedures other than glusterfs-write + * here + */ + /* also read proc-header */ + gluster_write_proc_len = sizeof (gfs3_write_req); + + verflen = ntoh32 (*((uint32_t *)addr)) + + gluster_write_proc_len; + + __socket_proto_init_pending (priv, verflen); + + priv->incoming.frag.call_body.request.vector_state + = SP_STATE_READING_VERFBYTES; + + /* fall through */ + + case SP_STATE_READING_VERFBYTES: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.request.vector_state = + SP_STATE_READ_VERFBYTES; + + /* fall through */ + + case SP_STATE_READ_VERFBYTES: + if (priv->incoming.vectoriob == NULL) { + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { + gf_log (this->name, GF_LOG_ERROR, + "unable to allocate IO buffer " + "for peer %s", + this->peerinfo.identifier); + ret = -1; + break; + } + + priv->incoming.vectoriob = iobuf; + priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); + } + + priv->incoming.frag.call_body.request.vector_state = + SP_STATE_READING_PROG; + + /* fall through */ + + case SP_STATE_READING_PROG: + /* now read the remaining rpc msg into buffer pointed by + * fragcurrent + */ + + ret = __socket_read_simple_msg (this); + + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + if ((ret == -1) + || ((ret == 0) + && (remaining_size == 0) + && RPC_LASTFRAG (priv->incoming.fraghdr))) { + priv->incoming.frag.call_body.request.vector_state + = SP_STATE_VECTORED_REQUEST_INIT; + priv->incoming.vectoriob_size + = (unsigned long)priv->incoming.frag.fragcurrent + - (unsigned long)iobuf_ptr (priv->incoming.vectoriob); + } + break; + } + + return ret; +} + + +#define rpc_msgtype_addr(buf) (buf + 4) + +#define rpc_prognum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 4) + +#define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12) + + +inline int +__socket_read_request (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + uint32_t prognum = 0, procnum = 0; + uint32_t remaining_size = 0; + int ret = -1; + char *buf = NULL; + + priv = this->private; + + switch (priv->incoming.frag.call_body.request.header_state) { + + case SP_STATE_REQUEST_HEADER_INIT: + + __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE); + + priv->incoming.frag.call_body.request.header_state + = SP_STATE_READING_RPCHDR1; + + /* fall through */ + + case SP_STATE_READING_RPCHDR1: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.request.header_state = + SP_STATE_READ_RPCHDR1; + + /* fall through */ + + case SP_STATE_READ_RPCHDR1: + buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf)); + prognum = ntoh32 (*((uint32_t *)buf)); + + buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf)); + procnum = ntoh32 (*((uint32_t *)buf)); + + if ((prognum == GLUSTER3_1_FOP_PROGRAM) + && (procnum == GF_FOP_WRITE)) { + ret = __socket_read_vectored_request (this); + } else { + ret = __socket_read_simple_request (this); + } + + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + if ((ret == -1) + || ((ret == 0) + && (remaining_size == 0) + && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { + priv->incoming.frag.call_body.request.header_state = + SP_STATE_REQUEST_HEADER_INIT; + } + + break; + } + + return ret; +} + + +inline int +__socket_read_accepted_successful_reply (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + struct iobuf *iobuf = NULL; + uint32_t gluster_read_rsp_hdr_len = 0; + + priv = this->private; + + switch (priv->incoming.frag.call_body.reply.accepted_success_state) { + + case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: + gluster_read_rsp_hdr_len = sizeof (gfs3_read_rsp); + + __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len); + + priv->incoming.frag.call_body.reply.accepted_success_state + = SP_STATE_READING_PROC_HEADER; + + /* fall through */ + + case SP_STATE_READING_PROC_HEADER: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.reply.accepted_success_state + = SP_STATE_READ_PROC_HEADER; + + /* fall through */ + + case SP_STATE_READ_PROC_HEADER: + if (priv->incoming.vectoriob == NULL) { + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (iobuf == NULL) { + ret = -1; + goto out; + } + + priv->incoming.vectoriob = iobuf; + } + + priv->incoming.frag.fragcurrent + = iobuf_ptr (priv->incoming.vectoriob); + + /* now read the entire remaining msg into new iobuf */ + ret = __socket_read_simple_msg (this); + if ((ret == -1) + || ((ret == 0) + && RPC_LASTFRAG (priv->incoming.fraghdr))) { + priv->incoming.frag.call_body.reply.accepted_success_state + = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT; + } + + break; + } + +out: + return ret; +} + +#define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4) +#define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4) + +inline int +__socket_read_accepted_reply (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + char *buf = NULL; + uint32_t verflen = 0, len = 0; + uint32_t remaining_size = 0; + + priv = this->private; + + switch (priv->incoming.frag.call_body.reply.accepted_state) { + + case SP_STATE_ACCEPTED_REPLY_INIT: + __socket_proto_init_pending (priv, + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE); + + priv->incoming.frag.call_body.reply.accepted_state + = SP_STATE_READING_REPLY_VERFLEN; + + /* fall through */ + + case SP_STATE_READING_REPLY_VERFLEN: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.reply.accepted_state + = SP_STATE_READ_REPLY_VERFLEN; + + /* fall through */ + + case SP_STATE_READ_REPLY_VERFLEN: + buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent); + + verflen = ntoh32 (*((uint32_t *) buf)); + + /* also read accept status along with verf data */ + len = verflen + RPC_ACCEPT_STATUS_LEN; + + __socket_proto_init_pending (priv, len); + + priv->incoming.frag.call_body.reply.accepted_state + = SP_STATE_READING_REPLY_VERFBYTES; + + /* fall through */ + + case SP_STATE_READING_REPLY_VERFBYTES: + __socket_proto_read (priv, ret); + + priv->incoming.frag.call_body.reply.accepted_state + = SP_STATE_READ_REPLY_VERFBYTES; + + buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent); + + priv->incoming.frag.call_body.reply.accept_status + = ntoh32 (*(uint32_t *) buf); + + /* fall through */ + + case SP_STATE_READ_REPLY_VERFBYTES: + + if (priv->incoming.frag.call_body.reply.accept_status + == SUCCESS) { + ret = __socket_read_accepted_successful_reply (this); + } else { + /* read entire remaining msg into buffer pointed to by + * fragcurrent + */ + ret = __socket_read_simple_msg (this); + } + + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + if ((ret == -1) + || ((ret == 0) + && (remaining_size == 0) + && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { + priv->incoming.frag.call_body.reply.accepted_state + = SP_STATE_ACCEPTED_REPLY_INIT; + } + + break; + } + + return ret; +} + + +inline int +__socket_read_denied_reply (rpc_transport_t *this) +{ + return __socket_read_simple_msg (this); +} + + +#define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4) + + +inline int +__socket_read_vectored_reply (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + char *buf = NULL; + uint32_t remaining_size = 0; + + priv = this->private; + + switch (priv->incoming.frag.call_body.reply.status_state) { + + case SP_STATE_ACCEPTED_REPLY_INIT: + __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE); + + priv->incoming.frag.call_body.reply.status_state + = SP_STATE_READING_REPLY_STATUS; + + /* fall through */ + + case SP_STATE_READING_REPLY_STATUS: + __socket_proto_read (priv, ret); + + buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent); + + priv->incoming.frag.call_body.reply.accept_status + = ntoh32 (*((uint32_t *) buf)); + + priv->incoming.frag.call_body.reply.status_state + = SP_STATE_READ_REPLY_STATUS; + + /* fall through */ + + case SP_STATE_READ_REPLY_STATUS: + if (priv->incoming.frag.call_body.reply.accept_status + == MSG_ACCEPTED) { + ret = __socket_read_accepted_reply (this); + } else { + ret = __socket_read_denied_reply (this); + } + + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + if ((ret == -1) + || ((ret == 0) + && (remaining_size == 0) + && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { + priv->incoming.frag.call_body.reply.status_state + = SP_STATE_ACCEPTED_REPLY_INIT; + } + break; + } + + return ret; +} + + +inline int +__socket_read_simple_reply (rpc_transport_t *this) +{ + return __socket_read_simple_msg (this); +} + +#define rpc_xid_addr(buf) (buf) + +inline int +__socket_read_reply (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + char *buf = NULL; + int32_t ret = -1; + rpc_request_info_t *request_info = NULL; + + priv = this->private; + + buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf)); + + request_info = GF_CALLOC (1, sizeof (*request_info), gf_common_mt_rpc_trans_reqinfo_t); + if (request_info == NULL) { + gf_log (this->name, GF_LOG_ERROR, "out of memory"); + goto out; + } + + priv->incoming.request_info = request_info; + + request_info->xid = ntoh32 (*((uint32_t *) buf)); + + /* release priv->lock, so as to avoid deadlock b/w conn->lock and + * priv->lock, since we are doing an upcall here. + */ + pthread_mutex_unlock (&priv->lock); + { + ret = rpc_transport_notify (this, RPC_TRANSPORT_MAP_XID_REQUEST, + priv->incoming.request_info); + } + pthread_mutex_lock (&priv->lock); + + if (ret == -1) { + goto out; + } + + if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM) + && (request_info->procnum == GF_FOP_READ)) { + if (request_info->rsp.rspbuf != NULL) { + priv->incoming.vectoriob + = iobuf_ref (request_info->rsp.rspbuf); + } + + ret = __socket_read_vectored_reply (this); + } else { + ret = __socket_read_simple_reply (this); + } +out: + return ret; +} + + +/* returns the number of bytes yet to be read in a fragment */ +inline int +__socket_read_frag (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int32_t ret = 0; + char *buf = NULL; + uint32_t remaining_size = 0; + + priv = this->private; + + switch (priv->incoming.frag.state) { + case SP_STATE_NADA: + __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE); + + priv->incoming.frag.state = SP_STATE_READING_MSGTYPE; + + /* fall through */ + + case SP_STATE_READING_MSGTYPE: + __socket_proto_read (priv, ret); + + priv->incoming.frag.state = SP_STATE_READ_MSGTYPE; + /* fall through */ + + case SP_STATE_READ_MSGTYPE: + buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf)); + priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf)); + + if (priv->incoming.msg_type == CALL) { + ret = __socket_read_request (this); + } else if (priv->incoming.msg_type == REPLY) { + ret = __socket_read_reply (this); + } else { + gf_log ("rpc", GF_LOG_ERROR, + "wrong MSG-TYPE (%d) received", + priv->incoming.msg_type); + ret = -1; + } + + remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) + - priv->incoming.frag.bytes_read; + + if ((ret == -1) + || ((ret == 0) + && (remaining_size == 0) + && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { + priv->incoming.frag.state = SP_STATE_NADA; + } + + break; + } + + return ret; +} + + +inline +void __socket_reset_priv (socket_private_t *priv) +{ + if (priv->incoming.iobuf) { + iobuf_unref (priv->incoming.iobuf); + priv->incoming.iobuf = NULL; + } + + if (priv->incoming.vectoriob) { + iobuf_unref (priv->incoming.vectoriob); + priv->incoming.vectoriob = NULL; + } +} + + +int +__socket_proto_state_machine (rpc_transport_t *this, + rpc_transport_pollin_t **pollin) +{ + int ret = -1; + socket_private_t *priv = NULL; + struct iobuf *iobuf = NULL; + + priv = this->private; + while (priv->incoming.record_state != SP_STATE_COMPLETE) { + switch (priv->incoming.record_state) { + + case SP_STATE_NADA: + iobuf = iobuf_get (this->ctx->iobuf_pool); + if (!iobuf) { + gf_log (this->name, GF_LOG_ERROR, + "unable to allocate IO buffer " + "for peer %s", + this->peerinfo.identifier); + ret = -ENOMEM; + goto out; + } + + priv->incoming.iobuf = iobuf; + priv->incoming.iobuf_size = 0; + priv->incoming.vectoriob_size = 0; + + priv->incoming.pending_vector = priv->incoming.vector; + priv->incoming.pending_vector->iov_base = + &priv->incoming.fraghdr; + + priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); + priv->incoming.pending_vector->iov_len = + sizeof (priv->incoming.fraghdr); + + priv->incoming.record_state = SP_STATE_READING_FRAGHDR; + + /* fall through */ + + case SP_STATE_READING_FRAGHDR: + ret = __socket_readv (this, + priv->incoming.pending_vector, 1, + &priv->incoming.pending_vector, + &priv->incoming.pending_count, + NULL); + if (ret == -1) { + gf_log (this->name, GF_LOG_TRACE, + "reading from socket failed. Error (%s), " + "peer (%s)", strerror (errno), + this->peerinfo.identifier); + goto out; + } + + if (ret > 0) { + gf_log (this->name, GF_LOG_TRACE, "partial " + "fragment header read"); + goto out; + } + + if (ret == 0) { + priv->incoming.record_state = + SP_STATE_READ_FRAGHDR; + } + /* fall through */ + + case SP_STATE_READ_FRAGHDR: + + priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); + priv->incoming.record_state = SP_STATE_READING_FRAG; + priv->incoming.total_bytes_read + += RPC_FRAGSIZE(priv->incoming.fraghdr); + /* fall through */ + + case SP_STATE_READING_FRAG: + ret = __socket_read_frag (this); + + if ((ret == -1) + || (priv->incoming.frag.bytes_read != + RPC_FRAGSIZE (priv->incoming.fraghdr))) { + goto out; + } + + priv->incoming.frag.bytes_read = 0; + + if (!RPC_LASTFRAG (priv->incoming.fraghdr)) { + priv->incoming.record_state = + SP_STATE_READING_FRAGHDR; + break; + } + + /* we've read the entire rpc record, notify the + * upper layers. + */ + if (pollin != NULL) { + priv->incoming.iobuf_size + = priv->incoming.total_bytes_read + - priv->incoming.vectoriob_size; + + *pollin = rpc_transport_pollin_alloc (this, + priv->incoming.iobuf, + priv->incoming.iobuf_size, + priv->incoming.vectoriob, + priv->incoming.vectoriob_size, + priv->incoming.request_info); + if (*pollin == NULL) { + ret = -1; + goto out; + } + + priv->incoming.request_info = NULL; + } + priv->incoming.record_state = SP_STATE_COMPLETE; + break; + + case SP_STATE_COMPLETE: + /* control should not reach here */ + gf_log (this->name, GF_LOG_DEBUG, "control reached to " + "SP_STATE_COMPLETE, which should not have " + "happened"); + break; + } + } + + if (priv->incoming.record_state == SP_STATE_COMPLETE) { + priv->incoming.record_state = SP_STATE_NADA; + __socket_reset_priv (priv); + } + +out: + if ((ret == -1) && (errno == EAGAIN)) { + ret = 0; + } + return ret; +} + + +int +socket_proto_state_machine (rpc_transport_t *this, + rpc_transport_pollin_t **pollin) +{ + socket_private_t *priv = NULL; + int ret = 0; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + ret = __socket_proto_state_machine (this, pollin); + } + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_event_poll_in (rpc_transport_t *this) +{ + int ret = -1; + rpc_transport_pollin_t *pollin = NULL; + + ret = socket_proto_state_machine (this, &pollin); + + if (pollin != NULL) { + ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, + pollin); + + rpc_transport_pollin_destroy (pollin); + } + + return ret; +} + + +int +socket_connect_finish (rpc_transport_t *this) +{ + int ret = -1; + socket_private_t *priv = NULL; + rpc_transport_event_t event = 0; + char notify_rpc = 0; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected) + goto unlock; + + ret = __socket_connect_finish (priv->sock); + + if (ret == -1 && errno == EINPROGRESS) + ret = 1; + + if (ret == -1 && errno != EINPROGRESS) { + if (!priv->connect_finish_log) { + gf_log (this->name, GF_LOG_ERROR, + "connection to %s failed (%s)", + this->peerinfo.identifier, + strerror (errno)); + priv->connect_finish_log = 1; + } + __socket_disconnect (this); + notify_rpc = 1; + event = RPC_TRANSPORT_DISCONNECT; + goto unlock; + } + + if (ret == 0) { + notify_rpc = 1; + + this->myinfo.sockaddr_len = + sizeof (this->myinfo.sockaddr); + + ret = getsockname (priv->sock, + SA (&this->myinfo.sockaddr), + &this->myinfo.sockaddr_len); + if (ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, + "getsockname on (%d) failed (%s)", + priv->sock, strerror (errno)); + __socket_disconnect (this); + event = GF_EVENT_POLLERR; + goto unlock; + } + + priv->connected = 1; + priv->connect_finish_log = 0; + event = RPC_TRANSPORT_CONNECT; + get_transport_identifiers (this); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + if (notify_rpc) { + rpc_transport_notify (this, event, this); + } + + return 0; +} + + +/* reads rpc_requests during pollin */ +int +socket_event_handler (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + rpc_transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = 0; + + this = data; + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + priv->idx = idx; + } + pthread_mutex_unlock (&priv->lock); + + if (!priv->connected) { + ret = socket_connect_finish (this); + } + + if (!ret && poll_out) { + ret = socket_event_poll_out (this); + } + + if (!ret && poll_in) { + ret = socket_event_poll_in (this); + } + + if ((ret < 0) || poll_err) { + gf_log ("transport", GF_LOG_TRACE, "disconnecting now"); + socket_event_poll_err (this); + rpc_transport_unref (this); + } + + return 0; +} + + +int +socket_server_event_handler (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + rpc_transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = 0; + int new_sock = -1; + rpc_transport_t *new_trans = NULL; + struct sockaddr_storage new_sockaddr = {0, }; + socklen_t addrlen = sizeof (new_sockaddr); + socket_private_t *new_priv = NULL; + glusterfs_ctx_t *ctx = NULL; + + this = data; + priv = this->private; + ctx = this->ctx; + + pthread_mutex_lock (&priv->lock); + { + priv->idx = idx; + + if (poll_in) { + new_sock = accept (priv->sock, SA (&new_sockaddr), + &addrlen); + + if (new_sock == -1) + goto unlock; + + if (!priv->bio) { + ret = __socket_nonblock (new_sock); + + if (ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, + "NBIO on %d failed (%s)", + new_sock, strerror (errno)); + + close (new_sock); + goto unlock; + } + } + + if (priv->nodelay) { + ret = __socket_nodelay (new_sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "setsockopt() failed for " + "NODELAY (%s)", + strerror (errno)); + } + } + + new_trans = GF_CALLOC (1, sizeof (*new_trans), + gf_common_mt_rpc_trans_t); + new_trans->fini = this->fini; + new_trans->name = gf_strdup (this->name); + + memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, + addrlen); + new_trans->peerinfo.sockaddr_len = addrlen; + + new_trans->myinfo.sockaddr_len = + sizeof (new_trans->myinfo.sockaddr); + + ret = getsockname (new_sock, + SA (&new_trans->myinfo.sockaddr), + &new_trans->myinfo.sockaddr_len); + if (ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, + "getsockname on %d failed (%s)", + new_sock, strerror (errno)); + close (new_sock); + goto unlock; + } + + get_transport_identifiers (new_trans); + socket_init (new_trans); + new_trans->ops = this->ops; + new_trans->init = this->init; + new_trans->fini = this->fini; + new_trans->ctx = ctx; + new_trans->mydata = this->mydata; + new_trans->notify = this->notify; + new_priv = new_trans->private; + + pthread_mutex_lock (&new_priv->lock); + { + new_priv->sock = new_sock; + new_priv->connected = 1; + rpc_transport_ref (new_trans); + + new_priv->idx = + event_register (ctx->event_pool, + new_sock, + socket_event_handler, + new_trans, 1, 0); + + if (new_priv->idx == -1) + ret = -1; + } + pthread_mutex_unlock (&new_priv->lock); + ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_disconnect (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + ret = __socket_disconnect (this); + } + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_connect (rpc_transport_t *this) +{ + int ret = -1; + int sock = -1; + socket_private_t *priv = NULL; + struct sockaddr_storage sockaddr = {0, }; + socklen_t sockaddr_len = 0; + glusterfs_ctx_t *ctx = NULL; + sa_family_t sa_family = {0, }; + + priv = this->private; + ctx = this->ctx; + + if (!priv) { + gf_log (this->name, GF_LOG_DEBUG, + "connect() called on uninitialized transport"); + goto err; + } + + pthread_mutex_lock (&priv->lock); + { + sock = priv->sock; + } + pthread_mutex_unlock (&priv->lock); + + if (sock != -1) { + gf_log (this->name, GF_LOG_TRACE, + "connect () called on transport already connected"); + ret = 0; + goto err; + } + + ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr), + &sockaddr_len, &sa_family); + if (ret == -1) { + /* logged inside client_get_remote_sockaddr */ + goto err; + } + + pthread_mutex_lock (&priv->lock); + { + if (priv->sock != -1) { + gf_log (this->name, GF_LOG_TRACE, + "connect() -- already connected"); + goto unlock; + } + + memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); + this->peerinfo.sockaddr_len = sockaddr_len; + + priv->sock = socket (sa_family, SOCK_STREAM, 0); + if (priv->sock == -1) { + gf_log (this->name, GF_LOG_ERROR, + "socket creation failed (%s)", + strerror (errno)); + goto unlock; + } + + /* Cant help if setting socket options fails. We can continue + * working nonetheless. + */ + if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, + &priv->windowsize, + sizeof (priv->windowsize)) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "setting receive window size failed: %d: %d: " + "%s", priv->sock, priv->windowsize, + strerror (errno)); + } + + if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, + &priv->windowsize, + sizeof (priv->windowsize)) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "setting send window size failed: %d: %d: " + "%s", priv->sock, priv->windowsize, + strerror (errno)); + } + + + if (priv->nodelay && priv->lowlat) { + ret = __socket_nodelay (priv->sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "setsockopt() failed for NODELAY (%s)", + strerror (errno)); + } + } + + if (!priv->bio) { + ret = __socket_nonblock (priv->sock); + + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + + SA (&this->myinfo.sockaddr)->sa_family = + SA (&this->peerinfo.sockaddr)->sa_family; + + ret = client_bind (this, SA (&this->myinfo.sockaddr), + &this->myinfo.sockaddr_len, priv->sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "client bind failed: %s", strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), + this->peerinfo.sockaddr_len); + + if (ret == -1 && errno != EINPROGRESS) { + gf_log (this->name, GF_LOG_ERROR, + "connection attempt failed (%s)", + strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + priv->connected = 0; + + rpc_transport_ref (this); + + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_event_handler, this, 1, 1); + if (priv->idx == -1) + ret = -1; + } +unlock: + pthread_mutex_unlock (&priv->lock); + +err: + return ret; +} + + +int +socket_listen (rpc_transport_t *this) +{ + socket_private_t * priv = NULL; + int ret = -1; + int sock = -1; + struct sockaddr_storage sockaddr; + socklen_t sockaddr_len; + peer_info_t *myinfo = NULL; + glusterfs_ctx_t *ctx = NULL; + sa_family_t sa_family = {0, }; + + priv = this->private; + myinfo = &this->myinfo; + ctx = this->ctx; + + pthread_mutex_lock (&priv->lock); + { + sock = priv->sock; + } + pthread_mutex_unlock (&priv->lock); + + if (sock != -1) { + gf_log (this->name, GF_LOG_DEBUG, + "alreading listening"); + return ret; + } + + ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), + &sockaddr_len, &sa_family); + if (ret == -1) { + return ret; + } + + pthread_mutex_lock (&priv->lock); + { + if (priv->sock != -1) { + gf_log (this->name, GF_LOG_DEBUG, + "already listening"); + goto unlock; + } + + memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); + myinfo->sockaddr_len = sockaddr_len; + + priv->sock = socket (sa_family, SOCK_STREAM, 0); + + if (priv->sock == -1) { + gf_log (this->name, GF_LOG_ERROR, + "socket creation failed (%s)", + strerror (errno)); + goto unlock; + } + + /* Cant help if setting socket options fails. We can continue + * working nonetheless. + */ + if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, + &priv->windowsize, + sizeof (priv->windowsize)) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "setting receive window size failed: %d: %d: " + "%s", priv->sock, priv->windowsize, + strerror (errno)); + } + + if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, + &priv->windowsize, + sizeof (priv->windowsize)) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "setting send window size failed: %d: %d: " + "%s", priv->sock, priv->windowsize, + strerror (errno)); + } + + if (priv->nodelay) { + ret = __socket_nodelay (priv->sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "setsockopt() failed for NODELAY (%s)", + strerror (errno)); + } + } + + if (!priv->bio) { + ret = __socket_nonblock (priv->sock); + + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + + ret = __socket_server_bind (this); + + if (ret == -1) { + /* logged inside __socket_server_bind() */ + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + ret = listen (priv->sock, 10); + + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "could not set socket %d to listen mode (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + rpc_transport_ref (this); + + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_server_event_handler, + this, 1, 0); + + if (priv->idx == -1) { + gf_log (this->name, GF_LOG_DEBUG, + "could not register socket %d with events", + priv->sock); + ret = -1; + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +/* TODO: implement per transfer limit */ +#if 0 +int +socket_submit (rpc_transport_t *this, char *buf, int len, + struct iovec *vector, int count, + struct iobref *iobref) +{ + socket_private_t *priv = NULL; + int ret = -1; + char need_poll_out = 0; + char need_append = 1; + struct ioq *entry = NULL; + glusterfs_ctx_t *ctx = NULL; + + priv = this->private; + ctx = this->ctx; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected != 1) { + if (!priv->submit_log && !priv->connect_finish_log) { + gf_log (this->name, GF_LOG_DEBUG, + "not connected (priv->connected = %d)", + priv->connected); + priv->submit_log = 1; + } + goto unlock; + } + + priv->submit_log = 0; + entry = __socket_ioq_new (this, buf, len, vector, count, iobref); + if (!entry) + goto unlock; + + if (list_empty (&priv->ioq)) { + ret = __socket_ioq_churn_entry (this, entry); + + if (ret == 0) + need_append = 0; + + if (ret > 0) + need_poll_out = 1; + } + + if (need_append) { + list_add_tail (&entry->list, &priv->ioq); + ret = 0; + } + + if (need_poll_out) { + /* first entry to wait. continue writing on POLLOUT */ + priv->idx = event_select_on (ctx->event_pool, + priv->sock, + priv->idx, -1, 1); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} +#endif + + +int32_t +socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) +{ + socket_private_t *priv = NULL; + int ret = -1; + char need_poll_out = 0; + char need_append = 1; + struct ioq *entry = NULL; + glusterfs_ctx_t *ctx = NULL; + + priv = this->private; + ctx = this->ctx; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected != 1) { + if (!priv->submit_log && !priv->connect_finish_log) { + gf_log (this->name, GF_LOG_DEBUG, + "not connected (priv->connected = %d)", + priv->connected); + priv->submit_log = 1; + } + goto unlock; + } + + priv->submit_log = 0; + entry = __socket_ioq_new (this, &req->msg); + if (!entry) + goto unlock; + + if (list_empty (&priv->ioq)) { + ret = __socket_ioq_churn_entry (this, entry); + + if (ret == 0) + need_append = 0; + + if (ret > 0) + need_poll_out = 1; + } + + if (need_append) { + list_add_tail (&entry->list, &priv->ioq); + ret = 0; + } + + if (need_poll_out) { + /* first entry to wait. continue writing on POLLOUT */ + priv->idx = event_select_on (ctx->event_pool, + priv->sock, + priv->idx, -1, 1); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int32_t +socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) +{ + socket_private_t *priv = NULL; + int ret = -1; + char need_poll_out = 0; + char need_append = 1; + struct ioq *entry = NULL; + glusterfs_ctx_t *ctx = NULL; + + priv = this->private; + ctx = this->ctx; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected != 1) { + if (!priv->submit_log && !priv->connect_finish_log) { + gf_log (this->name, GF_LOG_DEBUG, + "not connected (priv->connected = %d)", + priv->connected); + priv->submit_log = 1; + } + goto unlock; + } + priv->submit_log = 0; + entry = __socket_ioq_new (this, &reply->msg); + if (!entry) + goto unlock; + if (list_empty (&priv->ioq)) { + ret = __socket_ioq_churn_entry (this, entry); + + if (ret == 0) + need_append = 0; + + if (ret > 0) + need_poll_out = 1; + } + + if (need_append) { + list_add_tail (&entry->list, &priv->ioq); + ret = 0; + } + + if (need_poll_out) { + /* first entry to wait. continue writing on POLLOUT */ + priv->idx = event_select_on (ctx->event_pool, + priv->sock, + priv->idx, -1, 1); + } + } + +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int32_t +socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen) +{ + int32_t ret = -1; + + if ((this == NULL) || (hostname == NULL)) { + goto out; + } + + if (hostlen < (strlen (this->peerinfo.identifier) + 1)) { + goto out; + } + + strcpy (hostname, this->peerinfo.identifier); + ret = 0; +out: + return ret; +} + + +int32_t +socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, + struct sockaddr *sa, socklen_t salen) +{ + int32_t ret = -1; + + if ((this == NULL) || (sa == NULL)) { + goto out; + } + + *sa = *((struct sockaddr *)&this->peerinfo.sockaddr); + + if (peeraddr != NULL) { + ret = socket_getpeername (this, peeraddr, addrlen); + } + +out: + return ret; +} + + +int32_t +socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen) +{ + int32_t ret = -1; + + if ((this == NULL) || (hostname == NULL)) { + goto out; + } + + if (hostlen < (strlen (this->myinfo.identifier) + 1)) { + goto out; + } + + strcpy (hostname, this->myinfo.identifier); + ret = 0; +out: + return ret; +} + + +int32_t +socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, + struct sockaddr *sa, socklen_t salen) +{ + int32_t ret = -1; + + if ((this == NULL) || (sa == NULL)) { + goto out; + } + + *sa = *((struct sockaddr *)&this->myinfo.sockaddr); + + if (myaddr != NULL) { + ret = socket_getmyname (this, myaddr, addrlen); + } + +out: + return ret; +} + + +struct rpc_transport_ops tops = { + .listen = socket_listen, + .connect = socket_connect, + .disconnect = socket_disconnect, + .submit_request = socket_submit_request, + .submit_reply = socket_submit_reply, + .get_peername = socket_getpeername, + .get_peeraddr = socket_getpeeraddr, + .get_myname = socket_getmyname, + .get_myaddr = socket_getmyaddr +}; + + +int +socket_init (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + gf_boolean_t tmp_bool = 0; + uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; + char *optstr = NULL; + + if (this->private) { + gf_log (this->name, GF_LOG_DEBUG, + "double init attempted"); + return -1; + } + + priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); + if (!priv) { + gf_log (this->name, GF_LOG_ERROR, + "calloc (1, %"GF_PRI_SIZET") returned NULL", + sizeof (*priv)); + return -1; + } + + pthread_mutex_init (&priv->lock, NULL); + + priv->sock = -1; + priv->idx = -1; + priv->connected = -1; + + INIT_LIST_HEAD (&priv->ioq); + + if (dict_get (this->options, "non-blocking-io")) { + optstr = data_to_str (dict_get (this->options, + "non-blocking-io")); + + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'non-blocking-io' takes only boolean options," + " not taking any action"); + tmp_bool = 1; + } + priv->bio = 0; + if (!tmp_bool) { + priv->bio = 1; + gf_log (this->name, GF_LOG_WARNING, + "disabling non-blocking IO"); + } + } + + optstr = NULL; + + // By default, we enable NODELAY + priv->nodelay = 1; + if (dict_get (this->options, "transport.socket.nodelay")) { + optstr = data_to_str (dict_get (this->options, + "transport.socket.nodelay")); + + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.nodelay' takes only " + "boolean options, not taking any action"); + tmp_bool = 1; + } + if (!tmp_bool) { + priv->nodelay = 0; + gf_log (this->name, GF_LOG_DEBUG, + "disabling nodelay"); + } + } + + + optstr = NULL; + if (dict_get_str (this->options, "transport.window-size", + &optstr) == 0) { + if (gf_string2bytesize (optstr, &windowsize) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid number format: %s", optstr); + return -1; + } + } + + optstr = NULL; + + if (dict_get_str (this->options, "transport.socket.lowlat", + &optstr) == 0) { + priv->lowlat = 1; + } + + priv->windowsize = (int)windowsize; + this->private = priv; + + return 0; +} + + +void +fini (rpc_transport_t *this) +{ + socket_private_t *priv = this->private; + + gf_log (this->name, GF_LOG_TRACE, + "transport %p destroyed", this); + + pthread_mutex_destroy (&priv->lock); + + GF_FREE (this->name); + GF_FREE (priv); +} + + +int32_t +init (rpc_transport_t *this) +{ + int ret = -1; + + ret = socket_init (this); + + if (ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, "socket_init() failed"); + } + + return ret; +} + +struct volume_options options[] = { + { .key = {"remote-port", + "transport.remote-port", + "transport.socket.remote-port"}, + .type = GF_OPTION_TYPE_INT + }, + { .key = {"transport.socket.listen-port", "listen-port"}, + .type = GF_OPTION_TYPE_INT + }, + { .key = {"transport.socket.bind-address", "bind-address" }, + .type = GF_OPTION_TYPE_INTERNET_ADDRESS + }, + { .key = {"transport.socket.connect-path", "connect-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.socket.bind-path", "bind-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.socket.listen-path", "listen-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = { "transport.address-family", + "address-family" }, + .value = {"inet", "inet6", "inet/inet6", "inet6/inet", + "unix", "inet-sdp" }, + .type = GF_OPTION_TYPE_STR + }, + + { .key = {"non-blocking-io"}, + .type = GF_OPTION_TYPE_BOOL + }, + { .key = {"transport.window-size"}, + .type = GF_OPTION_TYPE_SIZET, + .min = GF_MIN_SOCKET_WINDOW_SIZE, + .max = GF_MAX_SOCKET_WINDOW_SIZE, + }, + { .key = {"transport.socket.nodelay"}, + .type = GF_OPTION_TYPE_BOOL + }, + { .key = {"transport.socket.lowlat"}, + .type = GF_OPTION_TYPE_BOOL + }, + { .key = {NULL} } +}; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h new file mode 100644 index 000000000..aa31ee2a7 --- /dev/null +++ b/rpc/rpc-transport/socket/src/socket.h @@ -0,0 +1,190 @@ +/* + Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SOCKET_H +#define _SOCKET_H + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "event.h" +#include "rpc-transport.h" +#include "logging.h" +#include "dict.h" +#include "mem-pool.h" + +#ifndef MAX_IOVEC +#define MAX_IOVEC 16 +#endif /* MAX_IOVEC */ + +#define GF_DEFAULT_SOCKET_LISTEN_PORT 6969 + +/* This is the size set through setsockopt for + * both the TCP receive window size and the + * send buffer size. + * Till the time iobuf size becomes configurable, this size is set to include + * two iobufs + the GlusterFS protocol headers. + * Linux allows us to over-ride the max values for the system. + * Should we over-ride them? Because if we set a value larger than the default + * setsockopt will fail. Having larger values might be beneficial for + * IB links. + */ +#define GF_DEFAULT_SOCKET_WINDOW_SIZE (512 * GF_UNIT_KB) +#define GF_MAX_SOCKET_WINDOW_SIZE (1 * GF_UNIT_MB) +#define GF_MIN_SOCKET_WINDOW_SIZE (128 * GF_UNIT_KB) + +typedef enum { + SP_STATE_NADA = 0, + SP_STATE_COMPLETE, + SP_STATE_READING_FRAGHDR, + SP_STATE_READ_FRAGHDR, + SP_STATE_READING_FRAG, +} sp_rpcrecord_state_t; + +typedef enum { + SP_STATE_RPCFRAG_INIT, + SP_STATE_READING_MSGTYPE, + SP_STATE_READ_MSGTYPE, +} sp_rpcfrag_state_t; + +typedef enum { + SP_STATE_SIMPLE_MSG_INIT, + SP_STATE_READING_SIMPLE_MSG, +} sp_rpcfrag_simple_msg_state_t; + +typedef enum { + SP_STATE_VECTORED_REQUEST_INIT, + SP_STATE_READING_CREDBYTES, + SP_STATE_READ_CREDBYTES, /* read credential data. */ + SP_STATE_READING_VERFBYTES, + SP_STATE_READ_VERFBYTES, /* read verifier data */ + SP_STATE_READING_PROG, +} sp_rpcfrag_vectored_request_state_t; + +typedef enum { + SP_STATE_REQUEST_HEADER_INIT, + SP_STATE_READING_RPCHDR1, + SP_STATE_READ_RPCHDR1, /* read msg from beginning till and + * including credlen + */ +} sp_rpcfrag_request_header_state_t; + +struct ioq { + union { + struct list_head list; + struct { + struct ioq *next; + struct ioq *prev; + }; + }; + + struct iovec vector[MAX_IOVEC]; + int count; + struct iovec *pending_vector; + int pending_count; + struct iobref *iobref; +}; + +typedef struct { + sp_rpcfrag_request_header_state_t header_state; + sp_rpcfrag_vectored_request_state_t vector_state; +} sp_rpcfrag_request_state_t; + +typedef enum { + SP_STATE_VECTORED_REPLY_STATUS_INIT, + SP_STATE_READING_REPLY_STATUS, + SP_STATE_READ_REPLY_STATUS, +} sp_rpcfrag_vectored_reply_status_state_t; + +typedef enum { + SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT, + SP_STATE_READING_PROC_HEADER, + SP_STATE_READ_PROC_HEADER, +} sp_rpcfrag_vectored_reply_accepted_success_state_t; + +typedef enum { + SP_STATE_ACCEPTED_REPLY_INIT, + SP_STATE_READING_REPLY_VERFLEN, + SP_STATE_READ_REPLY_VERFLEN, + SP_STATE_READING_REPLY_VERFBYTES, + SP_STATE_READ_REPLY_VERFBYTES, +} sp_rpcfrag_vectored_reply_accepted_state_t; + +typedef struct { + uint32_t accept_status; + sp_rpcfrag_vectored_reply_status_state_t status_state; + sp_rpcfrag_vectored_reply_accepted_state_t accepted_state; + sp_rpcfrag_vectored_reply_accepted_success_state_t accepted_success_state; +} sp_rpcfrag_vectored_reply_state_t; + +typedef struct { + int32_t sock; + int32_t idx; + unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected + char bio; + char connect_finish_log; + char submit_log; + union { + struct list_head ioq; + struct { + struct ioq *ioq_next; + struct ioq *ioq_prev; + }; + }; + struct { + sp_rpcrecord_state_t record_state; + struct { + char *fragcurrent; + uint32_t bytes_read; + uint32_t remaining_size; + struct iovec vector; + struct iovec *pending_vector; + union { + sp_rpcfrag_request_state_t request; + sp_rpcfrag_vectored_reply_state_t reply; + } call_body; + + sp_rpcfrag_simple_msg_state_t simple_state; + sp_rpcfrag_state_t state; + } frag; + struct iobuf *iobuf; + size_t iobuf_size; + struct iovec vector[2]; + int count; + struct iobuf *vectoriob; + size_t vectoriob_size; + rpc_request_info_t *request_info; + struct iovec *pending_vector; + int pending_count; + uint32_t fraghdr; + char complete_record; + msg_type_t msg_type; + size_t total_bytes_read; + } incoming; + pthread_mutex_t lock; + int windowsize; + char lowlat; + char nodelay; +} socket_private_t; + + +#endif |