/* Copyright (c) 2008-2012 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include "socket.h" #include "name.h" #include "dict.h" #include "rpc-transport.h" #include "logging.h" #include "xlator.h" #include "byte-order.h" #include "common-utils.h" #include "compat-errno.h" /* ugly #includes below */ #include "protocol-common.h" #include "glusterfs3-xdr.h" #include "xdr-nfs3.h" #include "rpcsvc.h" #include #include #include #include #include #define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) #define SA(ptr) ((struct sockaddr *)ptr) #define SSL_ENABLED_OPT "transport.socket.ssl-enabled" #define SSL_OWN_CERT_OPT "transport.socket.ssl-own-cert" #define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key" #define SSL_CA_LIST_OPT "transport.socket.ssl-ca-list" #define OWN_THREAD_OPT "transport.socket.own-thread" /* TBD: do automake substitutions etc. (ick) to set these. 
*/ #if !defined(DEFAULT_CERT_PATH) #define DEFAULT_CERT_PATH "/etc/ssl/glusterfs.pem" #endif #if !defined(DEFAULT_KEY_PATH) #define DEFAULT_KEY_PATH "/etc/ssl/glusterfs.key" #endif #if !defined(DEFAULT_CA_PATH) #define DEFAULT_CA_PATH "/etc/ssl/glusterfs.ca" #endif #define POLL_MASK_INPUT (POLLIN | POLLPRI) #define POLL_MASK_OUTPUT (POLLOUT) #define POLL_MASK_ERROR (POLLERR | POLLHUP | POLLNVAL) typedef int SSL_unary_func (SSL *); typedef int SSL_trinary_func (SSL *, void *, int); #define __socket_proto_reset_pending(priv) do { \ struct gf_sock_incoming_frag *frag; \ frag = &priv->incoming.frag; \ \ memset (&frag->vector, 0, sizeof (frag->vector)); \ frag->pending_vector = &frag->vector; \ frag->pending_vector->iov_base = frag->fragcurrent; \ priv->incoming.pending_vector = frag->pending_vector; \ } while (0) #define __socket_proto_update_pending(priv) \ do { \ uint32_t remaining; \ struct gf_sock_incoming_frag *frag; \ frag = &priv->incoming.frag; \ if (frag->pending_vector->iov_len == 0) { \ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \ - frag->bytes_read); \ \ frag->pending_vector->iov_len = \ (remaining > frag->remaining_size) \ ? 
frag->remaining_size : remaining; \ \ frag->remaining_size -= \ frag->pending_vector->iov_len; \ } \ } while (0) #define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \ { \ struct gf_sock_incoming_frag *frag; \ frag = &priv->incoming.frag; \ \ frag->fragcurrent += bytes_read; \ frag->bytes_read += bytes_read; \ \ if ((ret > 0) || (frag->remaining_size != 0)) { \ if (frag->remaining_size != 0 && ret == 0) { \ __socket_proto_reset_pending (priv); \ } \ \ gf_log (this->name, GF_LOG_TRACE, \ "partial read on non-blocking socket"); \ \ break; \ } \ } #define __socket_proto_init_pending(priv,size) \ do { \ uint32_t remaining = 0; \ struct gf_sock_incoming_frag *frag; \ frag = &priv->incoming.frag; \ \ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \ - frag->bytes_read); \ \ __socket_proto_reset_pending (priv); \ \ frag->pending_vector->iov_len = \ (remaining > size) ? size : remaining; \ \ frag->remaining_size = (size - frag->pending_vector->iov_len); \ \ } while(0) /* This will be used in a switch case and breaks from the switch case if all * the pending data is not read. */ #define __socket_proto_read(priv, ret) \ { \ size_t bytes_read = 0; \ struct gf_sock_incoming *in; \ in = &priv->incoming; \ \ __socket_proto_update_pending (priv); \ \ ret = __socket_readv (this, \ in->pending_vector, 1, \ &in->pending_vector, \ &in->pending_count, \ &bytes_read); \ if (ret == -1) \ break; \ __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ } static int socket_init (rpc_transport_t *this); static void ssl_dump_error_stack (const char *caller) { unsigned long errnum = 0; char errbuf[120] = {0,}; /* OpenSSL docs explicitly give 120 as the error-string length. 
*/ while ((errnum = ERR_get_error())) { ERR_error_string(errnum,errbuf); gf_log(caller,GF_LOG_ERROR," %s",errbuf); } } static int ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) { int r = (-1); struct pollfd pfd = {-1,}; socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO(this->name,this->private,out); priv = this->private; for (;;) { if (buf) { if (priv->connected == -1) { /* * Fields in the SSL structure (especially * the BIO pointers) are not valid at this * point, so we'll segfault if we pass them * to SSL_read/SSL_write. */ gf_log(this->name,GF_LOG_INFO, "lost connection in %s", __func__); break; } r = func(priv->ssl_ssl,buf,len); } else { /* * We actually need these functions to get to * priv->connected == 1. */ r = ((SSL_unary_func *)func)(priv->ssl_ssl); } switch (SSL_get_error(priv->ssl_ssl,r)) { case SSL_ERROR_NONE: return r; case SSL_ERROR_WANT_READ: pfd.fd = priv->sock; pfd.events = POLLIN; if (poll(&pfd,1,-1) < 0) { gf_log(this->name,GF_LOG_ERROR,"poll error %d", errno); } break; case SSL_ERROR_WANT_WRITE: pfd.fd = priv->sock; pfd.events = POLLOUT; if (poll(&pfd,1,-1) < 0) { gf_log(this->name,GF_LOG_ERROR,"poll error %d", errno); } break; case SSL_ERROR_SYSCALL: /* This is what we get when remote disconnects. 
*/ gf_log(this->name,GF_LOG_DEBUG, "syscall error (probably remote disconnect)"); errno = ENODATA; goto out; default: errno = EIO; goto out; /* "break" would just loop again */ } } out: return -1; } #define ssl_connect_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_connect) #define ssl_accept_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_accept) #define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) #define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) static int ssl_setup_connection (rpc_transport_t *this, int server) { X509 *peer = NULL; char peer_CN[256] = ""; int ret = -1; socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO(this->name,this->private,done); priv = this->private; priv->ssl_ssl = SSL_new(priv->ssl_ctx); if (!priv->ssl_ssl) { gf_log(this->name,GF_LOG_ERROR,"SSL_new failed"); ssl_dump_error_stack(this->name); goto done; } priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE); if (!priv->ssl_sbio) { gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed"); ssl_dump_error_stack(this->name); goto free_ssl; } SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio); if (server) { ret = ssl_accept_one(this); } else { ret = ssl_connect_one(this); } /* Make sure _the call_ succeeded. */ if (ret < 0) { goto ssl_error; } /* Make sure _SSL verification_ succeeded, yielding an identity. */ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) { goto ssl_error; } peer = SSL_get_peer_certificate(priv->ssl_ssl); if (!peer) { goto ssl_error; } /* Finally, everything seems OK. */ X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, sizeof(peer_CN)-1); peer_CN[sizeof(peer_CN)-1] = '\0'; gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN); return 0; /* Error paths. 
*/ ssl_error: gf_log(this->name,GF_LOG_ERROR,"SSL connect error"); ssl_dump_error_stack(this->name); free_ssl: SSL_free(priv->ssl_ssl); priv->ssl_ssl = NULL; done: return ret; } static void ssl_teardown_connection (socket_private_t *priv) { SSL_shutdown(priv->ssl_ssl); SSL_clear(priv->ssl_ssl); SSL_free(priv->ssl_ssl); priv->ssl_ssl = NULL; } static ssize_t __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) { socket_private_t *priv = NULL; int sock = -1; int ret = -1; priv = this->private; sock = priv->sock; if (priv->use_ssl) { ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len); } else { ret = readv (sock, opvector, IOV_MIN(opcount)); } return ret; } static ssize_t __socket_ssl_read (rpc_transport_t *this, void *buf, size_t count) { struct iovec iov = {0, }; int ret = -1; iov.iov_base = buf; iov.iov_len = count; ret = __socket_ssl_readv (this, &iov, 1); return ret; } static int __socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount) { socket_private_t *priv = NULL; int sock = -1; struct gf_sock_incoming *in = NULL; int req_len = -1; int ret = -1; priv = this->private; sock = priv->sock; in = &priv->incoming; req_len = iov_length (opvector, opcount); if (in->record_state == SP_STATE_READING_FRAGHDR) { in->ra_read = 0; in->ra_served = 0; in->ra_max = 0; in->ra_buf = NULL; goto uncached; } if (!in->ra_max) { /* first call after passing SP_STATE_READING_FRAGHDR */ in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX); /* Note that the in->iobuf is the primary iobuf into which headers are read into. 
By using this itself as our read-ahead cache, we can avoid memory copies in iov_load */ in->ra_buf = iobuf_ptr (in->iobuf); } /* fill read-ahead */ if (in->ra_read < in->ra_max) { ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read], (in->ra_max - in->ra_read)); if (ret > 0) in->ra_read += ret; /* we proceed to test if there is still cached data to be served even if readahead could not progress */ } /* serve cached */ if (in->ra_served < in->ra_read) { ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served], min (req_len, (in->ra_read - in->ra_served))); in->ra_served += ret; /* Do not read uncached and cached in the same call */ goto out; } if (in->ra_read < in->ra_max) /* If there was no cached data to be served, (and we are guaranteed to have already performed an attempt to progress readahead above), and we have not yet read out the full readahead capacity, then bail out for now without doing the uncached read below (as that will overtake future cached read) */ goto out; uncached: ret = __socket_ssl_readv (this, opvector, opcount); out: return ret; } static gf_boolean_t __does_socket_rwv_error_need_logging (socket_private_t *priv, int write) { int read = !write; if (priv->connected == -1) /* Didn't even connect, of course it fails */ return _gf_false; if (read && (priv->read_fail_log == _gf_false)) return _gf_false; return _gf_true; } /* * return value: * 0 = success (completed) * -1 = error * > 0 = incomplete */ static int __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes, int write) { socket_private_t *priv = NULL; int sock = -1; int ret = -1; struct iovec *opvector = NULL; int opcount = 0; int moved = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; sock = priv->sock; opvector = vector; opcount = count; if (bytes != NULL) { *bytes = 0; } while (opcount > 0) { if (opvector->iov_len == 0) { 
gf_log(this->name,GF_LOG_DEBUG, "would have passed zero length to read/write"); ++opvector; --opcount; continue; } if (write) { if (priv->use_ssl) { ret = ssl_write_one(this, opvector->iov_base, opvector->iov_len); } else { ret = writev (sock, opvector, IOV_MIN(opcount)); } if (ret == 0 || (ret == -1 && errno == EAGAIN)) { /* done for now */ break; } this->total_bytes_write += ret; } else { ret = __socket_cached_read (this, opvector, opcount); if (ret == 0) { gf_log(this->name,GF_LOG_DEBUG,"EOF on socket"); errno = ENODATA; ret = -1; } if (ret == -1 && errno == EAGAIN) { /* done for now */ break; } this->total_bytes_read += ret; } if (ret == 0) { /* Mostly due to 'umount' in client */ gf_log (this->name, GF_LOG_DEBUG, "EOF from peer %s", this->peerinfo.identifier); opcount = -1; errno = ENOTCONN; break; } if (ret == -1) { if (errno == EINTR) continue; if (__does_socket_rwv_error_need_logging (priv, write)) { gf_log (this->name, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev":"readv", this->peerinfo.identifier, strerror (errno)); } if (priv->use_ssl) { ssl_dump_error_stack(this->name); } opcount = -1; break; } if (bytes != NULL) { *bytes += ret; } moved = 0; while (moved < ret) { if (!opcount) { gf_log(this->name,GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret); goto ran_out; } if (!opvector[0].iov_len) { opvector++; opcount--; continue; } if ((ret - moved) >= opvector[0].iov_len) { moved += opvector[0].iov_len; opvector++; opcount--; } else { opvector[0].iov_len -= (ret - moved); opvector[0].iov_base += (ret - moved); moved += (ret - moved); } } } ran_out: if (pending_vector) *pending_vector = opvector; if (pending_count) *pending_count = opcount; out: return opcount; } static int __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes) { int ret = -1; ret = __socket_rwv (this, vector, count, pending_vector, pending_count, bytes, 0); return ret; } static int 
__socket_writev (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count) { int ret = -1; ret = __socket_rwv (this, vector, count, pending_vector, pending_count, NULL, 1); return ret; } static int __socket_shutdown (rpc_transport_t *this) { int ret = -1; socket_private_t *priv = this->private; priv->connected = -1; ret = shutdown (priv->sock, SHUT_RDWR); if (ret) { /* its already disconnected.. no need to understand why it failed to shutdown in normal cases */ gf_log (this->name, GF_LOG_DEBUG, "shutdown() returned %d. %s", ret, strerror (errno)); } return ret; } static int __socket_disconnect (rpc_transport_t *this) { int ret = -1; socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; gf_log (this->name, GF_LOG_TRACE, "disconnecting %p, state=%u gen=%u sock=%d", this, priv->ot_state, priv->ot_gen, priv->sock); if (priv->sock != -1) { ret = __socket_shutdown(this); if (priv->own_thread) { /* * Without this, reconnect (= disconnect + connect) * won't work except by accident. 
*/ close(priv->sock); priv->sock = -1; gf_log (this->name, GF_LOG_TRACE, "OT_PLEASE_DIE on %p", this); priv->ot_state = OT_PLEASE_DIE; } else if (priv->use_ssl) { ssl_teardown_connection(priv); } } out: return ret; } static int __socket_server_bind (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = -1; int opt = 1; int reuse_check_sock = -1; struct sockaddr_storage unix_addr = {0}; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)", strerror (errno)); } /* reuse-address doesn't work for unix type sockets */ if (AF_UNIX == SA (&this->myinfo.sockaddr)->sa_family) { memcpy (&unix_addr, SA (&this->myinfo.sockaddr), this->myinfo.sockaddr_len); reuse_check_sock = socket (AF_UNIX, SOCK_STREAM, 0); if (reuse_check_sock >= 0) { ret = connect (reuse_check_sock, SA (&unix_addr), this->myinfo.sockaddr_len); if ((ret == -1) && (ECONNREFUSED == errno)) { unlink (((struct sockaddr_un*)&unix_addr)->sun_path); } close (reuse_check_sock); } } ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, this->myinfo.sockaddr_len); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "binding to %s failed: %s", this->myinfo.identifier, strerror (errno)); if (errno == EADDRINUSE) { gf_log (this->name, GF_LOG_ERROR, "Port is already in use"); } } out: return ret; } static int __socket_nonblock (int fd) { int flags = 0; int ret = -1; flags = fcntl (fd, F_GETFL); if (flags != -1) ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); return ret; } static int __socket_nodelay (int fd) { int on = 1; int ret = -1; ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); if (!ret) gf_log (THIS->name, GF_LOG_TRACE, "NODELAY enabled for socket %d", fd); return ret; } static int __socket_keepalive (int fd, int family, int keepalive_intvl, 
int keepalive_idle) { int on = 1; int ret = -1; ret = setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof (on)); if (ret == -1) { gf_log ("socket", GF_LOG_WARNING, "failed to set keep alive option on socket %d", fd); goto err; } if (keepalive_intvl == GF_USE_DEFAULT_KEEPALIVE) goto done; #if !defined(GF_LINUX_HOST_OS) && !defined(__NetBSD__) #ifdef GF_SOLARIS_HOST_OS ret = setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive_intvl, sizeof (keepalive_intvl)); #else ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepalive_intvl, sizeof (keepalive_intvl)); #endif if (ret == -1) { gf_log ("socket", GF_LOG_WARNING, "failed to set keep alive interval on socket %d", fd); goto err; } #else if (family != AF_INET) goto done; ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle, sizeof (keepalive_intvl)); if (ret == -1) { gf_log ("socket", GF_LOG_WARNING, "failed to set keep idle %d on socket %d, %s", keepalive_idle, fd, strerror(errno)); goto err; } ret = setsockopt (fd, IPPROTO_TCP , TCP_KEEPINTVL, &keepalive_intvl, sizeof (keepalive_intvl)); if (ret == -1) { gf_log ("socket", GF_LOG_WARNING, "failed to set keep interval %d on socket %d, %s", keepalive_intvl, fd, strerror(errno)); goto err; } #endif done: gf_log (THIS->name, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval " "%d, idle: %d", fd, keepalive_intvl, keepalive_idle); err: return ret; } static int __socket_connect_finish (int fd) { int ret = -1; int optval = 0; socklen_t optlen = sizeof (int); ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen); if (ret == 0 && optval) { errno = optval; ret = -1; } return ret; } static void __socket_reset (rpc_transport_t *this) { socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; /* TODO: use mem-pool on incoming data */ if (priv->incoming.iobref) { iobref_unref (priv->incoming.iobref); priv->incoming.iobref = NULL; } if 
(priv->incoming.iobuf) { iobuf_unref (priv->incoming.iobuf); } GF_FREE (priv->incoming.request_info); memset (&priv->incoming, 0, sizeof (priv->incoming)); event_unregister (this->ctx->event_pool, priv->sock, priv->idx); close (priv->sock); priv->sock = -1; priv->idx = -1; priv->connected = -1; out: return; } static void socket_set_lastfrag (uint32_t *fragsize) { (*fragsize) |= 0x80000000U; } static void socket_set_frag_header_size (uint32_t size, char *haddr) { size = htonl (size); memcpy (haddr, &size, sizeof (size)); } static void socket_set_last_frag_header_size (uint32_t size, char *haddr) { socket_set_lastfrag (&size); socket_set_frag_header_size (size, haddr); } static struct ioq * __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) { struct ioq *entry = NULL; int count = 0; uint32_t size = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); /* TODO: use mem-pool */ entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq); if (!entry) return NULL; count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; GF_ASSERT (count <= (MAX_IOVEC - 1)); size = iov_length (msg->rpchdr, msg->rpchdrcount) + iov_length (msg->proghdr, msg->proghdrcount) + iov_length (msg->progpayload, msg->progpayloadcount); if (size > RPC_MAX_FRAGMENT_SIZE) { gf_log (this->name, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, RPC_MAX_FRAGMENT_SIZE); GF_FREE (entry); return NULL; } socket_set_last_frag_header_size (size, (char *)&entry->fraghdr); entry->vector[0].iov_base = (char *)&entry->fraghdr; entry->vector[0].iov_len = sizeof (entry->fraghdr); entry->count = 1; if (msg->rpchdr != NULL) { memcpy (&entry->vector[1], msg->rpchdr, sizeof (struct iovec) * msg->rpchdrcount); entry->count += msg->rpchdrcount; } if (msg->proghdr != NULL) { memcpy (&entry->vector[entry->count], msg->proghdr, sizeof (struct iovec) * msg->proghdrcount); entry->count += msg->proghdrcount; } if (msg->progpayload != NULL) { memcpy 
(&entry->vector[entry->count], msg->progpayload, sizeof (struct iovec) * msg->progpayloadcount); entry->count += msg->progpayloadcount; } entry->pending_vector = entry->vector; entry->pending_count = entry->count; if (msg->iobref != NULL) entry->iobref = iobref_ref (msg->iobref); INIT_LIST_HEAD (&entry->list); out: return entry; } static void __socket_ioq_entry_free (struct ioq *entry) { GF_VALIDATE_OR_GOTO ("socket", entry, out); list_del_init (&entry->list); if (entry->iobref) iobref_unref (entry->iobref); /* TODO: use mem-pool */ GF_FREE (entry); out: return; } static void __socket_ioq_flush (rpc_transport_t *this) { socket_private_t *priv = NULL; struct ioq *entry = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; while (!list_empty (&priv->ioq)) { entry = priv->ioq_next; __socket_ioq_entry_free (entry); } out: return; } static int __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) { int ret = -1; socket_private_t *priv = NULL; char a_byte = 0; ret = __socket_writev (this, entry->pending_vector, entry->pending_count, &entry->pending_vector, &entry->pending_count); if (ret == 0) { /* current entry was completely written */ GF_ASSERT (entry->pending_count == 0); __socket_ioq_entry_free (entry); priv = this->private; if (priv->own_thread) { /* * The pipe should only remain readable if there are * more entries after this, so drain the byte * representing this entry. 
*/ if (!direct && read(priv->pipe[0],&a_byte,1) < 1) { gf_log(this->name,GF_LOG_WARNING, "read error on pipe"); } } } return ret; } static int __socket_ioq_churn (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = 0; struct ioq *entry = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; while (!list_empty (&priv->ioq)) { /* pick next entry */ entry = priv->ioq_next; ret = __socket_ioq_churn_entry (this, entry, 0); if (ret != 0) break; } if (!priv->own_thread && list_empty (&priv->ioq)) { /* all pending writes done, not interested in POLLOUT */ priv->idx = event_select_on (this->ctx->event_pool, priv->sock, priv->idx, -1, 0); } out: return ret; } static int socket_event_poll_err (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = -1; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; pthread_mutex_lock (&priv->lock); { __socket_ioq_flush (this); __socket_reset (this); } pthread_mutex_unlock (&priv->lock); rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); out: return ret; } static int socket_event_poll_out (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = -1; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; pthread_mutex_lock (&priv->lock); { if (priv->connected == 1) { ret = __socket_ioq_churn (this); if (ret == -1) { __socket_disconnect (this); } } } pthread_mutex_unlock (&priv->lock); ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL); out: return ret; } static inline int __socket_read_simple_msg (rpc_transport_t *this) { int ret = 0; uint32_t remaining_size = 0; size_t bytes_read = 0; socket_private_t *priv = NULL; struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv 
= this->private; in = &priv->incoming; frag = &in->frag; switch (frag->simple_state) { case SP_STATE_SIMPLE_MSG_INIT: remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; __socket_proto_init_pending (priv, remaining_size); frag->simple_state = SP_STATE_READING_SIMPLE_MSG; /* fall through */ case SP_STATE_READING_SIMPLE_MSG: ret = 0; remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if (remaining_size > 0) { ret = __socket_readv (this, in->pending_vector, 1, &in->pending_vector, &in->pending_count, &bytes_read); } if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "reading from socket failed. Error (%s), " "peer (%s)", strerror (errno), this->peerinfo.identifier); break; } frag->bytes_read += bytes_read; frag->fragcurrent += bytes_read; if (ret > 0) { gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket."); break; } if (ret == 0) { frag->simple_state = SP_STATE_SIMPLE_MSG_INIT; } } out: return ret; } static inline int __socket_read_simple_request (rpc_transport_t *this) { return __socket_read_simple_msg (this); } #define rpc_cred_addr(buf) (buf + RPC_MSGTYPE_SIZE + RPC_CALL_BODY_SIZE - 4) #define rpc_verf_addr(fragcurrent) (fragcurrent - 4) #define rpc_msgtype_addr(buf) (buf + 4) #define rpc_prognum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 4) #define rpc_progver_addr(buf) (buf + RPC_MSGTYPE_SIZE + 8) #define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12) static inline int __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer) { socket_private_t *priv = NULL; int ret = 0; uint32_t credlen = 0, verflen = 0; char *addr = NULL; struct iobuf *iobuf = NULL; uint32_t remaining_size = 0; ssize_t readsize = 0; size_t size = 0; struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; sp_rpcfrag_request_state_t *request = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; /* used to reduce the indirection 
*/ in = &priv->incoming; frag = &in->frag; request = &frag->call_body.request; switch (request->vector_state) { case SP_STATE_VECTORED_REQUEST_INIT: request->vector_sizer_state = 0; addr = rpc_cred_addr (iobuf_ptr (in->iobuf)); /* also read verf flavour and verflen */ credlen = ntoh32 (*((uint32_t *)addr)) + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE; __socket_proto_init_pending (priv, credlen); request->vector_state = SP_STATE_READING_CREDBYTES; /* fall through */ case SP_STATE_READING_CREDBYTES: __socket_proto_read (priv, ret); request->vector_state = SP_STATE_READ_CREDBYTES; /* fall through */ case SP_STATE_READ_CREDBYTES: addr = rpc_verf_addr (frag->fragcurrent); verflen = ntoh32 (*((uint32_t *)addr)); if (verflen == 0) { request->vector_state = SP_STATE_READ_VERFBYTES; goto sp_state_read_verfbytes; } __socket_proto_init_pending (priv, verflen); request->vector_state = SP_STATE_READING_VERFBYTES; /* fall through */ case SP_STATE_READING_VERFBYTES: __socket_proto_read (priv, ret); request->vector_state = SP_STATE_READ_VERFBYTES; /* fall through */ case SP_STATE_READ_VERFBYTES: sp_state_read_verfbytes: /* set the base_addr 'persistently' across multiple calls into the state machine */ in->proghdr_base_addr = frag->fragcurrent; request->vector_sizer_state = vector_sizer (request->vector_sizer_state, &readsize, in->proghdr_base_addr, frag->fragcurrent); __socket_proto_init_pending (priv, readsize); request->vector_state = SP_STATE_READING_PROGHDR; /* fall through */ case SP_STATE_READING_PROGHDR: __socket_proto_read (priv, ret); request->vector_state = SP_STATE_READ_PROGHDR; /* fall through */ case SP_STATE_READ_PROGHDR: sp_state_read_proghdr: request->vector_sizer_state = vector_sizer (request->vector_sizer_state, &readsize, in->proghdr_base_addr, frag->fragcurrent); if (readsize == 0) { request->vector_state = SP_STATE_READ_PROGHDR_XDATA; goto sp_state_read_proghdr_xdata; } __socket_proto_init_pending (priv, readsize); request->vector_state = SP_STATE_READING_PROGHDR_XDATA; 
/* fall through */ case SP_STATE_READING_PROGHDR_XDATA: __socket_proto_read (priv, ret); request->vector_state = SP_STATE_READ_PROGHDR; /* check if the vector_sizer() has more to say */ goto sp_state_read_proghdr; case SP_STATE_READ_PROGHDR_XDATA: sp_state_read_proghdr_xdata: if (in->payload_vector.iov_base == NULL) { size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (!iobuf) { ret = -1; break; } if (in->iobref == NULL) { in->iobref = iobref_new (); if (in->iobref == NULL) { ret = -1; iobuf_unref (iobuf); break; } } iobref_add (in->iobref, iobuf); iobuf_unref (iobuf); in->payload_vector.iov_base = iobuf_ptr (iobuf); frag->fragcurrent = iobuf_ptr (iobuf); } request->vector_state = SP_STATE_READING_PROG; /* fall through */ case SP_STATE_READING_PROG: /* now read the remaining rpc msg into buffer pointed by * fragcurrent */ ret = __socket_read_simple_msg (this); remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if ((ret == -1) || ((ret == 0) && (remaining_size == 0) && RPC_LASTFRAG (in->fraghdr))) { request->vector_state = SP_STATE_VECTORED_REQUEST_INIT; in->payload_vector.iov_len = ((unsigned long)frag->fragcurrent - (unsigned long)in->payload_vector.iov_base); } break; } out: return ret; } static inline int __socket_read_request (rpc_transport_t *this) { socket_private_t *priv = NULL; uint32_t prognum = 0, procnum = 0, progver = 0; uint32_t remaining_size = 0; int ret = -1; char *buf = NULL; rpcsvc_vector_sizer vector_sizer = NULL; struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; sp_rpcfrag_request_state_t *request = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; /* used to reduce the indirection */ in = &priv->incoming; frag = &in->frag; request = &frag->call_body.request; switch (request->header_state) { case SP_STATE_REQUEST_HEADER_INIT: __socket_proto_init_pending (priv, 
RPC_CALL_BODY_SIZE); request->header_state = SP_STATE_READING_RPCHDR1; /* fall through */ case SP_STATE_READING_RPCHDR1: __socket_proto_read (priv, ret); request->header_state = SP_STATE_READ_RPCHDR1; /* fall through */ case SP_STATE_READ_RPCHDR1: buf = rpc_prognum_addr (iobuf_ptr (in->iobuf)); prognum = ntoh32 (*((uint32_t *)buf)); buf = rpc_progver_addr (iobuf_ptr (in->iobuf)); progver = ntoh32 (*((uint32_t *)buf)); buf = rpc_procnum_addr (iobuf_ptr (in->iobuf)); procnum = ntoh32 (*((uint32_t *)buf)); if (priv->is_server) { /* this check is needed as rpcsvc and rpc-clnt * actor structures are not same */ vector_sizer = rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata, prognum, progver, procnum); } if (vector_sizer) { ret = __socket_read_vectored_request (this, vector_sizer); } else { ret = __socket_read_simple_request (this); } remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if ((ret == -1) || ((ret == 0) && (remaining_size == 0) && (RPC_LASTFRAG (in->fraghdr)))) { request->header_state = SP_STATE_REQUEST_HEADER_INIT; } break; } out: return ret; } static inline int __socket_read_accepted_successful_reply (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = 0; struct iobuf *iobuf = NULL; gfs3_read_rsp read_rsp = {0, }; ssize_t size = 0; ssize_t default_read_size = 0; char *proghdr_buf = NULL; XDR xdr; struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; /* used to reduce the indirection */ in = &priv->incoming; frag = &in->frag; switch (frag->call_body.reply.accepted_success_state) { case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, &read_rsp); proghdr_buf = frag->fragcurrent; __socket_proto_init_pending (priv, default_read_size); frag->call_body.reply.accepted_success_state = SP_STATE_READING_PROC_HEADER; /* fall through */ case 
SP_STATE_READING_PROC_HEADER: __socket_proto_read (priv, ret); /* there can be 'xdata' in read response, figure it out */ xdrmem_create (&xdr, proghdr_buf, default_read_size, XDR_DECODE); /* This will fail if there is xdata sent from server, if not, well and good, we don't need to worry about */ xdr_gfs3_read_rsp (&xdr, &read_rsp); free (read_rsp.xdata.xdata_val); /* need to round off to proper roof (%4), as XDR packing pads the end of opaque object with '0' */ size = roof (read_rsp.xdata.xdata_len, 4); if (!size) { frag->call_body.reply.accepted_success_state = SP_STATE_READ_PROC_OPAQUE; goto read_proc_opaque; } __socket_proto_init_pending (priv, size); frag->call_body.reply.accepted_success_state = SP_STATE_READING_PROC_OPAQUE; case SP_STATE_READING_PROC_OPAQUE: __socket_proto_read (priv, ret); frag->call_body.reply.accepted_success_state = SP_STATE_READ_PROC_OPAQUE; case SP_STATE_READ_PROC_OPAQUE: read_proc_opaque: if (in->payload_vector.iov_base == NULL) { size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read); iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (iobuf == NULL) { ret = -1; goto out; } if (in->iobref == NULL) { in->iobref = iobref_new (); if (in->iobref == NULL) { ret = -1; iobuf_unref (iobuf); goto out; } } iobref_add (in->iobref, iobuf); iobuf_unref (iobuf); in->payload_vector.iov_base = iobuf_ptr (iobuf); in->payload_vector.iov_len = size; } frag->fragcurrent = in->payload_vector.iov_base; frag->call_body.reply.accepted_success_state = SP_STATE_READ_PROC_HEADER; /* fall through */ case SP_STATE_READ_PROC_HEADER: /* now read the entire remaining msg into new iobuf */ ret = __socket_read_simple_msg (this); if ((ret == -1) || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) { frag->call_body.reply.accepted_success_state = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT; } break; } out: return ret; } #define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4) #define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4) static inline int 
__socket_read_accepted_reply (rpc_transport_t *this)
{
        /*
         * Resumable parser for an accepted RPC reply: reads the verifier
         * flavour/length, then the verifier bytes together with the accept
         * status, and finally hands the rest of the message to
         * __socket_read_accepted_successful_reply() (accept status SUCCESS)
         * or __socket_read_simple_msg() (anything else).
         *
         * Progress is kept in frag->call_body.reply.accepted_state, so a call
         * after a partial read resumes at the matching case label.  Returns
         * -1 if validation fails, otherwise the result of the last read step
         * that was attempted.
         */
        socket_private_t              *priv = NULL;
        int                            ret = -1;
        char                          *buf = NULL;
        uint32_t                       verflen = 0, len = 0;
        uint32_t                       remaining_size = 0;
        struct gf_sock_incoming       *in = NULL;
        struct gf_sock_incoming_frag  *frag = NULL;

        GF_VALIDATE_OR_GOTO ("socket", this, out);
        GF_VALIDATE_OR_GOTO ("socket", this->private, out);

        priv = this->private;

        /* used to reduce the indirection */
        in = &priv->incoming;
        frag = &in->frag;

        switch (frag->call_body.reply.accepted_state) {

        case SP_STATE_ACCEPTED_REPLY_INIT:
                __socket_proto_init_pending (priv,
                                             RPC_AUTH_FLAVOUR_N_LENGTH_SIZE);

                frag->call_body.reply.accepted_state
                        = SP_STATE_READING_REPLY_VERFLEN;

                /* fall through */

        case SP_STATE_READING_REPLY_VERFLEN:
                /* the macro breaks out of this switch on error or when the
                 * read is still incomplete */
                __socket_proto_read (priv, ret);

                frag->call_body.reply.accepted_state
                        = SP_STATE_READ_REPLY_VERFLEN;

                /* fall through */

        case SP_STATE_READ_REPLY_VERFLEN:
                /* verifier length is the last 4 bytes read so far */
                buf = rpc_reply_verflen_addr (frag->fragcurrent);

                verflen = ntoh32 (*((uint32_t *) buf));

                /* also read accept status along with verf data */
                len = verflen + RPC_ACCEPT_STATUS_LEN;

                __socket_proto_init_pending (priv, len);

                frag->call_body.reply.accepted_state
                        = SP_STATE_READING_REPLY_VERFBYTES;

                /* fall through */

        case SP_STATE_READING_REPLY_VERFBYTES:
                __socket_proto_read (priv, ret);

                frag->call_body.reply.accepted_state
                        = SP_STATE_READ_REPLY_VERFBYTES;

                /* accept status is the last 4 bytes of what was just read */
                buf = rpc_reply_accept_status_addr (frag->fragcurrent);

                frag->call_body.reply.accept_status
                        = ntoh32 (*(uint32_t *) buf);

                /* fall through */

        case SP_STATE_READ_REPLY_VERFBYTES:

                if (frag->call_body.reply.accept_status
                    == SUCCESS) {
                        ret = __socket_read_accepted_successful_reply (this);
                } else {
                        /* read entire remaining msg into buffer pointed to by
                         * fragcurrent
                         */
                        ret = __socket_read_simple_msg (this);
                }

                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;

                /* rewind the state machine on error, or once the last
                 * fragment has been read completely */
                if ((ret == -1)
                    || ((ret == 0) && (remaining_size == 0)
                        && (RPC_LASTFRAG (in->fraghdr)))) {
                        frag->call_body.reply.accepted_state
                                = SP_STATE_ACCEPTED_REPLY_INIT;
                }

                break;
        }

out:
        return ret;
}
/*
 * Read a denied reply. No special parsing is done here: the work is
 * delegated to __socket_read_simple_msg().
 */
