summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c3744
1 files changed, 3744 insertions, 0 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
new file mode 100644
index 000000000..93da3f296
--- /dev/null
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -0,0 +1,3744 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "socket.h"
+#include "name.h"
+#include "dict.h"
+#include "rpc-transport.h"
+#include "logging.h"
+#include "xlator.h"
+#include "byte-order.h"
+#include "common-utils.h"
+#include "compat-errno.h"
+
+
+/* ugly #includes below */
+#include "protocol-common.h"
+#include "glusterfs3-xdr.h"
+#include "xdr-nfs3.h"
+#include "rpcsvc.h"
+
+#include <fcntl.h>
+#include <errno.h>
+#include <netinet/tcp.h>
+#include <rpc/xdr.h>
+#include <sys/ioctl.h>
+#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
+#define SA(ptr) ((struct sockaddr *)ptr)
+
+#define SSL_ENABLED_OPT "transport.socket.ssl-enabled"
+#define SSL_OWN_CERT_OPT "transport.socket.ssl-own-cert"
+#define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
+#define SSL_CA_LIST_OPT "transport.socket.ssl-ca-list"
+#define OWN_THREAD_OPT "transport.socket.own-thread"
+
+/* TBD: do automake substitutions etc. (ick) to set these. */
+#if !defined(DEFAULT_CERT_PATH)
+#define DEFAULT_CERT_PATH "/etc/ssl/glusterfs.pem"
+#endif
+#if !defined(DEFAULT_KEY_PATH)
+#define DEFAULT_KEY_PATH "/etc/ssl/glusterfs.key"
+#endif
+#if !defined(DEFAULT_CA_PATH)
+#define DEFAULT_CA_PATH "/etc/ssl/glusterfs.ca"
+#endif
+
+#define POLL_MASK_INPUT (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR (POLLERR | POLLHUP | POLLNVAL)
+
+typedef int SSL_unary_func (SSL *);
+typedef int SSL_trinary_func (SSL *, void *, int);
+
+#define __socket_proto_reset_pending(priv) do { \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ memset (&frag->vector, 0, sizeof (frag->vector)); \
+ frag->pending_vector = &frag->vector; \
+ frag->pending_vector->iov_base = frag->fragcurrent; \
+ priv->incoming.pending_vector = frag->pending_vector; \
+ } while (0)
+
+
+#define __socket_proto_update_pending(priv) \
+ do { \
+ uint32_t remaining; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ if (frag->pending_vector->iov_len == 0) { \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
+ \
+ frag->pending_vector->iov_len = \
+ (remaining > frag->remaining_size) \
+ ? frag->remaining_size : remaining; \
+ \
+ frag->remaining_size -= \
+ frag->pending_vector->iov_len; \
+ } \
+ } while (0)
+
+#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
+ { \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ frag->fragcurrent += bytes_read; \
+ frag->bytes_read += bytes_read; \
+ \
+ if ((ret > 0) || (frag->remaining_size != 0)) { \
+ if (frag->remaining_size != 0 && ret == 0) { \
+ __socket_proto_reset_pending (priv); \
+ } \
+ \
+ gf_log (this->name, GF_LOG_TRACE, \
+ "partial read on non-blocking socket"); \
+ \
+ break; \
+ } \
+ }
+
+#define __socket_proto_init_pending(priv,size) \
+ do { \
+ uint32_t remaining = 0; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
+ \
+ __socket_proto_reset_pending (priv); \
+ \
+ frag->pending_vector->iov_len = \
+ (remaining > size) ? size : remaining; \
+ \
+ frag->remaining_size = (size - frag->pending_vector->iov_len); \
+ \
+ } while(0)
+
+
+/* This will be used in a switch case and breaks from the switch case if all
+ * the pending data is not read.
+ */
+#define __socket_proto_read(priv, ret) \
+ { \
+ size_t bytes_read = 0; \
+ struct gf_sock_incoming *in; \
+ in = &priv->incoming; \
+ \
+ __socket_proto_update_pending (priv); \
+ \
+ ret = __socket_readv (this, \
+ in->pending_vector, 1, \
+ &in->pending_vector, \
+ &in->pending_count, \
+ &bytes_read); \
+ if (ret == -1) \
+ break; \
+ __socket_proto_update_priv_after_read (priv, ret, bytes_read); \
+ }
+
+static int socket_init (rpc_transport_t *this);
+
+static void
+ssl_dump_error_stack (const char *caller)
+{
+ unsigned long errnum = 0;
+ char errbuf[120] = {0,};
+
+ /* OpenSSL docs explicitly give 120 as the error-string length. */
+
+ while ((errnum = ERR_get_error())) {
+ ERR_error_string(errnum,errbuf);
+ gf_log(caller,GF_LOG_ERROR," %s",errbuf);
+ }
+}
+
+static int
+ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
+{
+ int r = (-1);
+ struct pollfd pfd = {-1,};
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,out);
+ priv = this->private;
+
+ for (;;) {
+ if (buf) {
+ if (priv->connected == -1) {
+ /*
+ * Fields in the SSL structure (especially
+ * the BIO pointers) are not valid at this
+ * point, so we'll segfault if we pass them
+ * to SSL_read/SSL_write.
+ */
+ gf_log(this->name,GF_LOG_INFO,
+ "lost connection in %s", __func__);
+ break;
+ }
+ r = func(priv->ssl_ssl,buf,len);
+ }
+ else {
+ /*
+ * We actually need these functions to get to
+ * priv->connected == 1.
+ */
+ r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ }
+ switch (SSL_get_error(priv->ssl_ssl,r)) {
+ case SSL_ERROR_NONE:
+ return r;
+ case SSL_ERROR_WANT_READ:
+ pfd.fd = priv->sock;
+ pfd.events = POLLIN;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ pfd.fd = priv->sock;
+ pfd.events = POLLOUT;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_SYSCALL:
+ /* This is what we get when remote disconnects. */
+ gf_log(this->name,GF_LOG_DEBUG,
+ "syscall error (probably remote disconnect)");
+ errno = ENODATA;
+ goto out;
+ default:
+ errno = EIO;
+ goto out; /* "break" would just loop again */
+ }
+ }
+out:
+ return -1;
+}
+
+#define ssl_connect_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_connect)
+#define ssl_accept_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_accept)
+#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
+#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
+
+static int
+ssl_setup_connection (rpc_transport_t *this, int server)
+{
+ X509 *peer = NULL;
+ char peer_CN[256] = "";
+ int ret = -1;
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,done);
+ priv = this->private;
+
+ priv->ssl_ssl = SSL_new(priv->ssl_ctx);
+ if (!priv->ssl_ssl) {
+ gf_log(this->name,GF_LOG_ERROR,"SSL_new failed");
+ ssl_dump_error_stack(this->name);
+ goto done;
+ }
+ priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE);
+ if (!priv->ssl_sbio) {
+ gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed");
+ ssl_dump_error_stack(this->name);
+ goto free_ssl;
+ }
+ SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio);
+
+ if (server) {
+ ret = ssl_accept_one(this);
+ }
+ else {
+ ret = ssl_connect_one(this);
+ }
+
+ /* Make sure _the call_ succeeded. */
+ if (ret < 0) {
+ goto ssl_error;
+ }
+
+ /* Make sure _SSL verification_ succeeded, yielding an identity. */
+ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) {
+ goto ssl_error;
+ }
+ peer = SSL_get_peer_certificate(priv->ssl_ssl);
+ if (!peer) {
+ goto ssl_error;
+ }
+
+ /* Finally, everything seems OK. */
+ X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
+ NID_commonName, peer_CN, sizeof(peer_CN)-1);
+ peer_CN[sizeof(peer_CN)-1] = '\0';
+ gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN);
+ return 0;
+
+ /* Error paths. */
+ssl_error:
+ gf_log(this->name,GF_LOG_ERROR,"SSL connect error");
+ ssl_dump_error_stack(this->name);
+free_ssl:
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+done:
+ return ret;
+}
+
+
+static void
+ssl_teardown_connection (socket_private_t *priv)
+{
+ SSL_shutdown(priv->ssl_ssl);
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+}
+
+
+static ssize_t
+__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ if (priv->use_ssl) {
+ ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len);
+ } else {
+ ret = readv (sock, opvector, opcount);
+ }
+
+ return ret;
+}
+
+
+static ssize_t
+__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
+{
+ struct iovec iov = {0, };
+ int ret = -1;
+
+ iov.iov_base = buf;
+ iov.iov_len = count;
+
+ ret = __socket_ssl_readv (this, &iov, 1);
+
+ return ret;
+}
+
+
+static int
+__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ struct gf_sock_incoming *in = NULL;
+ int req_len = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+ in = &priv->incoming;
+ req_len = iov_length (opvector, opcount);
+
+ if (in->record_state == SP_STATE_READING_FRAGHDR) {
+ in->ra_read = 0;
+ in->ra_served = 0;
+ in->ra_max = 0;
+ in->ra_buf = NULL;
+ goto uncached;
+ }
+
+ if (!in->ra_max) {
+ /* first call after passing SP_STATE_READING_FRAGHDR */
+ in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
+ /* Note that the in->iobuf is the primary iobuf into which
+ headers are read into. By using this itself as our
+ read-ahead cache, we can avoid memory copies in iov_load
+ */
+ in->ra_buf = iobuf_ptr (in->iobuf);
+ }
+
+ /* fill read-ahead */
+ if (in->ra_read < in->ra_max) {
+ ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read],
+ (in->ra_max - in->ra_read));
+ if (ret > 0)
+ in->ra_read += ret;
+
+ /* we proceed to test if there is still cached data to
+ be served even if readahead could not progress */
+ }
+
+ /* serve cached */
+ if (in->ra_served < in->ra_read) {
+ ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served],
+ min (req_len, (in->ra_read - in->ra_served)));
+
+ in->ra_served += ret;
+ /* Do not read uncached and cached in the same call */
+ goto out;
+ }
+
+ if (in->ra_read < in->ra_max)
+ /* If there was no cached data to be served, (and we are
+ guaranteed to have already performed an attempt to progress
+ readahead above), and we have not yet read out the full
+ readahead capacity, then bail out for now without doing
+ the uncached read below (as that will overtake future cached
+ read)
+ */
+ goto out;
+uncached:
+ ret = __socket_ssl_readv (this, opvector, opcount);
+out:
+ return ret;
+}
+
+static gf_boolean_t
+__does_socket_rwv_error_need_logging (socket_private_t *priv, int write)
+{
+ int read = !write;
+
+ if (priv->connected == -1) /* Didn't even connect, of course it fails */
+ return _gf_false;
+
+ if (read && (priv->read_fail_log == _gf_false))
+ return _gf_false;
+
+ return _gf_true;
+}
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+static int
+__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count, size_t *bytes,
+ int write)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = NULL;
+ int opcount = 0;
+ int moved = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ sock = priv->sock;
+
+ opvector = vector;
+ opcount = count;
+
+ if (bytes != NULL) {
+ *bytes = 0;
+ }
+
+ while (opcount > 0) {
+ if (opvector->iov_len == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "would have passed zero length to read/write");
+ ++opvector;
+ --opcount;
+ continue;
+ }
+ if (write) {
+ if (priv->use_ssl) {
+ ret = ssl_write_one(this,
+ opvector->iov_base, opvector->iov_len);
+ }
+ else {
+ ret = writev (sock, opvector, opcount);
+ }
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
+ /* done for now */
+ break;
+ }
+ this->total_bytes_write += ret;
+ } else {
+ ret = __socket_cached_read (this, opvector, opcount);
+
+ if (ret == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,"EOF on socket");
+ errno = ENODATA;
+ ret = -1;
+ }
+ if (ret == -1 && errno == EAGAIN) {
+ /* done for now */
+ break;
+ }
+ this->total_bytes_read += ret;
+ }
+
+ if (ret == 0) {
+ /* Mostly due to 'umount' in client */
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "EOF from peer %s", this->peerinfo.identifier);
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+
+ if (__does_socket_rwv_error_need_logging (priv,
+ write)) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s on %s failed (%s)",
+ write ? "writev":"readv",
+ this->peerinfo.identifier,
+ strerror (errno));
+ }
+
+ if (priv->use_ssl) {
+ ssl_dump_error_stack(this->name);
+ }
+ opcount = -1;
+ break;
+ }
+
+ if (bytes != NULL) {
+ *bytes += ret;
+ }
+
+ moved = 0;
+
+ while (moved < ret) {
+ if (!opcount) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "ran out of iov, moved %d/%d",
+ moved, ret);
+ goto ran_out;
+ }
+ if (!opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ continue;
+ }
+ if ((ret - moved) >= opvector[0].iov_len) {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ } else {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ }
+ }
+
+ran_out:
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+out:
+ return opcount;
+}
+
+
+static int
+__socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ size_t *bytes)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, bytes, 0);
+
+ return ret;
+}
+
+
+static int
+__socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, NULL, 1);
+
+ return ret;
+}
+
+
+static int
+__socket_shutdown (rpc_transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = this->private;
+
+ priv->connected = -1;
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ if (ret) {
+ /* its already disconnected.. no need to understand
+ why it failed to shutdown in normal cases */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "shutdown() returned %d. %s",
+ ret, strerror (errno));
+ }
+
+ return ret;
+}
+
+static int
+__socket_disconnect (rpc_transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
+ if (priv->sock != -1) {
+ ret = __socket_shutdown(this);
+ if (priv->own_thread) {
+ /*
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
+ */
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
+ else if (priv->use_ssl) {
+ ssl_teardown_connection(priv);
+ }
+ }
+
+out:
+ return ret;
+}
+
+
+static int
+__socket_server_bind (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ int opt = 1;
+ int reuse_check_sock = -1;
+ struct sockaddr_storage unix_addr = {0};
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
+ &opt, sizeof (opt));
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setsockopt() for SO_REUSEADDR failed (%s)",
+ strerror (errno));
+ }
+
+ /* reuse-address doesn't work for unix type sockets */
+ if (AF_UNIX == SA (&this->myinfo.sockaddr)->sa_family) {
+ memcpy (&unix_addr, SA (&this->myinfo.sockaddr),
+ this->myinfo.sockaddr_len);
+ reuse_check_sock = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (reuse_check_sock >= 0) {
+ ret = connect (reuse_check_sock, SA (&unix_addr),
+ this->myinfo.sockaddr_len);
+ if ((ret == -1) && (ECONNREFUSED == errno)) {
+ unlink (((struct sockaddr_un*)&unix_addr)->sun_path);
+ }
+ close (reuse_check_sock);
+ }
+ }
+
+ ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "binding to %s failed: %s",
+ this->myinfo.identifier, strerror (errno));
+ if (errno == EADDRINUSE) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Port is already in use");
+ }
+ }
+
+out:
+ return ret;
+}
+
+
+static int
+__socket_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+static int
+__socket_nodelay (int fd)
+{
+ int on = 1;
+ int ret = -1;
+
+ ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
+ &on, sizeof (on));
+ if (!ret)
+ gf_log (THIS->name, GF_LOG_TRACE,
+ "NODELAY enabled for socket %d", fd);
+
+ return ret;
+}
+
+
+static int
+__socket_keepalive (int fd, int family, int keepalive_intvl, int keepalive_idle)
+{
+ int on = 1;
+ int ret = -1;
+
+ ret = setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof (on));
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep alive option on socket %d", fd);
+ goto err;
+ }
+
+ if (keepalive_intvl == GF_USE_DEFAULT_KEEPALIVE)
+ goto done;
+
+#if !defined(GF_LINUX_HOST_OS) && !defined(__NetBSD__)
+#ifdef GF_SOLARIS_HOST_OS
+ ret = setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive_intvl,
+ sizeof (keepalive_intvl));
+#else
+ ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepalive_intvl,
+ sizeof (keepalive_intvl));
+#endif
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep alive interval on socket %d", fd);
+ goto err;
+ }
+#else
+ if (family != AF_INET)
+ goto done;
+
+ ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle,
+ sizeof (keepalive_intvl));
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep idle %d on socket %d, %s",
+ keepalive_idle, fd, strerror(errno));
+ goto err;
+ }
+ ret = setsockopt (fd, IPPROTO_TCP , TCP_KEEPINTVL, &keepalive_intvl,
+ sizeof (keepalive_intvl));
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep interval %d on socket %d, %s",
+ keepalive_intvl, fd, strerror(errno));
+ goto err;
+ }
+#endif
+
+done:
+ gf_log (THIS->name, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval "
+ "%d, idle: %d", fd, keepalive_intvl, keepalive_idle);
+
+err:
+ return ret;
+}
+
+
+static int
+__socket_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen);
+
+ if (ret == 0 && optval) {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+
+static void
+__socket_reset (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ /* TODO: use mem-pool on incoming data */
+
+ if (priv->incoming.iobref) {
+ iobref_unref (priv->incoming.iobref);
+ priv->incoming.iobref = NULL;
+ }
+
+ if (priv->incoming.iobuf) {
+ iobuf_unref (priv->incoming.iobuf);
+ }
+
+ GF_FREE (priv->incoming.request_info);
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+
+ event_unregister (this->ctx->event_pool, priv->sock, priv->idx);
+
+ close (priv->sock);
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+
+out:
+ return;
+}
+
+
+static void
+socket_set_lastfrag (uint32_t *fragsize) {
+ (*fragsize) |= 0x80000000U;
+}
+
+
+static void
+socket_set_frag_header_size (uint32_t size, char *haddr)
+{
+ size = htonl (size);
+ memcpy (haddr, &size, sizeof (size));
+}
+
+
+static void
+socket_set_last_frag_header_size (uint32_t size, char *haddr)
+{
+ socket_set_lastfrag (&size);
+ socket_set_frag_header_size (size, haddr);
+}
+
+static struct ioq *
+__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
+{
+ struct ioq *entry = NULL;
+ int count = 0;
+ uint32_t size = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+
+ /* TODO: use mem-pool */
+ entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq);
+ if (!entry)
+ return NULL;
+
+ count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
+
+ GF_ASSERT (count <= (MAX_IOVEC - 1));
+
+ size = iov_length (msg->rpchdr, msg->rpchdrcount)
+ + iov_length (msg->proghdr, msg->proghdrcount)
+ + iov_length (msg->progpayload, msg->progpayloadcount);
+
+ if (size > RPC_MAX_FRAGMENT_SIZE) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "msg size (%u) bigger than the maximum allowed size on "
+ "sockets (%u)", size, RPC_MAX_FRAGMENT_SIZE);
+ GF_FREE (entry);
+ return NULL;
+ }
+
+ socket_set_last_frag_header_size (size, (char *)&entry->fraghdr);
+
+ entry->vector[0].iov_base = (char *)&entry->fraghdr;
+ entry->vector[0].iov_len = sizeof (entry->fraghdr);
+ entry->count = 1;
+
+ if (msg->rpchdr != NULL) {
+ memcpy (&entry->vector[1], msg->rpchdr,
+ sizeof (struct iovec) * msg->rpchdrcount);
+ entry->count += msg->rpchdrcount;
+ }
+
+ if (msg->proghdr != NULL) {
+ memcpy (&entry->vector[entry->count], msg->proghdr,
+ sizeof (struct iovec) * msg->proghdrcount);
+ entry->count += msg->proghdrcount;
+ }
+
+ if (msg->progpayload != NULL) {
+ memcpy (&entry->vector[entry->count], msg->progpayload,
+ sizeof (struct iovec) * msg->progpayloadcount);
+ entry->count += msg->progpayloadcount;
+ }
+
+ entry->pending_vector = entry->vector;
+ entry->pending_count = entry->count;
+
+ if (msg->iobref != NULL)
+ entry->iobref = iobref_ref (msg->iobref);
+
+ INIT_LIST_HEAD (&entry->list);
+
+out:
+ return entry;
+}
+
+
+static void
+__socket_ioq_entry_free (struct ioq *entry)
+{
+ GF_VALIDATE_OR_GOTO ("socket", entry, out);
+
+ list_del_init (&entry->list);
+ if (entry->iobref)
+ iobref_unref (entry->iobref);
+
+ /* TODO: use mem-pool */
+ GF_FREE (entry);
+
+out:
+ return;
+}
+
+
+static void
+__socket_ioq_flush (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ entry = priv->ioq_next;
+ __socket_ioq_entry_free (entry);
+ }
+
+out:
+ return;
+}
+
+
+static int
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ char a_byte = 0;
+
+ ret = __socket_writev (this, entry->pending_vector,
+ entry->pending_count,
+ &entry->pending_vector,
+ &entry->pending_count);
+
+ if (ret == 0) {
+ /* current entry was completely written */
+ GF_ASSERT (entry->pending_count == 0);
+ __socket_ioq_entry_free (entry);
+ priv = this->private;
+ if (priv->own_thread) {
+ /*
+ * The pipe should only remain readable if there are
+ * more entries after this, so drain the byte
+ * representing this entry.
+ */
+ if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "read error on pipe");
+ }
+ }
+ }
+
+ return ret;
+}
+
+
+static int
+__socket_ioq_churn (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct ioq *entry = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ /* pick next entry */
+ entry = priv->ioq_next;
+
+ ret = __socket_ioq_churn_entry (this, entry, 0);
+
+ if (ret != 0)
+ break;
+ }
+
+ if (!priv->own_thread && list_empty (&priv->ioq)) {
+ /* all pending writes done, not interested in POLLOUT */
+ priv->idx = event_select_on (this->ctx->event_pool,
+ priv->sock, priv->idx, -1, 0);
+ }
+
+out:
+ return ret;
+}
+
+
+static int
+socket_event_poll_err (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+
+out:
+ return ret;
+}
+
+
+static int
+socket_event_poll_out (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected == 1) {
+ ret = __socket_ioq_churn (this);
+
+ if (ret == -1) {
+ __socket_disconnect (this);
+ }
+ }
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
+
+out:
+ return ret;
+}
+
+
+static inline int
+__socket_read_simple_msg (rpc_transport_t *this)
+{
+ int ret = 0;
+ uint32_t remaining_size = 0;
+ size_t bytes_read = 0;
+ socket_private_t *priv = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->simple_state) {
+
+ case SP_STATE_SIMPLE_MSG_INIT:
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ __socket_proto_init_pending (priv, remaining_size);
+
+ frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
+
+ /* fall through */
+
+ case SP_STATE_READING_SIMPLE_MSG:
+ ret = 0;
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ if (remaining_size > 0) {
+ ret = __socket_readv (this,
+ in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
+ &bytes_read);
+ }
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "reading from socket failed. Error (%s), "
+ "peer (%s)", strerror (errno),
+ this->peerinfo.identifier);
+ break;
+ }
+
+ frag->bytes_read += bytes_read;
+ frag->fragcurrent += bytes_read;
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "partial read on non-blocking socket.");
+ break;
+ }
+
+ if (ret == 0) {
+ frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
+ }
+ }
+
+out:
+ return ret;
+}
+
+
+static inline int
+__socket_read_simple_request (rpc_transport_t *this)
+{
+ return __socket_read_simple_msg (this);
+}
+
+
+#define rpc_cred_addr(buf) (buf + RPC_MSGTYPE_SIZE + RPC_CALL_BODY_SIZE - 4)
+
+#define rpc_verf_addr(fragcurrent) (fragcurrent - 4)
+
+#define rpc_msgtype_addr(buf) (buf + 4)
+
+#define rpc_prognum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 4)
+#define rpc_progver_addr(buf) (buf + RPC_MSGTYPE_SIZE + 8)
+#define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12)
+
+static inline int
+__socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ uint32_t credlen = 0, verflen = 0;
+ char *addr = NULL;
+ struct iobuf *iobuf = NULL;
+ uint32_t remaining_size = 0;
+ ssize_t readsize = 0;
+ size_t size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->vector_state) {
+ case SP_STATE_VECTORED_REQUEST_INIT:
+ request->vector_sizer_state = 0;
+
+ addr = rpc_cred_addr (iobuf_ptr (in->iobuf));
+
+ /* also read verf flavour and verflen */
+ credlen = ntoh32 (*((uint32_t *)addr))
+ + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE;
+
+ __socket_proto_init_pending (priv, credlen);
+
+ request->vector_state = SP_STATE_READING_CREDBYTES;
+
+ /* fall through */
+
+ case SP_STATE_READING_CREDBYTES:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_CREDBYTES;
+
+ /* fall through */
+
+ case SP_STATE_READ_CREDBYTES:
+ addr = rpc_verf_addr (frag->fragcurrent);
+ verflen = ntoh32 (*((uint32_t *)addr));
+
+ if (verflen == 0) {
+ request->vector_state = SP_STATE_READ_VERFBYTES;
+ goto sp_state_read_verfbytes;
+ }
+ __socket_proto_init_pending (priv, verflen);
+
+ request->vector_state = SP_STATE_READING_VERFBYTES;
+
+ /* fall through */
+
+ case SP_STATE_READING_VERFBYTES:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_VERFBYTES;
+
+ /* fall through */
+
+ case SP_STATE_READ_VERFBYTES:
+sp_state_read_verfbytes:
+ /* set the base_addr 'persistently' across multiple calls
+ into the state machine */
+ in->proghdr_base_addr = frag->fragcurrent;
+
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
+ __socket_proto_init_pending (priv, readsize);
+
+ request->vector_state = SP_STATE_READING_PROGHDR;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROGHDR:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
+
+ /* fall through */
+
+ case SP_STATE_READ_PROGHDR:
+sp_state_read_proghdr:
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
+ if (readsize == 0) {
+ request->vector_state = SP_STATE_READ_PROGHDR_XDATA;
+ goto sp_state_read_proghdr_xdata;
+ }
+
+ __socket_proto_init_pending (priv, readsize);
+
+ request->vector_state = SP_STATE_READING_PROGHDR_XDATA;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROGHDR_XDATA:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
+ /* check if the vector_sizer() has more to say */
+ goto sp_state_read_proghdr;
+
+ case SP_STATE_READ_PROGHDR_XDATA:
+sp_state_read_proghdr_xdata:
+ if (in->payload_vector.iov_base == NULL) {
+
+ size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
+ if (!iobuf) {
+ ret = -1;
+ break;
+ }
+
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
+ ret = -1;
+ iobuf_unref (iobuf);
+ break;
+ }
+ }
+
+ iobref_add (in->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
+
+ frag->fragcurrent = iobuf_ptr (iobuf);
+ }
+
+ request->vector_state = SP_STATE_READING_PROG;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROG:
+ /* now read the remaining rpc msg into buffer pointed by
+ * fragcurrent
+ */
+
+ ret = __socket_read_simple_msg (this);
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ if ((ret == -1) ||
+ ((ret == 0) && (remaining_size == 0)
+ && RPC_LASTFRAG (in->fraghdr))) {
+ request->vector_state = SP_STATE_VECTORED_REQUEST_INIT;
+ in->payload_vector.iov_len
+ = ((unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base);
+ }
+ break;
+ }
+
+out:
+ return ret;
+}
+
+static inline int
+__socket_read_request (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ uint32_t prognum = 0, procnum = 0, progver = 0;
+ uint32_t remaining_size = 0;
+ int ret = -1;
+ char *buf = NULL;
+ rpcsvc_vector_sizer vector_sizer = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->header_state) {
+
+ case SP_STATE_REQUEST_HEADER_INIT:
+
+ __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE);
+
+ request->header_state = SP_STATE_READING_RPCHDR1;
+
+ /* fall through */
+
+ case SP_STATE_READING_RPCHDR1:
+ __socket_proto_read (priv, ret);
+
+ request->header_state = SP_STATE_READ_RPCHDR1;
+
+ /* fall through */
+
+ case SP_STATE_READ_RPCHDR1:
+ buf = rpc_prognum_addr (iobuf_ptr (in->iobuf));
+ prognum = ntoh32 (*((uint32_t *)buf));
+
+ buf = rpc_progver_addr (iobuf_ptr (in->iobuf));
+ progver = ntoh32 (*((uint32_t *)buf));
+
+ buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
+ procnum = ntoh32 (*((uint32_t *)buf));
+
+ if (priv->is_server) {
+ /* this check is needed as rpcsvc and rpc-clnt
+ * actor structures are not same */
+ vector_sizer =
+ rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
+ prognum, progver, procnum);
+ }
+
+ if (vector_sizer) {
+ ret = __socket_read_vectored_request (this, vector_sizer);
+ } else {
+ ret = __socket_read_simple_request (this);
+ }
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ if ((ret == -1)
+ || ((ret == 0)
+ && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ request->header_state = SP_STATE_REQUEST_HEADER_INIT;
+ }
+
+ break;
+ }
+
+out:
+ return ret;
+}
+
+
+static inline int
+__socket_read_accepted_successful_reply (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct iobuf *iobuf = NULL;
+ gfs3_read_rsp read_rsp = {0, };
+ ssize_t size = 0;
+ ssize_t default_read_size = 0;
+ char *proghdr_buf = NULL;
+ XDR xdr;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->call_body.reply.accepted_success_state) {
+
+ case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
+ default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
+ &read_rsp);
+
+ proghdr_buf = frag->fragcurrent;
+
+ __socket_proto_init_pending (priv, default_read_size);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READING_PROC_HEADER;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROC_HEADER:
+ __socket_proto_read (priv, ret);
+
+ /* there can be 'xdata' in read response, figure it out */
+ xdrmem_create (&xdr, proghdr_buf, default_read_size,
+ XDR_DECODE);
+
+ /* This will fail if there is xdata sent from server, if not,
+ well and good, we don't need to worry about */
+ xdr_gfs3_read_rsp (&xdr, &read_rsp);
+
+ free (read_rsp.xdata.xdata_val);
+
+ /* need to round off to proper roof (%4), as XDR packing pads
+ the end of opaque object with '0' */
+ size = roof (read_rsp.xdata.xdata_len, 4);
+
+ if (!size) {
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ goto read_proc_opaque;
+ }
+
+ __socket_proto_init_pending (priv, size);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READING_PROC_OPAQUE;
+
+ case SP_STATE_READING_PROC_OPAQUE:
+ __socket_proto_read (priv, ret);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+
+ case SP_STATE_READ_PROC_OPAQUE:
+ read_proc_opaque:
+ if (in->payload_vector.iov_base == NULL) {
+
+ size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
+ if (iobuf == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
+ ret = -1;
+ iobuf_unref (iobuf);
+ goto out;
+ }
+ }
+
+ iobref_add (in->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
+
+ in->payload_vector.iov_len = size;
+ }
+
+ frag->fragcurrent = in->payload_vector.iov_base;
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_HEADER;
+
+ /* fall through */
+
+ case SP_STATE_READ_PROC_HEADER:
+ /* now read the entire remaining msg into new iobuf */
+ ret = __socket_read_simple_msg (this);
+ if ((ret == -1)
+ || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
+ }
+
+ break;
+ }
+
+out:
+ return ret;
+}
+
+#define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4)
+#define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4)
+
+static inline int
+__socket_read_accepted_reply (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char *buf = NULL;
+ uint32_t verflen = 0, len = 0;
+ uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->call_body.reply.accepted_state) {
+
+ case SP_STATE_ACCEPTED_REPLY_INIT:
+ __socket_proto_init_pending (priv,
+ RPC_AUTH_FLAVOUR_N_LENGTH_SIZE);
+
+ frag->call_body.reply.accepted_state
+ = SP_STATE_READING_REPLY_VERFLEN;
+
+ /* fall through */
+
+ case SP_STATE_READING_REPLY_VERFLEN:
+ __socket_proto_read (priv, ret);
+
+ frag->call_body.reply.accepted_state
+ = SP_STATE_READ_REPLY_VERFLEN;
+
+ /* fall through */
+
+ case SP_STATE_READ_REPLY_VERFLEN:
+ buf = rpc_reply_verflen_addr (frag->fragcurrent);
+
+ verflen = ntoh32 (*((uint32_t *) buf));
+
+ /* also read accept status along with verf data */
+ len = verflen + RPC_ACCEPT_STATUS_LEN;
+
+ __socket_proto_init_pending (priv, len);
+
+ frag->call_body.reply.accepted_state
+ = SP_STATE_READING_REPLY_VERFBYTES;
+
+ /* fall through */
+
+ case SP_STATE_READING_REPLY_VERFBYTES:
+ __socket_proto_read (priv, ret);
+
+ frag->call_body.reply.accepted_state
+ = SP_STATE_READ_REPLY_VERFBYTES;
+
+ buf = rpc_reply_accept_status_addr (frag->fragcurrent);
+
+ frag->call_body.reply.accept_status
+ = ntoh32 (*(uint32_t *) buf);
+
+ /* fall through */
+
+ case SP_STATE_READ_REPLY_VERFBYTES:
+
+ if (frag->call_body.reply.accept_status
+ == SUCCESS) {
+ ret = __socket_read_accepted_successful_reply (this);
+ } else {
+ /* read entire remaining msg into buffer pointed to by
+ * fragcurrent
+ */
+ ret = __socket_read_simple_msg (this);
+ }
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr)
+ - frag->bytes_read;
+
+ if ((ret == -1)
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.accepted_state
+ = SP_STATE_ACCEPTED_REPLY_INIT;
+ }
+
+ break;
+ }
+
+out:
+ return ret;
+}
+
+
+static inline int
+__socket_read_denied_reply (rpc_transport_t *this)
+{
+ return __socket_read_simple_msg (this);
+}
+
+
+#define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4)
+
+
+static inline int
+__socket_read_vectored_reply (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ char *buf = NULL;
+ uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->call_body.reply.status_state) {
+
+ case SP_STATE_ACCEPTED_REPLY_INIT:
+ __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE);
+
+ frag->call_body.reply.status_state
+ = SP_STATE_READING_REPLY_STATUS;
+
+ /* fall through */
+
+ case SP_STATE_READING_REPLY_STATUS:
+ __socket_proto_read (priv, ret);
+
+ buf = rpc_reply_status_addr (frag->fragcurrent);
+
+ frag->call_body.reply.accept_status
+ = ntoh32 (*((uint32_t *) buf));
+
+ frag->call_body.reply.status_state
+ = SP_STATE_READ_REPLY_STATUS;
+
+ /* fall through */
+
+ case SP_STATE_READ_REPLY_STATUS:
+ if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {
+ ret = __socket_read_accepted_reply (this);
+ } else {
+ ret = __socket_read_denied_reply (this);
+ }
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ if ((ret == -1)
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.status_state
+ = SP_STATE_ACCEPTED_REPLY_INIT;
+ in->payload_vector.iov_len
+ = (unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base;
+ }
+ break;
+ }
+
+out:
+ return ret;
+}
+
+
+static inline int
+__socket_read_simple_reply (rpc_transport_t *this)
+{
+ return __socket_read_simple_msg (this);
+}
+
+#define rpc_xid_addr(buf) (buf)
+
+static inline int
+__socket_read_reply (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ char *buf = NULL;
+ int32_t ret = -1;
+ rpc_request_info_t *request_info = NULL;
+ char map_xid = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ buf = rpc_xid_addr (iobuf_ptr (in->iobuf));
+
+ if (in->request_info == NULL) {
+ in->request_info = GF_CALLOC (1, sizeof (*request_info),
+ gf_common_mt_rpc_trans_reqinfo_t);
+ if (in->request_info == NULL) {
+ goto out;
+ }
+
+ map_xid = 1;
+ }
+
+ request_info = in->request_info;
+
+ if (map_xid) {
+ request_info->xid = ntoh32 (*((uint32_t *) buf));
+
+ /* release priv->lock, so as to avoid deadlock b/w conn->lock
+ * and priv->lock, since we are doing an upcall here.
+ */
+ pthread_mutex_unlock (&priv->lock);
+ {
+ ret = rpc_transport_notify (this,
+ RPC_TRANSPORT_MAP_XID_REQUEST,
+ in->request_info);
+ }
+ pthread_mutex_lock (&priv->lock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "notify for event MAP_XID failed");
+ goto out;
+ }
+ }
+
+ if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
+ && (request_info->procnum == GF_FOP_READ)) {
+ if (map_xid && request_info->rsp.rsp_payload_count != 0) {
+ in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
+ in->payload_vector = *request_info->rsp.rsp_payload;
+ }
+
+ ret = __socket_read_vectored_reply (this);
+ } else {
+ ret = __socket_read_simple_reply (this);
+ }
+out:
+ return ret;
+}
+
+
+/* returns the number of bytes yet to be read in a fragment */
+static inline int
+__socket_read_frag (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int32_t ret = 0;
+ char *buf = NULL;
+ uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->state) {
+ case SP_STATE_NADA:
+ __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE);
+
+ frag->state = SP_STATE_READING_MSGTYPE;
+
+ /* fall through */
+
+ case SP_STATE_READING_MSGTYPE:
+ __socket_proto_read (priv, ret);
+
+ frag->state = SP_STATE_READ_MSGTYPE;
+ /* fall through */
+
+ case SP_STATE_READ_MSGTYPE:
+ buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf));
+ in->msg_type = ntoh32 (*((uint32_t *)buf));
+
+ if (in->msg_type == CALL) {
+ ret = __socket_read_request (this);
+ } else if (in->msg_type == REPLY) {
+ ret = __socket_read_reply (this);
+ } else if (in->msg_type == GF_UNIVERSAL_ANSWER) {
+ gf_log ("rpc", GF_LOG_ERROR,
+ "older version of protocol/process trying to "
+ "connect from %s. use newer version on that node",
+ this->peerinfo.identifier);
+ } else {
+ gf_log ("rpc", GF_LOG_ERROR,
+ "wrong MSG-TYPE (%d) received from %s",
+ in->msg_type,
+ this->peerinfo.identifier);
+ ret = -1;
+ }
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ if ((ret == -1)
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->state = SP_STATE_NADA;
+ }
+
+ break;
+ }
+
+out:
+ return ret;
+}
+
+
+static inline
+void __socket_reset_priv (socket_private_t *priv)
+{
+ struct gf_sock_incoming *in = NULL;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+
+ if (in->iobref) {
+ iobref_unref (in->iobref);
+ in->iobref = NULL;
+ }
+
+ if (in->iobuf) {
+ iobuf_unref (in->iobuf);
+ }
+
+ if (in->request_info != NULL) {
+ GF_FREE (in->request_info);
+ in->request_info = NULL;
+ }
+
+ memset (&in->payload_vector, 0,
+ sizeof (in->payload_vector));
+
+ in->iobuf = NULL;
+}
+
+
+static int
+__socket_proto_state_machine (rpc_transport_t *this,
+ rpc_transport_pollin_t **pollin)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+ struct iovec vector[2];
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ while (in->record_state != SP_STATE_COMPLETE) {
+ switch (in->record_state) {
+
+ case SP_STATE_NADA:
+ in->total_bytes_read = 0;
+ in->payload_vector.iov_len = 0;
+
+ in->pending_vector = in->vector;
+ in->pending_vector->iov_base = &in->fraghdr;
+
+ in->pending_vector->iov_len = sizeof (in->fraghdr);
+
+ in->record_state = SP_STATE_READING_FRAGHDR;
+
+ /* fall through */
+
+ case SP_STATE_READING_FRAGHDR:
+ ret = __socket_readv (this, in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
+ NULL);
+ if (ret == -1)
+ goto out;
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE, "partial "
+ "fragment header read");
+ goto out;
+ }
+
+ if (ret == 0) {
+ in->record_state = SP_STATE_READ_FRAGHDR;
+ }
+ /* fall through */
+
+ case SP_STATE_READ_FRAGHDR:
+
+ in->fraghdr = ntoh32 (in->fraghdr);
+ in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool,
+ (in->total_bytes_read +
+ sizeof (in->fraghdr)));
+ if (!iobuf) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ in->iobuf = iobuf;
+ in->iobuf_size = 0;
+ frag->fragcurrent = iobuf_ptr (iobuf);
+ in->record_state = SP_STATE_READING_FRAG;
+ /* fall through */
+
+ case SP_STATE_READING_FRAG:
+ ret = __socket_read_frag (this);
+
+ if ((ret == -1) ||
+ (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) {
+ goto out;
+ }
+
+ frag->bytes_read = 0;
+
+ if (!RPC_LASTFRAG (in->fraghdr)) {
+ in->record_state = SP_STATE_READING_FRAGHDR;
+ break;
+ }
+
+ /* we've read the entire rpc record, notify the
+ * upper layers.
+ */
+ if (pollin != NULL) {
+ int count = 0;
+ in->iobuf_size = (in->total_bytes_read -
+ in->payload_vector.iov_len);
+
+ memset (vector, 0, sizeof (vector));
+
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
+ ret = -1;
+ goto out;
+ }
+ }
+
+ vector[count].iov_base = iobuf_ptr (in->iobuf);
+ vector[count].iov_len = in->iobuf_size;
+
+ iobref = in->iobref;
+
+ count++;
+
+ if (in->payload_vector.iov_base != NULL) {
+ vector[count] = in->payload_vector;
+ count++;
+ }
+
+ *pollin = rpc_transport_pollin_alloc (this,
+ vector,
+ count,
+ in->iobuf,
+ iobref,
+ in->request_info);
+ iobuf_unref (in->iobuf);
+ in->iobuf = NULL;
+
+ if (*pollin == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "transport pollin allocation failed");
+ ret = -1;
+ goto out;
+ }
+ if (in->msg_type == REPLY)
+ (*pollin)->is_reply = 1;
+
+ in->request_info = NULL;
+ }
+ in->record_state = SP_STATE_COMPLETE;
+ break;
+
+ case SP_STATE_COMPLETE:
+ /* control should not reach here */
+ gf_log (this->name, GF_LOG_WARNING, "control reached to "
+ "SP_STATE_COMPLETE, which should not have "
+ "happened");
+ break;
+ }
+ }
+
+ if (in->record_state == SP_STATE_COMPLETE) {
+ in->record_state = SP_STATE_NADA;
+ __socket_reset_priv (priv);
+ }
+
+out:
+ if ((ret == -1) && (errno == EAGAIN)) {
+ ret = 0;
+ }
+ return ret;
+}
+
+
+static int
+socket_proto_state_machine (rpc_transport_t *this,
+ rpc_transport_pollin_t **pollin)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_proto_state_machine (this, pollin);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+out:
+ return ret;
+}
+
+
+static int
+socket_event_poll_in (rpc_transport_t *this)
+{
+ int ret = -1;
+ rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
+
+ ret = socket_proto_state_machine (this, &pollin);
+
+ if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
+ ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
+ pollin);
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
+ rpc_transport_pollin_destroy (pollin);
+ }
+
+ return ret;
+}
+
+
+static int
+socket_connect_finish (rpc_transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ rpc_transport_event_t event = 0;
+ char notify_rpc = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 0)
+ goto unlock;
+
+ get_transport_identifiers (this);
+
+ ret = __socket_connect_finish (priv->sock);
+
+ if (ret == -1 && errno == EINPROGRESS)
+ ret = 1;
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ if (!priv->connect_finish_log) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "connection to %s failed (%s)",
+ this->peerinfo.identifier,
+ strerror (errno));
+ priv->connect_finish_log = 1;
+ }
+ __socket_disconnect (this);
+ goto unlock;
+ }
+
+ if (ret == 0) {
+ notify_rpc = 1;
+
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+
+ ret = getsockname (priv->sock,
+ SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "getsockname on (%d) failed (%s)",
+ priv->sock, strerror (errno));
+ __socket_disconnect (this);
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ priv->connected = 1;
+ priv->connect_finish_log = 0;
+ event = RPC_TRANSPORT_CONNECT;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ if (notify_rpc) {
+ rpc_transport_notify (this, event, this);
+ }
+out:
+ return 0;
+}
+
+
+/* reads rpc_requests during pollin */
+static int
+socket_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ rpc_transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ this = data;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
+
+ THIS = this->xl;
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ ret = (priv->connected == 1) ? 0 : socket_connect_finish(this);
+
+ if (!ret && poll_out) {
+ ret = socket_event_poll_out (this);
+ }
+
+ if (!ret && poll_in) {
+ ret = socket_event_poll_in (this);
+ }
+
+ if ((ret < 0) || poll_err) {
+ /* Logging has happened already in earlier cases */
+ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
+ "disconnecting now");
+ socket_event_poll_err (this);
+ rpc_transport_unref (this);
+ }
+
+out:
+ return ret;
+}
+
+
+static void *
+socket_poller (void *ctx)
+{
+ rpc_transport_t *this = ctx;
+ socket_private_t *priv = this->private;
+ struct pollfd pfd[2] = {{0,},};
+ gf_boolean_t to_write = _gf_false;
+ int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
+
+ if (priv->use_ssl) {
+ if (ssl_setup_connection(this,priv->connected) < 0) {
+ gf_log (this->name,GF_LOG_ERROR, "%s setup failed",
+ priv->connected ? "server" : "client");
+ goto err;
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto err;
+ }
+ }
+
+ if (priv->connected == 0) {
+ THIS = this->xl;
+ ret = socket_connect_finish (this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous socket_connect_finish failed");
+ }
+ }
+
+ ret = rpc_transport_notify (this->listener,
+ RPC_TRANSPORT_ACCEPT, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous rpc_transport_notify failed");
+ }
+
+ gen = priv->ot_gen;
+ for (;;) {
+ pthread_mutex_lock(&priv->lock);
+ to_write = !list_empty(&priv->ioq);
+ pthread_mutex_unlock(&priv->lock);
+ pfd[0].fd = priv->pipe[0];
+ pfd[0].events = POLL_MASK_ERROR;
+ pfd[0].revents = 0;
+ pfd[1].fd = priv->sock;
+ pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+ pfd[1].revents = 0;
+ if (to_write) {
+ pfd[1].events |= POLL_MASK_OUTPUT;
+ }
+ else {
+ pfd[0].events |= POLL_MASK_INPUT;
+ }
+ if (poll(pfd,2,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll failed");
+ break;
+ }
+ if (pfd[0].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on pipe");
+ break;
+ }
+ /* Only glusterd actually seems to need this. */
+ THIS = this->xl;
+ if (pfd[1].revents & POLL_MASK_INPUT) {
+ ret = socket_event_poll_in(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
+ }
+ else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+ ret = socket_event_poll_out(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
+ }
+ else {
+ /*
+ * This usually means that we left poll() because
+ * somebody pushed a byte onto our pipe. That wakeup
+ * is why the pipe is there, but once awake we can do
+ * all the checking we need on the next iteration.
+ */
+ ret = 0;
+ }
+ if (pfd[1].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on socket");
+ break;
+ }
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "error in polling loop");
+ break;
+ }
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
+ }
+
+err:
+ /* All (and only) I/O errors should come here. */
+ pthread_mutex_lock(&priv->lock);
+ if (priv->ssl_ssl) {
+ /*
+ * We're always responsible for this part, but only actually
+ * have to do it if we got far enough for ssl_ssl to be valid
+ * (i.e. errors in ssl_setup_connection don't count).
+ */
+ ssl_teardown_connection(priv);
+ }
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
+ priv->ot_state = OT_IDLE;
+ pthread_mutex_unlock(&priv->lock);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
+ return NULL;
+}
+
+
+static void
+socket_spawn (rpc_transport_t *this)
+{
+ socket_private_t *priv = this->private;
+
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
+ gf_log (this->name, GF_LOG_WARNING,
+ "refusing to start redundant poller");
+ return;
+ }
+
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
+
+ if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create poll thread");
+ }
+}
+
+static int
+socket_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ rpc_transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ int new_sock = -1;
+ rpc_transport_t *new_trans = NULL;
+ struct sockaddr_storage new_sockaddr = {0, };
+ socklen_t addrlen = sizeof (new_sockaddr);
+ socket_private_t *new_priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ this = data;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
+
+ THIS = this->xl;
+ priv = this->private;
+ ctx = this->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+
+ if (poll_in) {
+ new_sock = accept (priv->sock, SA (&new_sockaddr),
+ &addrlen);
+
+ if (new_sock == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "accept on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto unlock;
+ }
+
+ if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
+ ret = __socket_nodelay (new_sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "setsockopt() failed for "
+ "NODELAY (%s)",
+ strerror (errno));
+ }
+ }
+
+ if (priv->keepalive &&
+ new_sockaddr.ss_family != AF_UNIX) {
+ ret = __socket_keepalive (new_sock,
+ new_sockaddr.ss_family,
+ priv->keepaliveintvl,
+ priv->keepaliveidle);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to set keep-alive: %s",
+ strerror (errno));
+ }
+
+ new_trans = GF_CALLOC (1, sizeof (*new_trans),
+ gf_common_mt_rpc_trans_t);
+ if (!new_trans)
+ goto unlock;
+
+ ret = pthread_mutex_init(&new_trans->lock, NULL);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_mutex_init() failed: %s",
+ strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
+ new_trans->name = gf_strdup (this->name);
+
+ memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
+ addrlen);
+ new_trans->peerinfo.sockaddr_len = addrlen;
+
+ new_trans->myinfo.sockaddr_len =
+ sizeof (new_trans->myinfo.sockaddr);
+
+ ret = getsockname (new_sock,
+ SA (&new_trans->myinfo.sockaddr),
+ &new_trans->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "getsockname on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
+ get_transport_identifiers (new_trans);
+ ret = socket_init(new_trans);
+ if (ret != 0) {
+ close(new_sock);
+ goto unlock;
+ }
+ new_trans->ops = this->ops;
+ new_trans->init = this->init;
+ new_trans->fini = this->fini;
+ new_trans->ctx = ctx;
+ new_trans->xl = this->xl;
+ new_trans->mydata = this->mydata;
+ new_trans->notify = this->notify;
+ new_trans->listener = this;
+ new_priv = new_trans->private;
+
+ new_priv->use_ssl = priv->use_ssl;
+ new_priv->sock = new_sock;
+ new_priv->own_thread = priv->own_thread;
+
+ new_priv->ssl_ctx = priv->ssl_ctx;
+ if (priv->use_ssl && !priv->own_thread) {
+ if (ssl_setup_connection(new_trans,1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "server setup failed");
+ close(new_sock);
+ goto unlock;
+ }
+ }
+
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
+ pthread_mutex_lock (&new_priv->lock);
+ {
+ /*
+ * In the own_thread case, this is used to
+ * indicate that we're initializing a server
+ * connection.
+ */
+ new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
+ rpc_transport_ref (new_trans);
+
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ socket_spawn(new_trans);
+ }
+ else {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
+
+ }
+ pthread_mutex_unlock (&new_priv->lock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to register the socket with event");
+ goto unlock;
+ }
+
+ if (!priv->own_thread) {
+ ret = rpc_transport_notify (this,
+ RPC_TRANSPORT_ACCEPT, new_trans);
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+out:
+ return ret;
+}
+
+
+static int
+socket_disconnect (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+out:
+ return ret;
+}
+
+
+static int
+socket_connect (rpc_transport_t *this, int port)
+{
+ int ret = -1;
+ int sock = -1;
+ socket_private_t *priv = NULL;
+ socklen_t sockaddr_len = 0;
+ glusterfs_ctx_t *ctx = NULL;
+ sa_family_t sa_family = {0, };
+ char *local_addr = NULL;
+ union gf_sock_union sock_union;
+ struct sockaddr_in *addr = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, err);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, err);
+
+ priv = this->private;
+ ctx = this->ctx;
+
+ if (!priv) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "connect() called on uninitialized transport");
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log_callingfn (this->name, GF_LOG_TRACE,
+ "connect () called on transport already connected");
+ errno = EINPROGRESS;
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
+ ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
+ &sockaddr_len, &sa_family);
+ if (ret == -1) {
+ /* logged inside client_get_remote_sockaddr */
+ goto err;
+ }
+
+ if (port > 0) {
+ sock_union.sin.sin_port = htons (port);
+ }
+ if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) {
+ if (priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "disabling SSL for portmapper connection");
+ priv->use_ssl = _gf_false;
+ }
+ }
+ else {
+ if (priv->ssl_enabled && !priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "re-enabling SSL for I/O connection");
+ priv->use_ssl = _gf_true;
+ }
+ }
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "connect() -- already connected");
+ goto unlock;
+ }
+
+ memcpy (&this->peerinfo.sockaddr, &sock_union.storage,
+ sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (sa_family, SOCK_STREAM, 0);
+ if (priv->sock == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ /* Cant help if setting socket options fails. We can continue
+ * working nonetheless.
+ */
+ if (priv->windowsize != 0) {
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting receive window "
+ "size failed: %d: %d: %s",
+ priv->sock, priv->windowsize,
+ strerror (errno));
+ }
+
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting send window size "
+ "failed: %d: %d: %s",
+ priv->sock, priv->windowsize,
+ strerror (errno));
+ }
+ }
+
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
+ ret = __socket_nodelay (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NODELAY on %d failed (%s)",
+ priv->sock, strerror (errno));
+ }
+ }
+
+ if (priv->keepalive && sa_family != AF_UNIX) {
+ ret = __socket_keepalive (priv->sock,
+ sa_family,
+ priv->keepaliveintvl,
+ priv->keepaliveidle);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to set keep-alive: %s",
+ strerror (errno));
+ }
+
+ SA (&this->myinfo.sockaddr)->sa_family =
+ SA (&this->peerinfo.sockaddr)->sa_family;
+
+ /* If a source addr is explicitly specified, use it */
+ ret = dict_get_str (this->options,
+ "transport.socket.source-addr",
+ &local_addr);
+ if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) {
+ addr = (struct sockaddr_in *)(&this->myinfo.sockaddr);
+ ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr));
+ }
+
+ ret = client_bind (this, SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
+ this->peerinfo.sockaddr_len);
+
+ if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) {
+ /* For unix path based sockets, the socket path is
+ * cryptic (md5sum of path) and may not be useful for
+ * the user in debugging so log it in DEBUG
+ */
+ gf_log (this->name, ((sa_family == AF_UNIX) ?
+ GF_LOG_DEBUG : GF_LOG_ERROR),
+ "connection attempt on %s failed, (%s)",
+ this->peerinfo.identifier, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ if (priv->use_ssl && !priv->own_thread) {
+ ret = ssl_setup_connection(this,0);
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "client setup failed");
+ close(priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ /*
+ * In the own_thread case, this is used to indicate that we're
+ * initializing a client connection.
+ */
+ priv->connected = 0;
+ priv->is_server = _gf_false;
+ rpc_transport_ref (this);
+
+ if (priv->own_thread) {
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+
+ this->listener = this;
+ socket_spawn(this);
+ }
+ else {
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ ret = -1;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+err:
+ return ret;
+}
+
+
+static int
+socket_listen (rpc_transport_t *this)
+{
+ socket_private_t * priv = NULL;
+ int ret = -1;
+ int sock = -1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len = 0;
+ peer_info_t *myinfo = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ sa_family_t sa_family = {0, };
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ myinfo = &this->myinfo;
+ ctx = this->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log_callingfn (this->name, GF_LOG_DEBUG,
+ "already listening");
+ return ret;
+ }
+
+ ret = socket_server_get_local_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len, &sa_family);
+ if (ret == -1) {
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "already listening");
+ goto unlock;
+ }
+
+ memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len);
+ myinfo->sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (sa_family, SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ /* Cant help if setting socket options fails. We can continue
+ * working nonetheless.
+ */
+ if (priv->windowsize != 0) {
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting receive window size "
+ "failed: %d: %d: %s", priv->sock,
+ priv->windowsize,
+ strerror (errno));
+ }
+
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting send window size failed:"
+ " %d: %d: %s", priv->sock,
+ priv->windowsize,
+ strerror (errno));
+ }
+ }
+
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
+ ret = __socket_nodelay (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setsockopt() failed for NODELAY (%s)",
+ strerror (errno));
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = __socket_server_bind (this);
+
+ if (ret == -1) {
+ /* logged inside __socket_server_bind() */
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ if (priv->backlog)
+ ret = listen (priv->sock, priv->backlog);
+ else
+ ret = listen (priv->sock, 10);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not set socket %d to listen mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ rpc_transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_server_event_handler,
+ this, 1, 0);
+
+ if (priv->idx == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not register socket %d with events",
+ priv->sock);
+ ret = -1;
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+out:
+ return ret;
+}
+
+
+static int32_t
+socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char need_poll_out = 0;
+ char need_append = 1;
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'j';
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ ctx = this->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ if (!priv->submit_log && !priv->connect_finish_log) {
+ gf_log (this->name, GF_LOG_INFO,
+ "not connected (priv->connected = %d)",
+ priv->connected);
+ priv->submit_log = 1;
+ }
+ goto unlock;
+ }
+
+ priv->submit_log = 0;
+ entry = __socket_ioq_new (this, &req->msg);
+ if (!entry)
+ goto unlock;
+
+ if (list_empty (&priv->ioq)) {
+ ret = __socket_ioq_churn_entry (this, entry, 1);
+
+ if (ret == 0) {
+ need_append = 0;
+ }
+ if (ret > 0) {
+ need_poll_out = 1;
+ }
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
+ ret = 0;
+ }
+ if (!priv->own_thread && need_poll_out) {
+ /* first entry to wait. continue writing on POLLOUT */
+ priv->idx = event_select_on (ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 1);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+out:
+ return ret;
+}
+
+
+static int32_t
+socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char need_poll_out = 0;
+ char need_append = 1;
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'd';
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+ ctx = this->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ if (!priv->submit_log && !priv->connect_finish_log) {
+ gf_log (this->name, GF_LOG_INFO,
+ "not connected (priv->connected = %d)",
+ priv->connected);
+ priv->submit_log = 1;
+ }
+ goto unlock;
+ }
+
+ priv->submit_log = 0;
+ entry = __socket_ioq_new (this, &reply->msg);
+ if (!entry)
+ goto unlock;
+
+ if (list_empty (&priv->ioq)) {
+ ret = __socket_ioq_churn_entry (this, entry, 1);
+
+ if (ret == 0) {
+ need_append = 0;
+ }
+ if (ret > 0) {
+ need_poll_out = 1;
+ }
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
+ ret = 0;
+ }
+ if (!priv->own_thread && need_poll_out) {
+ /* first entry to wait. continue writing on POLLOUT */
+ priv->idx = event_select_on (ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 1);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+out:
+ return ret;
+}
+
+
+static int32_t
+socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", hostname, out);
+
+ if (hostlen < (strlen (this->peerinfo.identifier) + 1)) {
+ goto out;
+ }
+
+ strcpy (hostname, this->peerinfo.identifier);
+ ret = 0;
+out:
+ return ret;
+}
+
+
+static int32_t
+socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
+ struct sockaddr_storage *sa, socklen_t salen)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", sa, out);
+
+ *sa = this->peerinfo.sockaddr;
+
+ if (peeraddr != NULL) {
+ ret = socket_getpeername (this, peeraddr, addrlen);
+ }
+ ret = 0;
+
+out:
+ return ret;
+}
+
+
+static int32_t
+socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", hostname, out);
+
+ if (hostlen < (strlen (this->myinfo.identifier) + 1)) {
+ goto out;
+ }
+
+ strcpy (hostname, this->myinfo.identifier);
+ ret = 0;
+out:
+ return ret;
+}
+
+
+static int32_t
+socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
+ struct sockaddr_storage *sa, socklen_t salen)
+{
+ int32_t ret = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", sa, out);
+
+ *sa = this->myinfo.sockaddr;
+
+ if (myaddr != NULL) {
+ ret = socket_getmyname (this, myaddr, addrlen);
+ }
+
+out:
+ return ret;
+}
+
+
+static int
+socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* The way we implement throttling is by taking off
+ POLLIN event from the polled flags. This way we
+ never get called with the POLLIN event and therefore
+ will never read() any more data until throttling
+ is turned off.
+ */
+ priv->idx = event_select_on (this->ctx->event_pool, priv->sock,
+ priv->idx, (int) !onoff, -1);
+ return 0;
+}
+
+
+struct rpc_transport_ops tops = {
+ .listen = socket_listen,
+ .connect = socket_connect,
+ .disconnect = socket_disconnect,
+ .submit_request = socket_submit_request,
+ .submit_reply = socket_submit_reply,
+ .get_peername = socket_getpeername,
+ .get_peeraddr = socket_getpeeraddr,
+ .get_myname = socket_getmyname,
+ .get_myaddr = socket_getmyaddr,
+ .throttle = socket_throttle,
+};
+
+int
+reconfigure (rpc_transport_t *this, dict_t *options)
+{
+ socket_private_t *priv = NULL;
+ gf_boolean_t tmp_bool = _gf_false;
+ char *optstr = NULL;
+ int ret = 0;
+ uint64_t windowsize = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ if (!this || !this->private) {
+ ret =-1;
+ goto out;
+ }
+
+ priv = this->private;
+
+ if (dict_get_str (this->options, "transport.socket.keepalive",
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ priv->keepalive = 1;
+ ret = -1;
+ goto out;
+ }
+ gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive");
+
+ priv->keepalive = tmp_bool;
+ }
+ else
+ priv->keepalive = 1;
+
+ optstr = NULL;
+ if (dict_get_str (this->options, "tcp-window-size",
+ &optstr) == 0) {
+ if (gf_string2bytesize (optstr, &windowsize) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format: %s", optstr);
+ goto out;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ ret = 0;
+out:
+ return ret;
+
+}
+
+static int
+socket_init (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ gf_boolean_t tmp_bool = 0;
+ uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
+ char *optstr = NULL;
+ uint32_t keepalive = 0;
+ uint32_t backlog = 0;
+ int session_id = 0;
+
+ if (this->private) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR,
+ "double init attempted");
+ return -1;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t);
+ if (!priv) {
+ return -1;
+ }
+ memset(priv,0,sizeof(*priv));
+
+ pthread_mutex_init (&priv->lock, NULL);
+
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+ priv->nodelay = 1;
+ priv->bio = 0;
+ priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
+ INIT_LIST_HEAD (&priv->ioq);
+
+ /* All the below section needs 'this->options' to be present */
+ if (!this->options)
+ goto out;
+
+ if (dict_get (this->options, "non-blocking-io")) {
+ optstr = data_to_str (dict_get (this->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean options,"
+ " not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool) {
+ priv->bio = 1;
+ gf_log (this->name, GF_LOG_WARNING,
+ "disabling non-blocking IO");
+ }
+ }
+
+ optstr = NULL;
+
+ // By default, we enable NODELAY
+ if (dict_get (this->options, "transport.socket.nodelay")) {
+ optstr = data_to_str (dict_get (this->options,
+ "transport.socket.nodelay"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.nodelay' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+ if (!tmp_bool) {
+ priv->nodelay = 0;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "disabling nodelay");
+ }
+ }
+
+ optstr = NULL;
+ if (dict_get_str (this->options, "tcp-window-size",
+ &optstr) == 0) {
+ if (gf_string2bytesize (optstr, &windowsize) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format: %s", optstr);
+ return -1;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ optstr = NULL;
+ /* Enable Keep-alive by default. */
+ priv->keepalive = 1;
+ priv->keepaliveintvl = 2;
+ priv->keepaliveidle = 20;
+ if (dict_get_str (this->options, "transport.socket.keepalive",
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool)
+ priv->keepalive = 0;
+ }
+
+ if (dict_get_uint32 (this->options,
+ "transport.socket.keepalive-interval",
+ &keepalive) == 0) {
+ priv->keepaliveintvl = keepalive;
+ }
+
+ if (dict_get_uint32 (this->options,
+ "transport.socket.keepalive-time",
+ &keepalive) == 0) {
+ priv->keepaliveidle = keepalive;
+ }
+
+ if (dict_get_uint32 (this->options,
+ "transport.socket.listen-backlog",
+ &backlog) == 0) {
+ priv->backlog = backlog;
+ }
+
+ optstr = NULL;
+
+ /* Check if socket read failures are to be logged */
+ priv->read_fail_log = 1;
+ if (dict_get (this->options, "transport.socket.read-fail-log")) {
+ optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log"));
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "'transport.socket.read-fail-log' takes only "
+ "boolean options; logging socket read fails");
+ }
+ else if (tmp_bool == _gf_false) {
+ priv->read_fail_log = 0;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ priv->ssl_enabled = _gf_false;
+ if (dict_get_str(this->options,SSL_ENABLED_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for ssl-enabled boolean");
+ }
+ }
+
+ priv->ssl_own_cert = DEFAULT_CERT_PATH;
+ if (dict_get_str(this->options,SSL_OWN_CERT_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_OWN_CERT_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_own_cert = optstr;
+ }
+ priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert);
+
+ priv->ssl_private_key = DEFAULT_KEY_PATH;
+ if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_private_key = optstr;
+ }
+ priv->ssl_private_key = gf_strdup(priv->ssl_private_key);
+
+ priv->ssl_ca_list = DEFAULT_CA_PATH;
+ if (dict_get_str(this->options,SSL_CA_LIST_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_CA_LIST_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_ca_list = optstr;
+ }
+ priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list);
+
+ gf_log(this->name,GF_LOG_INFO,"SSL support is %s",
+ priv->ssl_enabled ? "ENABLED" : "NOT enabled");
+ /*
+ * This might get overridden temporarily in socket_connect (q.v.)
+ * if we're using the glusterd portmapper.
+ */
+ priv->use_ssl = priv->ssl_enabled;
+
+ priv->own_thread = priv->use_ssl;
+ if (dict_get_str(this->options,OWN_THREAD_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for own-thread boolean");
+ }
+ }
+ gf_log(this->name,GF_LOG_INFO,"using %s polling thread",
+ priv->own_thread ? "private" : "system");
+
+ if (priv->use_ssl) {
+ SSL_library_init();
+ SSL_load_error_strings();
+ priv->ssl_meth = (SSL_METHOD *)TLSv1_method();
+ priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
+
+ if (SSL_CTX_set_cipher_list(priv->ssl_ctx,
+ "HIGH:-SSLv2") == 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "failed to find any valid ciphers");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx,
+ priv->ssl_own_cert)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load our cert");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx,
+ priv->ssl_private_key,
+ SSL_FILETYPE_PEM)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load private key");
+ goto err;
+ }
+
+ if (!SSL_CTX_load_verify_locations(priv->ssl_ctx,
+ priv->ssl_ca_list,0)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load CA list");
+ goto err;
+ }
+
+#if (OPENSSL_VERSION_NUMBER < 0x00905100L)
+ SSL_CTX_set_verify_depth(ctx,1);
+#endif
+
+ priv->ssl_session_id = ++session_id;
+ SSL_CTX_set_session_id_context(priv->ssl_ctx,
+ (void *)&priv->ssl_session_id,
+ sizeof(priv->ssl_session_id));
+
+ SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0);
+ }
+
+ if (priv->own_thread) {
+ priv->ot_state = OT_IDLE;
+ }
+
+out:
+ this->private = priv;
+ return 0;
+
+err:
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
+ GF_FREE(priv);
+ return -1;
+}
+
+
+void
+fini (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ if (!this)
+ return;
+
+ priv = this->private;
+ if (priv) {
+ if (priv->sock != -1) {
+ pthread_mutex_lock (&priv->lock);
+ {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+ }
+ gf_log (this->name, GF_LOG_TRACE,
+ "transport %p destroyed", this);
+
+ pthread_mutex_destroy (&priv->lock);
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
+ GF_FREE (priv);
+ }
+
+ this->private = NULL;
+}
+
+
+int32_t
+init (rpc_transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_init (this);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_DEBUG, "socket_init() failed");
+ }
+
+ return ret;
+}
+
+struct volume_options options[] = {
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.socket.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.bind-address", "bind-address" },
+ .type = GF_OPTION_TYPE_INTERNET_ADDRESS
+ },
+ { .key = {"transport.socket.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = { "transport.address-family",
+ "address-family" },
+ .value = {"inet", "inet6", "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+
+ { .key = {"non-blocking-io"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"tcp-window-size"},
+ .type = GF_OPTION_TYPE_SIZET,
+ .min = GF_MIN_SOCKET_WINDOW_SIZE,
+ .max = GF_MAX_SOCKET_WINDOW_SIZE
+ },
+ { .key = {"transport.socket.nodelay"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"transport.socket.lowlat"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"transport.socket.keepalive"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"transport.socket.keepalive-interval"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.keepalive-time"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.listen-backlog"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.read-fail-log"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {SSL_ENABLED_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {SSL_OWN_CERT_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_PRIVATE_KEY_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_CA_LIST_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {OWN_THREAD_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {NULL} }
+};