static inline int
__socket_read_denied_reply (rpc_transport_t *this)
{
        return __socket_read_simple_msg (this);
}

/* Address of the 4 bytes that immediately precede @fragcurrent. */
#define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4)

/*
 * Resumable parser for a reply that is handled by the vectored path.
 *
 * Progress is stored in frag->call_body.reply.status_state, so the function
 * can be called again and continue from the recorded case label:
 *   - ..._INIT:            set up a pending read of RPC_REPLY_STATUS_SIZE
 *   - READING_REPLY_STATUS: perform the read, then decode the status word
 *                           found just before frag->fragcurrent
 *   - READ_REPLY_STATUS:    dispatch on the decoded status to the accepted
 *                           or denied reader
 *
 * The state is reset to SP_STATE_ACCEPTED_REPLY_INIT when the sub-reader
 * returned -1, or when it returned 0, no bytes of the fragment remain and
 * this is the last fragment. At that point the payload vector length is
 * computed as the distance between payload_vector.iov_base and fragcurrent.
 *
 * NOTE(review): __socket_proto_init_pending / __socket_proto_read are macros
 * defined earlier in this file; __socket_proto_read presumably updates `ret`
 * and leaves through `out` on an incomplete read — confirm against the macro.
 *
 * Returns the value produced by the last read step (initially 0).
 */
static inline int
__socket_read_vectored_reply (rpc_transport_t *this)
{
        socket_private_t             *priv           = NULL;
        int                           ret            = 0;
        char                         *buf            = NULL;
        uint32_t                      remaining_size = 0;
        struct gf_sock_incoming      *in             = NULL;
        struct gf_sock_incoming_frag *frag           = NULL;

        GF_VALIDATE_OR_GOTO ("socket", this, out);
        GF_VALIDATE_OR_GOTO ("socket", this->private, out);

        priv = this->private;
        in = &priv->incoming;
        frag = &in->frag;

        switch (frag->call_body.reply.status_state) {

        case SP_STATE_ACCEPTED_REPLY_INIT:
                __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE);

                frag->call_body.reply.status_state
                        = SP_STATE_READING_REPLY_STATUS;

                /* fall through */

        case SP_STATE_READING_REPLY_STATUS:
                __socket_proto_read (priv, ret);

                /* the status word sits right behind the current position */
                buf = rpc_reply_status_addr (frag->fragcurrent);

                /* NOTE(review): the reply status is stored in the field
                 * named accept_status and compared with MSG_ACCEPTED below */
                frag->call_body.reply.accept_status
                        = ntoh32 (*((uint32_t *) buf));

                frag->call_body.reply.status_state
                        = SP_STATE_READ_REPLY_STATUS;

                /* fall through */

        case SP_STATE_READ_REPLY_STATUS:
                if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {
                        ret = __socket_read_accepted_reply (this);
                } else {
                        ret = __socket_read_denied_reply (this);
                }

                remaining_size = RPC_FRAGSIZE (in->fraghdr)
                        - frag->bytes_read;

                /* error, or the whole record has been consumed: rewind the
                 * state machine and fix up the payload length */
                if ((ret == -1)
                    || ((ret == 0) && (remaining_size == 0)
                        && (RPC_LASTFRAG (in->fraghdr)))) {
                        frag->call_body.reply.status_state
                                = SP_STATE_ACCEPTED_REPLY_INIT;
                        in->payload_vector.iov_len
                                = (unsigned long)frag->fragcurrent
                                - (unsigned long)in->payload_vector.iov_base;
                }
                break;
        }

out:
        return ret;
}

/* Read a non-vectored reply via __socket_read_simple_msg(). */
static inline int
__socket_read_simple_reply (rpc_transport_t *this)
{
        return __socket_read_simple_msg (this);
}

/* The xid is located at the very start of the buffer. */
#define rpc_xid_addr(buf) (buf)

/*
 * Read an RPC reply.
 *
 * On the first call for a record (in->request_info == NULL) a request_info
 * is allocated, the xid is decoded from the start of in->iobuf and the upper
 * layer is asked, through RPC_TRANSPORT_MAP_XID_REQUEST, to fill it in.
 * priv->lock is dropped around that upcall and re-acquired afterwards, so
 * the function is entered and left with priv->lock held.
 *
 * If the mapped request is a GLUSTER_FOP_PROGRAM / GF_FOP_READ call the
 * reply is parsed by the vectored reader (adopting the caller supplied
 * iobref and payload vector when a payload was registered); every other
 * reply goes through the simple reader.
 *
 * Returns -1 when validation, allocation or the MAP_XID notification fails,
 * otherwise the result of the selected reader.
 */
static inline int
__socket_read_reply (rpc_transport_t *this)
{
        socket_private_t             *priv         = NULL;
        char                         *buf          = NULL;
        int32_t                       ret          = -1;
        rpc_request_info_t           *request_info = NULL;
        char                          map_xid      = 0;
        struct gf_sock_incoming      *in           = NULL;
        struct gf_sock_incoming_frag *frag         = NULL;

        GF_VALIDATE_OR_GOTO ("socket", this, out);
        GF_VALIDATE_OR_GOTO ("socket", this->private, out);

        priv = this->private;

        in = &priv->incoming;
        frag = &in->frag;

        buf = rpc_xid_addr (iobuf_ptr (in->iobuf));

        if (in->request_info == NULL) {
                in->request_info = GF_CALLOC (1, sizeof (*request_info),
                                              gf_common_mt_rpc_trans_reqinfo_t);
                if (in->request_info == NULL) {
                        goto out;
                }

                /* freshly allocated: the xid still has to be mapped */
                map_xid = 1;
        }

        request_info = in->request_info;

        if (map_xid) {
                request_info->xid = ntoh32 (*((uint32_t *) buf));

                /* release priv->lock, so as to avoid deadlock b/w conn->lock
                 * and priv->lock, since we are doing an upcall here.
                 */
                pthread_mutex_unlock (&priv->lock);
                {
                        ret = rpc_transport_notify (this,
                                                    RPC_TRANSPORT_MAP_XID_REQUEST,
                                                    in->request_info);
                }
                pthread_mutex_lock (&priv->lock);

                if (ret == -1) {
                        gf_log (this->name, GF_LOG_WARNING,
                                "notify for event MAP_XID failed");
                        goto out;
                }
        }

        if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
            && (request_info->procnum == GF_FOP_READ)) {
                /* only adopt the caller's buffers once per record */
                if (map_xid && request_info->rsp.rsp_payload_count != 0) {
                        in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
                        in->payload_vector = *request_info->rsp.rsp_payload;
                }

                ret = __socket_read_vectored_reply (this);
        } else {
                ret = __socket_read_simple_reply (this);
        }
out:
        return ret;
}

/*
 * Resumable reader for one fragment; progress is kept in frag->state.
 * The message type word is read first and the rest of the fragment is then
 * handed to the request or reply reader.
 *
 * returns the number of bytes yet to be read in a fragment
 */
static inline int
__socket_read_frag (rpc_transport_t *this)
{
        socket_private_t             *priv           = NULL;
        int32_t                       ret            = 0;
        char                         *buf            = NULL;
        uint32_t                      remaining_size = 0;
        struct gf_sock_incoming      *in             = NULL;
        struct gf_sock_incoming_frag *frag           = NULL;

        GF_VALIDATE_OR_GOTO ("socket", this, out);
        GF_VALIDATE_OR_GOTO ("socket", this->private, out);

        priv = this->private;
        /* used to reduce the indirection */
        in = &priv->incoming;
        frag = &in->frag;

        switch (frag->state) {
        case SP_STATE_NADA:
                __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE);

                frag->state = SP_STATE_READING_MSGTYPE;

                /* fall through */

        case SP_STATE_READING_MSGTYPE:
                __socket_proto_read (priv, ret);
frag->state = SP_STATE_READ_MSGTYPE;
                /* fall through */

        case SP_STATE_READ_MSGTYPE:
                /* decode the message type and dispatch on it */
                buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf));
                in->msg_type = ntoh32 (*((uint32_t *)buf));

                if (in->msg_type == CALL) {
                        ret = __socket_read_request (this);
                } else if (in->msg_type == REPLY) {
                        ret = __socket_read_reply (this);
                } else if (in->msg_type == GF_UNIVERSAL_ANSWER) {
                        /* NOTE(review): only logged; ret is left unchanged
                         * on this branch — confirm that is intended */
                        gf_log ("rpc", GF_LOG_ERROR,
                                "older version of protocol/process trying to "
                                "connect from %s. use newer version on that node",
                                this->peerinfo.identifier);
                } else {
                        gf_log ("rpc", GF_LOG_ERROR,
                                "wrong MSG-TYPE (%d) received from %s",
                                in->msg_type,
                                this->peerinfo.identifier);
                        ret = -1;
                }

                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;

                /* error, or last fragment fully consumed: start over */
                if ((ret == -1)
                    || ((ret == 0) && (remaining_size == 0)
                        && (RPC_LASTFRAG (in->fraghdr)))) {
                        frag->state = SP_STATE_NADA;
                }

                break;
        }

out:
        return ret;
}

/*
 * Drop everything the incoming-record state still owns: the iobref and
 * iobuf references, the request_info allocation and the payload vector.
 * Pointers are set to NULL after release.
 */
static inline void
__socket_reset_priv (socket_private_t *priv)
{
        struct gf_sock_incoming *in = NULL;

        /* used to reduce the indirection */
        in = &priv->incoming;

        if (in->iobref) {
                iobref_unref (in->iobref);
                in->iobref = NULL;
        }

        if (in->iobuf) {
                iobuf_unref (in->iobuf);
        }

        if (in->request_info != NULL) {
                GF_FREE (in->request_info);
                in->request_info = NULL;
        }

        memset (&in->payload_vector, 0, sizeof (in->payload_vector));

        /* cleared unconditionally; the reference was dropped above */
        in->iobuf = NULL;
}

/*
 * Drive the record-level state machine (in->record_state) until a complete
 * RPC record has been assembled or no further progress is possible.
 *
 * States, in order: SP_STATE_NADA -> READING_FRAGHDR -> READ_FRAGHDR ->
 * READING_FRAG -> (next fragment header, or) SP_STATE_COMPLETE.
 *
 * When the last fragment has been read and @pollin is non-NULL, a pollin
 * object is allocated from the header iobuf (plus the payload vector when
 * one is set) and stored in *pollin; ownership of request_info moves to it.
 * After completion the state is rewound to SP_STATE_NADA and the remaining
 * per-record resources are released.
 *
 * Returns the result of the last step. A -1 accompanied by errno == EAGAIN
 * is converted to 0 on the way out. Allocation failure of the iobuf is
 * reported as -ENOMEM.
 *
 * NOTE(review): the name and the callers' locking suggest priv->lock must be
 * held on entry — confirm.
 */
static int
__socket_proto_state_machine (rpc_transport_t *this,
                              rpc_transport_pollin_t **pollin)
{
        int                           ret    = -1;
        socket_private_t             *priv   = NULL;
        struct iobuf                 *iobuf  = NULL;
        struct iobref                *iobref = NULL;
        struct iovec                  vector[2];
        struct gf_sock_incoming      *in     = NULL;
        struct gf_sock_incoming_frag *frag   = NULL;

        GF_VALIDATE_OR_GOTO ("socket", this, out);
        GF_VALIDATE_OR_GOTO ("socket", this->private, out);

        priv = this->private;
        /* used to reduce the indirection */
        in = &priv->incoming;
        frag = &in->frag;

        while (in->record_state != SP_STATE_COMPLETE) {
                switch (in->record_state) {

                case SP_STATE_NADA:
                        /* new record: aim the pending vector at the
                         * fragment header */
                        in->total_bytes_read = 0;
                        in->payload_vector.iov_len = 0;

                        in->pending_vector = in->vector;
                        in->pending_vector->iov_base =  &in->fraghdr;

                        in->pending_vector->iov_len  = sizeof (in->fraghdr);

                        in->record_state = SP_STATE_READING_FRAGHDR;

                        /* fall through */

                case SP_STATE_READING_FRAGHDR:
                        ret = __socket_readv (this, in->pending_vector, 1,
                                              &in->pending_vector,
                                              &in->pending_count,
                                              NULL);
                        if (ret == -1)
                                goto out;

                        /* a positive result means the header is still
                         * incomplete; resume on the next call */
                        if (ret > 0) {
                                gf_log (this->name, GF_LOG_TRACE, "partial "
                                        "fragment header read");
                                goto out;
                        }

                        if (ret == 0) {
                                in->record_state = SP_STATE_READ_FRAGHDR;
                        }
                        /* fall through */

                case SP_STATE_READ_FRAGHDR:

                        in->fraghdr = ntoh32 (in->fraghdr);
                        in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
                        /* NOTE(review): a fresh iobuf is taken for every
                         * fragment header; for multi-fragment records the
                         * previous in->iobuf is overwritten here — confirm
                         * this cannot leak */
                        iobuf = iobuf_get2 (this->ctx->iobuf_pool,
                                            (in->total_bytes_read +
                                             sizeof (in->fraghdr)));
                        if (!iobuf) {
                                ret = -ENOMEM;
                                goto out;
                        }

                        in->iobuf = iobuf;
                        in->iobuf_size = 0;
                        frag->fragcurrent = iobuf_ptr (iobuf);
                        in->record_state = SP_STATE_READING_FRAG;
                        /* fall through */

                case SP_STATE_READING_FRAG:
                        ret = __socket_read_frag (this);

                        /* stop on error or while the fragment is still
                         * incomplete */
                        if ((ret == -1) ||
                            (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) {
                                goto out;
                        }

                        frag->bytes_read = 0;

                        if (!RPC_LASTFRAG (in->fraghdr)) {
                                in->record_state = SP_STATE_READING_FRAGHDR;
                                break;
                        }

                        /* we've read the entire rpc record, notify the
                         * upper layers.
                         */
                        if (pollin != NULL) {
                                int count = 0;
                                /* bytes held in the header iobuf, i.e.
                                 * everything except the payload */
                                in->iobuf_size = (in->total_bytes_read -
                                                  in->payload_vector.iov_len);

                                memset (vector, 0, sizeof (vector));

                                if (in->iobref == NULL) {
                                        in->iobref = iobref_new ();
                                        if (in->iobref == NULL) {
                                                ret = -1;
                                                goto out;
                                        }
                                }

                                vector[count].iov_base = iobuf_ptr (in->iobuf);
                                vector[count].iov_len = in->iobuf_size;

                                iobref = in->iobref;

                                count++;

                                if (in->payload_vector.iov_base != NULL) {
                                        vector[count] = in->payload_vector;
                                        count++;
                                }

                                *pollin = rpc_transport_pollin_alloc (this,
                                                                      vector,
                                                                      count,
                                                                      in->iobuf,
                                                                      iobref,
                                                                      in->request_info);
                                /* our own reference is dropped whether or
                                 * not the allocation succeeded */
                                iobuf_unref (in->iobuf);
                                in->iobuf = NULL;

                                if (*pollin == NULL) {
                                        gf_log (this->name, GF_LOG_WARNING,
                                                "transport pollin allocation failed");
                                        ret = -1;
                                        goto out;
                                }
                                if (in->msg_type == REPLY)
                                        (*pollin)->is_reply = 1;

                                /* request_info was handed to the pollin */
                                in->request_info = NULL;
                        }
                        in->record_state = SP_STATE_COMPLETE;
                        break;

                case SP_STATE_COMPLETE:
                        /* control should not reach here */
                        gf_log (this->name, GF_LOG_WARNING, "control reached to "
                                "SP_STATE_COMPLETE, which should not have "
                                "happened");
                        break;
                }
        }

        if (in->record_state == SP_STATE_COMPLETE) {
                in->record_state = SP_STATE_NADA;
                __socket_reset_priv (priv);
        }

out:
        /* "would block" is not an error for the caller */
        if ((ret == -1) && (errno == EAGAIN)) {
                ret = 0;
        }
        return ret;
}

/*
 * Locked wrapper: runs __socket_proto_state_machine() with priv->lock held.
 * Returns 0 if @this or its private data is NULL, otherwise the result of
 * the state machine.
 */
static int
socket_proto_state_machine (rpc_transport_t *this,
                            rpc_transport_pollin_t **pollin)
{
        socket_private_t *priv = NULL;
        int               ret  = 0;

        GF_VALIDATE_OR_GOTO ("socket", this, out);
        GF_VALIDATE_OR_GOTO ("socket", this->private, out);

        priv = this->private;

        pthread_mutex_lock (&priv->lock);
        {
                ret = __socket_proto_state_machine (this, pollin);
        }
        pthread_mutex_unlock (&priv->lock);

out:
        return ret;
}

/*
 * Handle readable data: advance the protocol state machine and, when a full
 * record was produced, deliver it with RPC_TRANSPORT_MSG_RECEIVED and then
 * destroy the pollin. priv->ot_state is set to OT_CALLBACK for the duration
 * of the notification and put back to OT_RUNNING unless it was changed in
 * the meantime.
 */
static int
socket_event_poll_in (rpc_transport_t *this)
{
        int                     ret    = -1;
        rpc_transport_pollin_t *pollin = NULL;
        socket_private_t       *priv   = this->private;

        ret = socket_proto_state_machine (this, &pollin);

        if (pollin != NULL) {
                priv->ot_state = OT_CALLBACK;
                ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
                                            pollin);
                if (priv->ot_state == OT_CALLBACK) {
                        priv->ot_state = OT_RUNNING;
} rpc_transport_pollin_destroy (pollin); } return ret; } static int socket_connect_finish (rpc_transport_t *this) { int ret = -1; socket_private_t *priv = NULL; rpc_transport_event_t event = 0; char notify_rpc = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; pthread_mutex_lock (&priv->lock); { if (priv->connected != 0) goto unlock; get_transport_identifiers (this); ret = __socket_connect_finish (priv->sock); if (ret == -1 && errno == EINPROGRESS) ret = 1; if (ret == -1 && errno != EINPROGRESS) { if (!priv->connect_finish_log) { gf_log (this->name, GF_LOG_ERROR, "connection to %s failed (%s)", this->peerinfo.identifier, strerror (errno)); priv->connect_finish_log = 1; } __socket_disconnect (this); goto unlock; } if (ret == 0) { notify_rpc = 1; this->myinfo.sockaddr_len = sizeof (this->myinfo.sockaddr); ret = getsockname (priv->sock, SA (&this->myinfo.sockaddr), &this->myinfo.sockaddr_len); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "getsockname on (%d) failed (%s)", priv->sock, strerror (errno)); __socket_disconnect (this); event = GF_EVENT_POLLERR; goto unlock; } priv->connected = 1; priv->connect_finish_log = 0; event = RPC_TRANSPORT_CONNECT; } } unlock: pthread_mutex_unlock (&priv->lock); if (notify_rpc) { rpc_transport_notify (this, event, this); } out: return ret; } /* reads rpc_requests during pollin */ static int socket_event_handler (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err) { rpc_transport_t *this = NULL; socket_private_t *priv = NULL; int ret = -1; this = data; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); GF_VALIDATE_OR_GOTO ("socket", this->xl, out); THIS = this->xl; priv = this->private; pthread_mutex_lock (&priv->lock); { priv->idx = idx; } pthread_mutex_unlock (&priv->lock); ret = (priv->connected == 1) ? 
0 : socket_connect_finish(this); if (!ret && poll_out) { ret = socket_event_poll_out (this); } if (!ret && poll_in) { ret = socket_event_poll_in (this); } if ((ret < 0) || poll_err) { /* Logging has happened already in earlier cases */ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), "disconnecting now"); socket_event_poll_err (this); rpc_transport_unref (this); } out: return ret; } static void * socket_poller (void *ctx) { rpc_transport_t *this = ctx; socket_private_t *priv = this->private; struct pollfd pfd[2] = {{0,},}; gf_boolean_t to_write = _gf_false; int ret = 0; uint32_t gen = 0; priv->ot_state = OT_RUNNING; if (priv->use_ssl) { if (ssl_setup_connection(this,priv->connected) < 0) { gf_log (this->name,GF_LOG_ERROR, "%s setup failed", priv->connected ? "server" : "client"); goto err; } } if (!priv->bio) { ret = __socket_nonblock (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); goto err; } } if (priv->connected == 0) { THIS = this->xl; ret = socket_connect_finish (this); if (ret != 0) { gf_log (this->name, GF_LOG_WARNING, "asynchronous socket_connect_finish failed"); } } ret = rpc_transport_notify (this->listener, RPC_TRANSPORT_ACCEPT, this); if (ret != 0) { gf_log (this->name, GF_LOG_WARNING, "asynchronous rpc_transport_notify failed"); } gen = priv->ot_gen; for (;;) { pthread_mutex_lock(&priv->lock); to_write = !list_empty(&priv->ioq); pthread_mutex_unlock(&priv->lock); pfd[0].fd = priv->pipe[0]; pfd[0].events = POLL_MASK_ERROR; pfd[0].revents = 0; pfd[1].fd = priv->sock; pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR; pfd[1].revents = 0; if (to_write) { pfd[1].events |= POLL_MASK_OUTPUT; } else { pfd[0].events |= POLL_MASK_INPUT; } if (poll(pfd,2,-1) < 0) { gf_log(this->name,GF_LOG_ERROR,"poll failed"); break; } if (pfd[0].revents & POLL_MASK_ERROR) { gf_log(this->name,GF_LOG_ERROR, "poll error on pipe"); break; } /* Only glusterd actually seems to need this. 
*/ THIS = this->xl; if (pfd[1].revents & POLL_MASK_INPUT) { ret = socket_event_poll_in(this); if (ret >= 0) { /* Suppress errors while making progress. */ pfd[1].revents &= ~POLL_MASK_ERROR; } else if (errno == ENOTCONN) { ret = 0; } if (priv->ot_state == OT_PLEASE_DIE) { gf_log (this->name, GF_LOG_TRACE, "OT_IDLE on %p (input request)", this); priv->ot_state = OT_IDLE; break; } } else if (pfd[1].revents & POLL_MASK_OUTPUT) { ret = socket_event_poll_out(this); if (ret >= 0) { /* Suppress errors while making progress. */ pfd[1].revents &= ~POLL_MASK_ERROR; } else if (errno == ENOTCONN) { ret = 0; } if (priv->ot_state == OT_PLEASE_DIE) { gf_log (this->name, GF_LOG_TRACE, "OT_IDLE on %p (output request)", this); priv->ot_state = OT_IDLE; break; } } else { /* * This usually means that we left poll() because * somebody pushed a byte onto our pipe. That wakeup * is why the pipe is there, but once awake we can do * all the checking we need on the next iteration. */ ret = 0; } if (pfd[1].revents & POLL_MASK_ERROR) { gf_log(this->name,GF_LOG_ERROR, "poll error on socket"); break; } if (ret < 0) { gf_log(this->name,GF_LOG_ERROR, "error in polling loop"); break; } if (priv->ot_gen != gen) { gf_log (this->name, GF_LOG_TRACE, "generation mismatch, my %u != %u", gen, priv->ot_gen); return NULL; } } err: /* All (and only) I/O errors should come here. */ pthread_mutex_lock(&priv->lock); if (priv->ssl_ssl) { /* * We're always responsible for this part, but only actually * have to do it if we got far enough for ssl_ssl to be valid * (i.e. errors in ssl_setup_connection don't count). 
*/ ssl_teardown_connection(priv); } __socket_shutdown(this); close(priv->sock); priv->sock = -1; priv->ot_state = OT_IDLE; pthread_mutex_unlock(&priv->lock); rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); rpc_transport_unref (this); return NULL; } static void socket_spawn (rpc_transport_t *this) { socket_private_t *priv = this->private; switch (priv->ot_state) { case OT_IDLE: case OT_PLEASE_DIE: break; default: gf_log (this->name, GF_LOG_WARNING, "refusing to start redundant poller"); return; } priv->ot_gen += 7; priv->ot_state = OT_SPAWNING; gf_log (this->name, GF_LOG_TRACE, "spawning %p with gen %u", this, priv->ot_gen); if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) { gf_log (this->name, GF_LOG_ERROR, "could not create poll thread"); } } static int socket_server_event_handler (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err) { rpc_transport_t *this = NULL; socket_private_t *priv = NULL; int ret = 0; int new_sock = -1; rpc_transport_t *new_trans = NULL; struct sockaddr_storage new_sockaddr = {0, }; socklen_t addrlen = sizeof (new_sockaddr); socket_private_t *new_priv = NULL; glusterfs_ctx_t *ctx = NULL; this = data; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); GF_VALIDATE_OR_GOTO ("socket", this->xl, out); THIS = this->xl; priv = this->private; ctx = this->ctx; pthread_mutex_lock (&priv->lock); { priv->idx = idx; if (poll_in) { new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen); if (new_sock == -1) { gf_log (this->name, GF_LOG_WARNING, "accept on %d failed (%s)", priv->sock, strerror (errno)); goto unlock; } if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { ret = __socket_nodelay (new_sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "setsockopt() failed for " "NODELAY (%s)", strerror (errno)); } } if (priv->keepalive && new_sockaddr.ss_family != AF_UNIX) { ret = __socket_keepalive (new_sock, new_sockaddr.ss_family, 
priv->keepaliveintvl, priv->keepaliveidle); if (ret == -1) gf_log (this->name, GF_LOG_WARNING, "Failed to set keep-alive: %s", strerror (errno)); } new_trans = GF_CALLOC (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t); if (!new_trans) goto unlock; ret = pthread_mutex_init(&new_trans->lock, NULL); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "pthread_mutex_init() failed: %s", strerror (errno)); close (new_sock); goto unlock; } new_trans->name = gf_strdup (this->name); memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen); new_trans->peerinfo.sockaddr_len = addrlen; new_trans->myinfo.sockaddr_len = sizeof (new_trans->myinfo.sockaddr); ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr), &new_trans->myinfo.sockaddr_len); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "getsockname on %d failed (%s)", new_sock, strerror (errno)); close (new_sock); goto unlock; } get_transport_identifiers (new_trans); ret = socket_init(new_trans); if (ret != 0) { close(new_sock); goto unlock; } new_trans->ops = this->ops; new_trans->init = this->init; new_trans->fini = this->fini; new_trans->ctx = ctx; new_trans->xl = this->xl; new_trans->mydata = this->mydata; new_trans->notify = this->notify; new_trans->listener = this; new_priv = new_trans->private; new_priv->use_ssl = priv->use_ssl; new_priv->sock = new_sock; new_priv->own_thread = priv->own_thread; new_priv->ssl_ctx = priv->ssl_ctx; if (priv->use_ssl && !priv->own_thread) { if (ssl_setup_connection(new_trans,1) < 0) { gf_log(this->name,GF_LOG_ERROR, "server setup failed"); close(new_sock); goto unlock; } } if (!priv->bio && !priv->own_thread) { ret = __socket_nonblock (new_sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "NBIO on %d failed (%s)", new_sock, strerror (errno)); close (new_sock); goto unlock; } } pthread_mutex_lock (&new_priv->lock); { /* * In the own_thread case, this is used to * indicate that we're initializing a server * connection. 
*/ new_priv->connected = 1; new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); if (new_priv->own_thread) { if (pipe(new_priv->pipe) < 0) { gf_log(this->name,GF_LOG_ERROR, "could not create pipe"); } socket_spawn(new_trans); } else { new_priv->idx = event_register (ctx->event_pool, new_sock, socket_event_handler, new_trans, 1, 0); if (new_priv->idx == -1) ret = -1; } } pthread_mutex_unlock (&new_priv->lock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "failed to register the socket with event"); goto unlock; } if (!priv->own_thread) { ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans); } } } unlock: pthread_mutex_unlock (&priv->lock); out: return ret; } static int socket_disconnect (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = -1; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; pthread_mutex_lock (&priv->lock); { ret = __socket_disconnect (this); } pthread_mutex_unlock (&priv->lock); out: return ret; } static int socket_connect (rpc_transport_t *this, int port) { int ret = -1; int sock = -1; socket_private_t *priv = NULL; socklen_t sockaddr_len = 0; glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; char *local_addr = NULL; union gf_sock_union sock_union; struct sockaddr_in *addr = NULL; GF_VALIDATE_OR_GOTO ("socket", this, err); GF_VALIDATE_OR_GOTO ("socket", this->private, err); priv = this->private; ctx = this->ctx; if (!priv) { gf_log_callingfn (this->name, GF_LOG_WARNING, "connect() called on uninitialized transport"); goto err; } pthread_mutex_lock (&priv->lock); { sock = priv->sock; } pthread_mutex_unlock (&priv->lock); if (sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, "connect () called on transport already connected"); errno = EINPROGRESS; ret = -1; goto err; } gf_log (this->name, GF_LOG_TRACE, "connecting %p, state=%u gen=%u sock=%d", this, priv->ot_state, priv->ot_gen, priv->sock); ret = 
socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); if (ret == -1) { /* logged inside client_get_remote_sockaddr */ goto err; } if (port > 0) { sock_union.sin.sin_port = htons (port); } if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) { if (priv->use_ssl) { gf_log(this->name,GF_LOG_DEBUG, "disabling SSL for portmapper connection"); priv->use_ssl = _gf_false; } } else { if (priv->ssl_enabled && !priv->use_ssl) { gf_log(this->name,GF_LOG_DEBUG, "re-enabling SSL for I/O connection"); priv->use_ssl = _gf_true; } } pthread_mutex_lock (&priv->lock); { if (priv->sock != -1) { gf_log (this->name, GF_LOG_TRACE, "connect() -- already connected"); goto unlock; } memcpy (&this->peerinfo.sockaddr, &sock_union.storage, sockaddr_len); this->peerinfo.sockaddr_len = sockaddr_len; priv->sock = socket (sa_family, SOCK_STREAM, 0); if (priv->sock == -1) { gf_log (this->name, GF_LOG_ERROR, "socket creation failed (%s)", strerror (errno)); goto unlock; } /* Cant help if setting socket options fails. We can continue * working nonetheless. 
*/ if (priv->windowsize != 0) { if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, &priv->windowsize, sizeof (priv->windowsize)) < 0) { gf_log (this->name, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s", priv->sock, priv->windowsize, strerror (errno)); } if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, &priv->windowsize, sizeof (priv->windowsize)) < 0) { gf_log (this->name, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s", priv->sock, priv->windowsize, strerror (errno)); } } if (priv->nodelay && (sa_family != AF_UNIX)) { ret = __socket_nodelay (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "NODELAY on %d failed (%s)", priv->sock, strerror (errno)); } } if (priv->keepalive && sa_family != AF_UNIX) { ret = __socket_keepalive (priv->sock, sa_family, priv->keepaliveintvl, priv->keepaliveidle); if (ret == -1) gf_log (this->name, GF_LOG_ERROR, "Failed to set keep-alive: %s", strerror (errno)); } SA (&this->myinfo.sockaddr)->sa_family = SA (&this->peerinfo.sockaddr)->sa_family; /* If a source addr is explicitly specified, use it */ ret = dict_get_str (this->options, "transport.socket.source-addr", &local_addr); if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) { addr = (struct sockaddr_in *)(&this->myinfo.sockaddr); ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr)); } ret = client_bind (this, SA (&this->myinfo.sockaddr), &this->myinfo.sockaddr_len, priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "client bind failed: %s", strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; } if (!priv->use_ssl && !priv->bio && !priv->own_thread) { ret = __socket_nonblock (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; } } ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), this->peerinfo.sockaddr_len); if (ret == -1 && ((errno != EINPROGRESS) && 
(errno != ENOENT))) { /* For unix path based sockets, the socket path is * cryptic (md5sum of path) and may not be useful for * the user in debugging so log it in DEBUG */ gf_log (this->name, ((sa_family == AF_UNIX) ? GF_LOG_DEBUG : GF_LOG_ERROR), "connection attempt on %s failed, (%s)", this->peerinfo.identifier, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; } if (priv->use_ssl && !priv->own_thread) { ret = ssl_setup_connection(this,0); if (ret < 0) { gf_log(this->name,GF_LOG_ERROR, "client setup failed"); close(priv->sock); priv->sock = -1; goto unlock; } } if (!priv->bio && !priv->own_thread) { ret = __socket_nonblock (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; } } /* * In the own_thread case, this is used to indicate that we're * initializing a client connection. */ priv->connected = 0; priv->is_server = _gf_false; rpc_transport_ref (this); if (priv->own_thread) { if (pipe(priv->pipe) < 0) { gf_log(this->name,GF_LOG_ERROR, "could not create pipe"); } this->listener = this; socket_spawn(this); } else { priv->idx = event_register (ctx->event_pool, priv->sock, socket_event_handler, this, 1, 1); if (priv->idx == -1) { gf_log ("", GF_LOG_WARNING, "failed to register the event"); ret = -1; } } } unlock: pthread_mutex_unlock (&priv->lock); err: return ret; } static int socket_listen (rpc_transport_t *this) { socket_private_t * priv = NULL; int ret = -1; int sock = -1; struct sockaddr_storage sockaddr; socklen_t sockaddr_len = 0; peer_info_t *myinfo = NULL; glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; myinfo = &this->myinfo; ctx = this->ctx; pthread_mutex_lock (&priv->lock); { sock = priv->sock; } pthread_mutex_unlock (&priv->lock); if (sock != -1) { gf_log_callingfn (this->name, 
GF_LOG_DEBUG, "already listening"); return ret; } ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), &sockaddr_len, &sa_family); if (ret == -1) { return ret; } pthread_mutex_lock (&priv->lock); { if (priv->sock != -1) { gf_log (this->name, GF_LOG_DEBUG, "already listening"); goto unlock; } memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); myinfo->sockaddr_len = sockaddr_len; priv->sock = socket (sa_family, SOCK_STREAM, 0); if (priv->sock == -1) { gf_log (this->name, GF_LOG_ERROR, "socket creation failed (%s)", strerror (errno)); goto unlock; } /* Cant help if setting socket options fails. We can continue * working nonetheless. */ if (priv->windowsize != 0) { if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, &priv->windowsize, sizeof (priv->windowsize)) < 0) { gf_log (this->name, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s", priv->sock, priv->windowsize, strerror (errno)); } if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, &priv->windowsize, sizeof (priv->windowsize)) < 0) { gf_log (this->name, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s", priv->sock, priv->windowsize, strerror (errno)); } } if (priv->nodelay && (sa_family != AF_UNIX)) { ret = __socket_nodelay (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)", strerror (errno)); } } if (!priv->bio) { ret = __socket_nonblock (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; } } ret = __socket_server_bind (this); if (ret == -1) { /* logged inside __socket_server_bind() */ close (priv->sock); priv->sock = -1; goto unlock; } if (priv->backlog) ret = listen (priv->sock, priv->backlog); else ret = listen (priv->sock, 10); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "could not set socket %d to listen mode (%s)", priv->sock, strerror (errno)); close (priv->sock); priv->sock = -1; goto 
unlock; } rpc_transport_ref (this); priv->idx = event_register (ctx->event_pool, priv->sock, socket_server_event_handler, this, 1, 0); if (priv->idx == -1) { gf_log (this->name, GF_LOG_WARNING, "could not register socket %d with events", priv->sock); ret = -1; close (priv->sock); priv->sock = -1; goto unlock; } } unlock: pthread_mutex_unlock (&priv->lock); out: return ret; } static int32_t socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) { socket_private_t *priv = NULL; int ret = -1; char need_poll_out = 0; char need_append = 1; struct ioq *entry = NULL; glusterfs_ctx_t *ctx = NULL; char a_byte = 'j'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; ctx = this->ctx; pthread_mutex_lock (&priv->lock); { if (priv->connected != 1) { if (!priv->submit_log && !priv->connect_finish_log) { gf_log (this->name, GF_LOG_INFO, "not connected (priv->connected = %d)", priv->connected); priv->submit_log = 1; } goto unlock; } priv->submit_log = 0; entry = __socket_ioq_new (this, &req->msg); if (!entry) goto unlock; if (list_empty (&priv->ioq)) { ret = __socket_ioq_churn_entry (this, entry, 1); if (ret == 0) { need_append = 0; } if (ret > 0) { need_poll_out = 1; } } if (need_append) { list_add_tail (&entry->list, &priv->ioq); if (priv->own_thread) { /* * Make sure the polling thread wakes up, by * writing a byte to represent this entry. */ if (write(priv->pipe[1],&a_byte,1) < 1) { gf_log(this->name,GF_LOG_WARNING, "write error on pipe"); } } ret = 0; } if (!priv->own_thread && need_poll_out) { /* first entry to wait. 
continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, priv->sock, priv->idx, -1, 1); } } unlock: pthread_mutex_unlock (&priv->lock); out: return ret; } static int32_t socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) { socket_private_t *priv = NULL; int ret = -1; char need_poll_out = 0; char need_append = 1; struct ioq *entry = NULL; glusterfs_ctx_t *ctx = NULL; char a_byte = 'd'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; ctx = this->ctx; pthread_mutex_lock (&priv->lock); { if (priv->connected != 1) { if (!priv->submit_log && !priv->connect_finish_log) { gf_log (this->name, GF_LOG_INFO, "not connected (priv->connected = %d)", priv->connected); priv->submit_log = 1; } goto unlock; } priv->submit_log = 0; entry = __socket_ioq_new (this, &reply->msg); if (!entry) goto unlock; if (list_empty (&priv->ioq)) { ret = __socket_ioq_churn_entry (this, entry, 1); if (ret == 0) { need_append = 0; } if (ret > 0) { need_poll_out = 1; } } if (need_append) { list_add_tail (&entry->list, &priv->ioq); if (priv->own_thread) { /* * Make sure the polling thread wakes up, by * writing a byte to represent this entry. */ if (write(priv->pipe[1],&a_byte,1) < 1) { gf_log(this->name,GF_LOG_WARNING, "write error on pipe"); } } ret = 0; } if (!priv->own_thread && need_poll_out) { /* first entry to wait. 
continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, priv->sock, priv->idx, -1, 1); } } unlock: pthread_mutex_unlock (&priv->lock); out: return ret; } static int32_t socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen) { int32_t ret = -1; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", hostname, out); if (hostlen < (strlen (this->peerinfo.identifier) + 1)) { goto out; } strcpy (hostname, this->peerinfo.identifier); ret = 0; out: return ret; } static int32_t socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t salen) { int32_t ret = -1; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", sa, out); *sa = this->peerinfo.sockaddr; if (peeraddr != NULL) { ret = socket_getpeername (this, peeraddr, addrlen); } ret = 0; out: return ret; } static int32_t socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen) { int32_t ret = -1; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", hostname, out); if (hostlen < (strlen (this->myinfo.identifier) + 1)) { goto out; } strcpy (hostname, this->myinfo.identifier); ret = 0; out: return ret; } static int32_t socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, struct sockaddr_storage *sa, socklen_t salen) { int32_t ret = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", sa, out); *sa = this->myinfo.sockaddr; if (myaddr != NULL) { ret = socket_getmyname (this, myaddr, addrlen); } out: return ret; } static int socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) { socket_private_t *priv = NULL; priv = this->private; /* The way we implement throttling is by taking off POLLIN event from the polled flags. This way we never get called with the POLLIN event and therefore will never read() any more data until throttling is turned off. 
*/ priv->idx = event_select_on (this->ctx->event_pool, priv->sock, priv->idx, (int) !onoff, -1); return 0; } struct rpc_transport_ops tops = { .listen = socket_listen, .connect = socket_connect, .disconnect = socket_disconnect, .submit_request = socket_submit_request, .submit_reply = socket_submit_reply, .get_peername = socket_getpeername, .get_peeraddr = socket_getpeeraddr, .get_myname = socket_getmyname, .get_myaddr = socket_getmyaddr, .throttle = socket_throttle, }; int reconfigure (rpc_transport_t *this, dict_t *options) { socket_private_t *priv = NULL; gf_boolean_t tmp_bool = _gf_false; char *optstr = NULL; int ret = 0; uint64_t windowsize = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); if (!this || !this->private) { ret =-1; goto out; } priv = this->private; if (dict_get_str (this->options, "transport.socket.keepalive", &optstr) == 0) { if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_ERROR, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"); priv->keepalive = 1; ret = -1; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive"); priv->keepalive = tmp_bool; } else priv->keepalive = 1; optstr = NULL; if (dict_get_str (this->options, "tcp-window-size", &optstr) == 0) { if (gf_string2bytesize (optstr, &windowsize) != 0) { gf_log (this->name, GF_LOG_ERROR, "invalid number format: %s", optstr); goto out; } } priv->windowsize = (int)windowsize; ret = 0; out: return ret; } static int socket_init (rpc_transport_t *this) { socket_private_t *priv = NULL; gf_boolean_t tmp_bool = 0; uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; char *optstr = NULL; uint32_t keepalive = 0; uint32_t backlog = 0; int session_id = 0; if (this->private) { gf_log_callingfn (this->name, GF_LOG_ERROR, "double init attempted"); return -1; } priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); if (!priv) { return -1; } 
memset(priv,0,sizeof(*priv)); pthread_mutex_init (&priv->lock, NULL); priv->sock = -1; priv->idx = -1; priv->connected = -1; priv->nodelay = 1; priv->bio = 0; priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; INIT_LIST_HEAD (&priv->ioq); /* All the below section needs 'this->options' to be present */ if (!this->options) goto out; if (dict_get (this->options, "non-blocking-io")) { optstr = data_to_str (dict_get (this->options, "non-blocking-io")); if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options," " not taking any action"); tmp_bool = 1; } if (!tmp_bool) { priv->bio = 1; gf_log (this->name, GF_LOG_WARNING, "disabling non-blocking IO"); } } optstr = NULL; // By default, we enable NODELAY if (dict_get (this->options, "transport.socket.nodelay")) { optstr = data_to_str (dict_get (this->options, "transport.socket.nodelay")); if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_ERROR, "'transport.socket.nodelay' takes only " "boolean options, not taking any action"); tmp_bool = 1; } if (!tmp_bool) { priv->nodelay = 0; gf_log (this->name, GF_LOG_DEBUG, "disabling nodelay"); } } optstr = NULL; if (dict_get_str (this->options, "tcp-window-size", &optstr) == 0) { if (gf_string2bytesize (optstr, &windowsize) != 0) { gf_log (this->name, GF_LOG_ERROR, "invalid number format: %s", optstr); return -1; } } priv->windowsize = (int)windowsize; optstr = NULL; /* Enable Keep-alive by default. 
*/ priv->keepalive = 1; priv->keepaliveintvl = 2; priv->keepaliveidle = 20; if (dict_get_str (this->options, "transport.socket.keepalive", &optstr) == 0) { if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_ERROR, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"); tmp_bool = 1; } if (!tmp_bool) priv->keepalive = 0; } if (dict_get_uint32 (this->options, "transport.socket.keepalive-interval", &keepalive) == 0) { priv->keepaliveintvl = keepalive; } if (dict_get_uint32 (this->options, "transport.socket.keepalive-time", &keepalive) == 0) { priv->keepaliveidle = keepalive; } if (dict_get_uint32 (this->options, "transport.socket.listen-backlog", &backlog) == 0) { priv->backlog = backlog; } optstr = NULL; /* Check if socket read failures are to be logged */ priv->read_fail_log = 1; if (dict_get (this->options, "transport.socket.read-fail-log")) { optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log")); if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_WARNING, "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"); } else if (tmp_bool == _gf_false) { priv->read_fail_log = 0; } } priv->windowsize = (int)windowsize; priv->ssl_enabled = _gf_false; if (dict_get_str(this->options,SSL_ENABLED_OPT,&optstr) == 0) { if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { gf_log (this->name, GF_LOG_ERROR, "invalid value given for ssl-enabled boolean"); } } priv->ssl_own_cert = DEFAULT_CERT_PATH; if (dict_get_str(this->options,SSL_OWN_CERT_OPT,&optstr) == 0) { if (!priv->ssl_enabled) { gf_log(this->name,GF_LOG_WARNING, "%s specified without %s (ignored)", SSL_OWN_CERT_OPT, SSL_ENABLED_OPT); } priv->ssl_own_cert = optstr; } priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert); priv->ssl_private_key = DEFAULT_KEY_PATH; if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT,&optstr) == 0) { if (!priv->ssl_enabled) { 
gf_log(this->name,GF_LOG_WARNING, "%s specified without %s (ignored)", SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT); } priv->ssl_private_key = optstr; } priv->ssl_private_key = gf_strdup(priv->ssl_private_key); priv->ssl_ca_list = DEFAULT_CA_PATH; if (dict_get_str(this->options,SSL_CA_LIST_OPT,&optstr) == 0) { if (!priv->ssl_enabled) { gf_log(this->name,GF_LOG_WARNING, "%s specified without %s (ignored)", SSL_CA_LIST_OPT, SSL_ENABLED_OPT); } priv->ssl_ca_list = optstr; } priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list); gf_log(this->name,GF_LOG_INFO,"SSL support is %s", priv->ssl_enabled ? "ENABLED" : "NOT enabled"); /* * This might get overridden temporarily in socket_connect (q.v.) * if we're using the glusterd portmapper. */ priv->use_ssl = priv->ssl_enabled; priv->own_thread = priv->use_ssl; if (dict_get_str(this->options,OWN_THREAD_OPT,&optstr) == 0) { if (gf_string2boolean (optstr, &priv->own_thread) != 0) { gf_log (this->name, GF_LOG_ERROR, "invalid value given for own-thread boolean"); } } gf_log(this->name,GF_LOG_INFO,"using %s polling thread", priv->own_thread ? 
"private" : "system"); if (priv->use_ssl) { SSL_library_init(); SSL_load_error_strings(); priv->ssl_meth = (SSL_METHOD *)TLSv1_method(); priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth); if (SSL_CTX_set_cipher_list(priv->ssl_ctx, "HIGH:-SSLv2") == 0) { gf_log(this->name,GF_LOG_ERROR, "failed to find any valid ciphers"); goto err; } if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx, priv->ssl_own_cert)) { gf_log(this->name,GF_LOG_ERROR, "could not load our cert"); goto err; } if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx, priv->ssl_private_key, SSL_FILETYPE_PEM)) { gf_log(this->name,GF_LOG_ERROR, "could not load private key"); goto err; } if (!SSL_CTX_load_verify_locations(priv->ssl_ctx, priv->ssl_ca_list,0)) { gf_log(this->name,GF_LOG_ERROR, "could not load CA list"); goto err; } #if (OPENSSL_VERSION_NUMBER < 0x00905100L) SSL_CTX_set_verify_depth(ctx,1); #endif priv->ssl_session_id = ++session_id; SSL_CTX_set_session_id_context(priv->ssl_ctx, (void *)&priv->ssl_session_id, sizeof(priv->ssl_session_id)); SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0); } if (priv->own_thread) { priv->ot_state = OT_IDLE; } out: this->private = priv; return 0; err: if (priv->ssl_own_cert) { GF_FREE(priv->ssl_own_cert); } if (priv->ssl_private_key) { GF_FREE(priv->ssl_private_key); } if (priv->ssl_ca_list) { GF_FREE(priv->ssl_ca_list); } GF_FREE(priv); return -1; } void fini (rpc_transport_t *this) { socket_private_t *priv = NULL; if (!this) return; priv = this->private; if (priv) { if (priv->sock != -1) { pthread_mutex_lock (&priv->lock); { __socket_ioq_flush (this); __socket_reset (this); } pthread_mutex_unlock (&priv->lock); } gf_log (this->name, GF_LOG_TRACE, "transport %p destroyed", this); pthread_mutex_destroy (&priv->lock); if (priv->ssl_private_key) { GF_FREE(priv->ssl_private_key); } if (priv->ssl_own_cert) { GF_FREE(priv->ssl_own_cert); } if (priv->ssl_ca_list) { GF_FREE(priv->ssl_ca_list); } GF_FREE (priv); } this->private = NULL; } int32_t init (rpc_transport_t *this) 
{ int ret = -1; ret = socket_init (this); if (ret == -1) { gf_log (this->name, GF_LOG_DEBUG, "socket_init() failed"); } return ret; } struct volume_options options[] = { { .key = {"remote-port", "transport.remote-port", "transport.socket.remote-port"}, .type = GF_OPTION_TYPE_INT }, { .key = {"transport.socket.listen-port", "listen-port"}, .type = GF_OPTION_TYPE_INT }, { .key = {"transport.socket.bind-address", "bind-address" }, .type = GF_OPTION_TYPE_INTERNET_ADDRESS }, { .key = {"transport.socket.connect-path", "connect-path"}, .type = GF_OPTION_TYPE_ANY }, { .key = {"transport.socket.bind-path", "bind-path"}, .type = GF_OPTION_TYPE_ANY }, { .key = {"transport.socket.listen-path", "listen-path"}, .type = GF_OPTION_TYPE_ANY }, { .key = { "transport.address-family", "address-family" }, .value = {"inet", "inet6", "unix", "inet-sdp" }, .type = GF_OPTION_TYPE_STR }, { .key = {"non-blocking-io"}, .type = GF_OPTION_TYPE_BOOL }, { .key = {"tcp-window-size"}, .type = GF_OPTION_TYPE_SIZET, .min = GF_MIN_SOCKET_WINDOW_SIZE, .max = GF_MAX_SOCKET_WINDOW_SIZE }, { .key = {"transport.socket.nodelay"}, .type = GF_OPTION_TYPE_BOOL }, { .key = {"transport.socket.lowlat"}, .type = GF_OPTION_TYPE_BOOL }, { .key = {"transport.socket.keepalive"}, .type = GF_OPTION_TYPE_BOOL }, { .key = {"transport.socket.keepalive-interval"}, .type = GF_OPTION_TYPE_INT }, { .key = {"transport.socket.keepalive-time"}, .type = GF_OPTION_TYPE_INT }, { .key = {"transport.socket.listen-backlog"}, .type = GF_OPTION_TYPE_INT }, { .key = {"transport.socket.read-fail-log"}, .type = GF_OPTION_TYPE_BOOL }, { .key = {SSL_ENABLED_OPT}, .type = GF_OPTION_TYPE_BOOL }, { .key = {SSL_OWN_CERT_OPT}, .type = GF_OPTION_TYPE_STR }, { .key = {SSL_PRIVATE_KEY_OPT}, .type = GF_OPTION_TYPE_STR }, { .key = {SSL_CA_LIST_OPT}, .type = GF_OPTION_TYPE_STR }, { .key = {OWN_THREAD_OPT}, .type = GF_OPTION_TYPE_BOOL }, { .key = {NULL} } };