summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c1879
1 files changed, 1407 insertions, 472 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 6c2d909e4..93da3f296 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -35,97 +35,401 @@
#include <errno.h>
#include <netinet/tcp.h>
#include <rpc/xdr.h>
+#include <sys/ioctl.h>
#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
#define SA(ptr) ((struct sockaddr *)ptr)
+#define SSL_ENABLED_OPT "transport.socket.ssl-enabled"
+#define SSL_OWN_CERT_OPT "transport.socket.ssl-own-cert"
+#define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
+#define SSL_CA_LIST_OPT "transport.socket.ssl-ca-list"
+#define OWN_THREAD_OPT "transport.socket.own-thread"
-#define __socket_proto_reset_pending(priv) do { \
- memset (&priv->incoming.frag.vector, 0, \
- sizeof (priv->incoming.frag.vector)); \
- priv->incoming.frag.pending_vector = \
- &priv->incoming.frag.vector; \
- priv->incoming.frag.pending_vector->iov_base = \
- priv->incoming.frag.fragcurrent; \
- priv->incoming.pending_vector = \
- priv->incoming.frag.pending_vector; \
- } while (0);
+/* TBD: do automake substitutions etc. (ick) to set these. */
+#if !defined(DEFAULT_CERT_PATH)
+#define DEFAULT_CERT_PATH "/etc/ssl/glusterfs.pem"
+#endif
+#if !defined(DEFAULT_KEY_PATH)
+#define DEFAULT_KEY_PATH "/etc/ssl/glusterfs.key"
+#endif
+#if !defined(DEFAULT_CA_PATH)
+#define DEFAULT_CA_PATH "/etc/ssl/glusterfs.ca"
+#endif
+
+#define POLL_MASK_INPUT (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR (POLLERR | POLLHUP | POLLNVAL)
+
+typedef int SSL_unary_func (SSL *);
+typedef int SSL_trinary_func (SSL *, void *, int);
+
+#define __socket_proto_reset_pending(priv) do { \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ memset (&frag->vector, 0, sizeof (frag->vector)); \
+ frag->pending_vector = &frag->vector; \
+ frag->pending_vector->iov_base = frag->fragcurrent; \
+ priv->incoming.pending_vector = frag->pending_vector; \
+ } while (0)
#define __socket_proto_update_pending(priv) \
do { \
- uint32_t remaining_fragsize = 0; \
- if (priv->incoming.frag.pending_vector->iov_len == 0) { \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
+ uint32_t remaining; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ if (frag->pending_vector->iov_len == 0) { \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
\
- priv->incoming.frag.pending_vector->iov_len = \
- remaining_fragsize > priv->incoming.frag.remaining_size \
- ? priv->incoming.frag.remaining_size : remaining_fragsize; \
+ frag->pending_vector->iov_len = \
+ (remaining > frag->remaining_size) \
+ ? frag->remaining_size : remaining; \
\
- priv->incoming.frag.remaining_size -= \
- priv->incoming.frag.pending_vector->iov_len; \
+ frag->remaining_size -= \
+ frag->pending_vector->iov_len; \
} \
- } while (0);
+ } while (0)
#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
{ \
- priv->incoming.frag.fragcurrent += bytes_read; \
- priv->incoming.frag.bytes_read += bytes_read; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
\
- if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \
- if (priv->incoming.frag.remaining_size != 0 && ret == 0) { \
+ frag->fragcurrent += bytes_read; \
+ frag->bytes_read += bytes_read; \
+ \
+ if ((ret > 0) || (frag->remaining_size != 0)) { \
+ if (frag->remaining_size != 0 && ret == 0) { \
__socket_proto_reset_pending (priv); \
} \
\
- gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \
+ gf_log (this->name, GF_LOG_TRACE, \
+ "partial read on non-blocking socket"); \
\
break; \
} \
}
-#define __socket_proto_init_pending(priv, size) \
+#define __socket_proto_init_pending(priv,size) \
do { \
- uint32_t remaining_fragsize = 0; \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
+ uint32_t remaining = 0; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
\
- __socket_proto_reset_pending (priv); \
+ __socket_proto_reset_pending (priv); \
\
- priv->incoming.frag.pending_vector->iov_len = \
- remaining_fragsize > size ? size : remaining_fragsize; \
+ frag->pending_vector->iov_len = \
+ (remaining > size) ? size : remaining; \
\
- priv->incoming.frag.remaining_size = \
- size - priv->incoming.frag.pending_vector->iov_len; \
+ frag->remaining_size = (size - frag->pending_vector->iov_len); \
\
- } while (0);
+ } while(0)
/* This will be used in a switch case and breaks from the switch case if all
* the pending data is not read.
*/
#define __socket_proto_read(priv, ret) \
- { \
+ { \
size_t bytes_read = 0; \
+ struct gf_sock_incoming *in; \
+ in = &priv->incoming; \
\
__socket_proto_update_pending (priv); \
\
ret = __socket_readv (this, \
- priv->incoming.pending_vector, 1, \
- &priv->incoming.pending_vector, \
- &priv->incoming.pending_count, \
+ in->pending_vector, 1, \
+ &in->pending_vector, \
+ &in->pending_count, \
&bytes_read); \
- if (ret == -1) { \
- gf_log (this->name, GF_LOG_WARNING, \
- "reading from socket failed. Error (%s), " \
- "peer (%s)", strerror (errno), \
- this->peerinfo.identifier); \
+ if (ret == -1) \
break; \
- } \
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
}
+static int socket_init (rpc_transport_t *this);
+
+static void
+ssl_dump_error_stack (const char *caller)
+{
+ unsigned long errnum = 0;
+ char errbuf[120] = {0,};
+
+ /* OpenSSL docs explicitly give 120 as the error-string length. */
+
+ while ((errnum = ERR_get_error())) {
+ ERR_error_string(errnum,errbuf);
+ gf_log(caller,GF_LOG_ERROR," %s",errbuf);
+ }
+}
+
+static int
+ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
+{
+ int r = (-1);
+ struct pollfd pfd = {-1,};
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,out);
+ priv = this->private;
+
+ for (;;) {
+ if (buf) {
+ if (priv->connected == -1) {
+ /*
+ * Fields in the SSL structure (especially
+ * the BIO pointers) are not valid at this
+ * point, so we'll segfault if we pass them
+ * to SSL_read/SSL_write.
+ */
+ gf_log(this->name,GF_LOG_INFO,
+ "lost connection in %s", __func__);
+ break;
+ }
+ r = func(priv->ssl_ssl,buf,len);
+ }
+ else {
+ /*
+ * We actually need these functions to get to
+ * priv->connected == 1.
+ */
+ r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ }
+ switch (SSL_get_error(priv->ssl_ssl,r)) {
+ case SSL_ERROR_NONE:
+ return r;
+ case SSL_ERROR_WANT_READ:
+ pfd.fd = priv->sock;
+ pfd.events = POLLIN;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ pfd.fd = priv->sock;
+ pfd.events = POLLOUT;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_SYSCALL:
+ /* This is what we get when remote disconnects. */
+ gf_log(this->name,GF_LOG_DEBUG,
+ "syscall error (probably remote disconnect)");
+ errno = ENODATA;
+ goto out;
+ default:
+ errno = EIO;
+ goto out; /* "break" would just loop again */
+ }
+ }
+out:
+ return -1;
+}
+
+#define ssl_connect_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_connect)
+#define ssl_accept_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_accept)
+#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
+#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
-int socket_init (rpc_transport_t *this);
+static int
+ssl_setup_connection (rpc_transport_t *this, int server)
+{
+ X509 *peer = NULL;
+ char peer_CN[256] = "";
+ int ret = -1;
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,done);
+ priv = this->private;
+
+ priv->ssl_ssl = SSL_new(priv->ssl_ctx);
+ if (!priv->ssl_ssl) {
+ gf_log(this->name,GF_LOG_ERROR,"SSL_new failed");
+ ssl_dump_error_stack(this->name);
+ goto done;
+ }
+ priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE);
+ if (!priv->ssl_sbio) {
+ gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed");
+ ssl_dump_error_stack(this->name);
+ goto free_ssl;
+ }
+ SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio);
+
+ if (server) {
+ ret = ssl_accept_one(this);
+ }
+ else {
+ ret = ssl_connect_one(this);
+ }
+
+ /* Make sure _the call_ succeeded. */
+ if (ret < 0) {
+ goto ssl_error;
+ }
+
+ /* Make sure _SSL verification_ succeeded, yielding an identity. */
+ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) {
+ goto ssl_error;
+ }
+ peer = SSL_get_peer_certificate(priv->ssl_ssl);
+ if (!peer) {
+ goto ssl_error;
+ }
+
+ /* Finally, everything seems OK. */
+ X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
+ NID_commonName, peer_CN, sizeof(peer_CN)-1);
+ peer_CN[sizeof(peer_CN)-1] = '\0';
+ gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN);
+ return 0;
+
+ /* Error paths. */
+ssl_error:
+ gf_log(this->name,GF_LOG_ERROR,"SSL connect error");
+ ssl_dump_error_stack(this->name);
+free_ssl:
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+done:
+ return ret;
+}
+
+
+static void
+ssl_teardown_connection (socket_private_t *priv)
+{
+ SSL_shutdown(priv->ssl_ssl);
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+}
+
+
+static ssize_t
+__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ if (priv->use_ssl) {
+ ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len);
+ } else {
+ ret = readv (sock, opvector, opcount);
+ }
+
+ return ret;
+}
+
+
+static ssize_t
+__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
+{
+ struct iovec iov = {0, };
+ int ret = -1;
+
+ iov.iov_base = buf;
+ iov.iov_len = count;
+
+ ret = __socket_ssl_readv (this, &iov, 1);
+
+ return ret;
+}
+
+
+static int
+__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ struct gf_sock_incoming *in = NULL;
+ int req_len = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+ in = &priv->incoming;
+ req_len = iov_length (opvector, opcount);
+
+ if (in->record_state == SP_STATE_READING_FRAGHDR) {
+ in->ra_read = 0;
+ in->ra_served = 0;
+ in->ra_max = 0;
+ in->ra_buf = NULL;
+ goto uncached;
+ }
+
+ if (!in->ra_max) {
+ /* first call after passing SP_STATE_READING_FRAGHDR */
+ in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
+ /* Note that the in->iobuf is the primary iobuf into which
+ headers are read into. By using this itself as our
+ read-ahead cache, we can avoid memory copies in iov_load
+ */
+ in->ra_buf = iobuf_ptr (in->iobuf);
+ }
+
+ /* fill read-ahead */
+ if (in->ra_read < in->ra_max) {
+ ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read],
+ (in->ra_max - in->ra_read));
+ if (ret > 0)
+ in->ra_read += ret;
+
+ /* we proceed to test if there is still cached data to
+ be served even if readahead could not progress */
+ }
+
+ /* serve cached */
+ if (in->ra_served < in->ra_read) {
+ ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served],
+ min (req_len, (in->ra_read - in->ra_served)));
+
+ in->ra_served += ret;
+ /* Do not read uncached and cached in the same call */
+ goto out;
+ }
+
+ if (in->ra_read < in->ra_max)
+ /* If there was no cached data to be served, (and we are
+ guaranteed to have already performed an attempt to progress
+ readahead above), and we have not yet read out the full
+ readahead capacity, then bail out for now without doing
+ the uncached read below (as that will overtake future cached
+ read)
+ */
+ goto out;
+uncached:
+ ret = __socket_ssl_readv (this, opvector, opcount);
+out:
+ return ret;
+}
+
+static gf_boolean_t
+__does_socket_rwv_error_need_logging (socket_private_t *priv, int write)
+{
+ int read = !write;
+
+ if (priv->connected == -1) /* Didn't even connect, of course it fails */
+ return _gf_false;
+
+ if (read && (priv->read_fail_log == _gf_false))
+ return _gf_false;
+
+ return _gf_true;
+}
/*
* return value:
@@ -134,7 +438,7 @@ int socket_init (rpc_transport_t *this);
* > 0 = incomplete
*/
-int
+static int
__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count, size_t *bytes,
int write)
@@ -159,9 +463,22 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
*bytes = 0;
}
- while (opcount) {
+ while (opcount > 0) {
+ if (opvector->iov_len == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "would have passed zero length to read/write");
+ ++opvector;
+ --opcount;
+ continue;
+ }
if (write) {
- ret = writev (sock, opvector, opcount);
+ if (priv->use_ssl) {
+ ret = ssl_write_one(this,
+ opvector->iov_base, opvector->iov_len);
+ }
+ else {
+ ret = writev (sock, opvector, opcount);
+ }
if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
/* done for now */
@@ -169,7 +486,13 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
}
this->total_bytes_write += ret;
} else {
- ret = readv (sock, opvector, opcount);
+ ret = __socket_cached_read (this, opvector, opcount);
+
+ if (ret == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,"EOF on socket");
+ errno = ENODATA;
+ ret = -1;
+ }
if (ret == -1 && errno == EAGAIN) {
/* done for now */
break;
@@ -190,9 +513,18 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
if (errno == EINTR)
continue;
- gf_log (this->name, GF_LOG_WARNING,
- "%s failed (%s)", write ? "writev" : "readv",
- strerror (errno));
+ if (__does_socket_rwv_error_need_logging (priv,
+ write)) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s on %s failed (%s)",
+ write ? "writev":"readv",
+ this->peerinfo.identifier,
+ strerror (errno));
+ }
+
+ if (priv->use_ssl) {
+ ssl_dump_error_stack(this->name);
+ }
opcount = -1;
break;
}
@@ -204,6 +536,17 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
moved = 0;
while (moved < ret) {
+ if (!opcount) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "ran out of iov, moved %d/%d",
+ moved, ret);
+ goto ran_out;
+ }
+ if (!opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ continue;
+ }
if ((ret - moved) >= opvector[0].iov_len) {
moved += opvector[0].iov_len;
opvector++;
@@ -213,13 +556,11 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
opvector[0].iov_base += (ret - moved);
moved += (ret - moved);
}
- while (opcount && !opvector[0].iov_len) {
- opvector++;
- opcount--;
- }
}
}
+ran_out:
+
if (pending_vector)
*pending_vector = opvector;
@@ -231,7 +572,7 @@ out:
}
-int
+static int
__socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count,
size_t *bytes)
@@ -245,7 +586,7 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
}
-int
+static int
__socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count)
{
@@ -258,26 +599,55 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
}
-int
+static int
+__socket_shutdown (rpc_transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = this->private;
+
+ priv->connected = -1;
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ if (ret) {
+ /* its already disconnected.. no need to understand
+ why it failed to shutdown in normal cases */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "shutdown() returned %d. %s",
+ ret, strerror (errno));
+ }
+
+ return ret;
+}
+
+static int
__socket_disconnect (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
int ret = -1;
+ socket_private_t *priv = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
if (priv->sock != -1) {
- priv->connected = -1;
- ret = shutdown (priv->sock, SHUT_RDWR);
- if (ret) {
- /* its already disconnected.. no need to understand
- why it failed to shutdown in normal cases */
- gf_log (this->name, GF_LOG_DEBUG,
- "shutdown() returned %d. %s",
- ret, strerror (errno));
+ ret = __socket_shutdown(this);
+ if (priv->own_thread) {
+ /*
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
+ */
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
+ else if (priv->use_ssl) {
+ ssl_teardown_connection(priv);
}
}
@@ -286,7 +656,7 @@ out:
}
-int
+static int
__socket_server_bind (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -314,7 +684,7 @@ __socket_server_bind (rpc_transport_t *this)
memcpy (&unix_addr, SA (&this->myinfo.sockaddr),
this->myinfo.sockaddr_len);
reuse_check_sock = socket (AF_UNIX, SOCK_STREAM, 0);
- if (reuse_check_sock > 0) {
+ if (reuse_check_sock >= 0) {
ret = connect (reuse_check_sock, SA (&unix_addr),
this->myinfo.sockaddr_len);
if ((ret == -1) && (ECONNREFUSED == errno)) {
@@ -342,7 +712,7 @@ out:
}
-int
+static int
__socket_nonblock (int fd)
{
int flags = 0;
@@ -356,8 +726,7 @@ __socket_nonblock (int fd)
return ret;
}
-
-int
+static int
__socket_nodelay (int fd)
{
int on = 1;
@@ -374,7 +743,7 @@ __socket_nodelay (int fd)
static int
-__socket_keepalive (int fd, int keepalive_intvl, int keepalive_idle)
+__socket_keepalive (int fd, int family, int keepalive_intvl, int keepalive_idle)
{
int on = 1;
int ret = -1;
@@ -403,18 +772,23 @@ __socket_keepalive (int fd, int keepalive_intvl, int keepalive_idle)
goto err;
}
#else
+ if (family != AF_INET)
+ goto done;
+
ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle,
sizeof (keepalive_intvl));
if (ret == -1) {
gf_log ("socket", GF_LOG_WARNING,
- "failed to set keep idle on socket %d", fd);
+ "failed to set keep idle %d on socket %d, %s",
+ keepalive_idle, fd, strerror(errno));
goto err;
}
- ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepalive_intvl,
+ ret = setsockopt (fd, IPPROTO_TCP , TCP_KEEPINTVL, &keepalive_intvl,
sizeof (keepalive_intvl));
if (ret == -1) {
gf_log ("socket", GF_LOG_WARNING,
- "failed to set keep alive interval on socket %d", fd);
+ "failed to set keep interval %d on socket %d, %s",
+ keepalive_intvl, fd, strerror(errno));
goto err;
}
#endif
@@ -428,7 +802,7 @@ err:
}
-int
+static int
__socket_connect_finish (int fd)
{
int ret = -1;
@@ -446,7 +820,7 @@ __socket_connect_finish (int fd)
}
-void
+static void
__socket_reset (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -467,9 +841,7 @@ __socket_reset (rpc_transport_t *this)
iobuf_unref (priv->incoming.iobuf);
}
- if (priv->incoming.request_info != NULL) {
- GF_FREE (priv->incoming.request_info);
- }
+ GF_FREE (priv->incoming.request_info);
memset (&priv->incoming, 0, sizeof (priv->incoming));
@@ -485,13 +857,13 @@ out:
}
-void
+static void
socket_set_lastfrag (uint32_t *fragsize) {
(*fragsize) |= 0x80000000U;
}
-void
+static void
socket_set_frag_header_size (uint32_t size, char *haddr)
{
size = htonl (size);
@@ -499,14 +871,14 @@ socket_set_frag_header_size (uint32_t size, char *haddr)
}
-void
+static void
socket_set_last_frag_header_size (uint32_t size, char *haddr)
{
socket_set_lastfrag (&size);
socket_set_frag_header_size (size, haddr);
}
-struct ioq *
+static struct ioq *
__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
{
struct ioq *entry = NULL;
@@ -573,7 +945,7 @@ out:
}
-void
+static void
__socket_ioq_entry_free (struct ioq *entry)
{
GF_VALIDATE_OR_GOTO ("socket", entry, out);
@@ -590,7 +962,7 @@ out:
}
-void
+static void
__socket_ioq_flush (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -611,10 +983,12 @@ out:
}
-int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+static int
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
- int ret = -1;
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ char a_byte = 0;
ret = __socket_writev (this, entry->pending_vector,
entry->pending_count,
@@ -625,13 +999,25 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
/* current entry was completely written */
GF_ASSERT (entry->pending_count == 0);
__socket_ioq_entry_free (entry);
+ priv = this->private;
+ if (priv->own_thread) {
+ /*
+ * The pipe should only remain readable if there are
+ * more entries after this, so drain the byte
+ * representing this entry.
+ */
+ if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "read error on pipe");
+ }
+ }
}
return ret;
}
-int
+static int
__socket_ioq_churn (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -647,13 +1033,13 @@ __socket_ioq_churn (rpc_transport_t *this)
/* pick next entry */
entry = priv->ioq_next;
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 0);
if (ret != 0)
break;
}
- if (list_empty (&priv->ioq)) {
+ if (!priv->own_thread && list_empty (&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
priv->idx = event_select_on (this->ctx->event_pool,
priv->sock, priv->idx, -1, 0);
@@ -664,7 +1050,7 @@ out:
}
-int
+static int
socket_event_poll_err (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -689,7 +1075,7 @@ out:
}
-int
+static int
socket_event_poll_out (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -719,43 +1105,45 @@ out:
}
-inline int
+static inline int
__socket_read_simple_msg (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- uint32_t remaining_size = 0;
- size_t bytes_read = 0;
+ int ret = 0;
+ uint32_t remaining_size = 0;
+ size_t bytes_read = 0;
+ socket_private_t *priv = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.simple_state) {
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->simple_state) {
case SP_STATE_SIMPLE_MSG_INIT:
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
__socket_proto_init_pending (priv, remaining_size);
- priv->incoming.frag.simple_state =
- SP_STATE_READING_SIMPLE_MSG;
+ frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
/* fall through */
case SP_STATE_READING_SIMPLE_MSG:
ret = 0;
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if (remaining_size > 0) {
ret = __socket_readv (this,
- priv->incoming.pending_vector, 1,
- &priv->incoming.pending_vector,
- &priv->incoming.pending_count,
+ in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
&bytes_read);
}
@@ -767,8 +1155,8 @@ __socket_read_simple_msg (rpc_transport_t *this)
break;
}
- priv->incoming.frag.bytes_read += bytes_read;
- priv->incoming.frag.fragcurrent += bytes_read;
+ frag->bytes_read += bytes_read;
+ frag->fragcurrent += bytes_read;
if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE,
@@ -777,8 +1165,7 @@ __socket_read_simple_msg (rpc_transport_t *this)
}
if (ret == 0) {
- priv->incoming.frag.simple_state
- = SP_STATE_SIMPLE_MSG_INIT;
+ frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
}
}
@@ -787,7 +1174,7 @@ out:
}
-inline int
+static inline int
__socket_read_simple_request (rpc_transport_t *this)
{
return __socket_read_simple_msg (this);
@@ -804,7 +1191,7 @@ __socket_read_simple_request (rpc_transport_t *this)
#define rpc_progver_addr(buf) (buf + RPC_MSGTYPE_SIZE + 8)
#define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12)
-inline int
+static inline int
__socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer)
{
socket_private_t *priv = NULL;
@@ -814,17 +1201,26 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
struct iobuf *iobuf = NULL;
uint32_t remaining_size = 0;
ssize_t readsize = 0;
- size_t size = 0;
+ size_t size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.request.vector_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->vector_state) {
case SP_STATE_VECTORED_REQUEST_INIT:
- priv->incoming.frag.call_body.request.vector_sizer_state = 0;
- addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf));
+ request->vector_sizer_state = 0;
+
+ addr = rpc_cred_addr (iobuf_ptr (in->iobuf));
/* also read verf flavour and verflen */
credlen = ntoh32 (*((uint32_t *)addr))
@@ -832,102 +1228,114 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
__socket_proto_init_pending (priv, credlen);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_CREDBYTES;
+ request->vector_state = SP_STATE_READING_CREDBYTES;
/* fall through */
case SP_STATE_READING_CREDBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_CREDBYTES;
+ request->vector_state = SP_STATE_READ_CREDBYTES;
/* fall through */
case SP_STATE_READ_CREDBYTES:
- addr = rpc_verf_addr (priv->incoming.frag.fragcurrent);
+ addr = rpc_verf_addr (frag->fragcurrent);
verflen = ntoh32 (*((uint32_t *)addr));
if (verflen == 0) {
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READ_VERFBYTES;
+ request->vector_state = SP_STATE_READ_VERFBYTES;
goto sp_state_read_verfbytes;
}
__socket_proto_init_pending (priv, verflen);
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READING_VERFBYTES;
+ request->vector_state = SP_STATE_READING_VERFBYTES;
/* fall through */
case SP_STATE_READING_VERFBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_VERFBYTES;
+ request->vector_state = SP_STATE_READ_VERFBYTES;
/* fall through */
case SP_STATE_READ_VERFBYTES:
sp_state_read_verfbytes:
- priv->incoming.frag.call_body.request.vector_sizer_state =
- vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.frag.fragcurrent);
+ /* set the base_addr 'persistently' across multiple calls
+ into the state machine */
+ in->proghdr_base_addr = frag->fragcurrent;
+
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
__socket_proto_init_pending (priv, readsize);
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READING_PROGHDR;
+
+ request->vector_state = SP_STATE_READING_PROGHDR;
/* fall through */
case SP_STATE_READING_PROGHDR:
__socket_proto_read (priv, ret);
-sp_state_reading_proghdr:
- priv->incoming.frag.call_body.request.vector_sizer_state =
- vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.frag.fragcurrent);
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
+
+ /* fall through */
+
+ case SP_STATE_READ_PROGHDR:
+sp_state_read_proghdr:
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
if (readsize == 0) {
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_PROGHDR;
- } else {
- __socket_proto_init_pending (priv, readsize);
- __socket_proto_read (priv, ret);
- goto sp_state_reading_proghdr;
+ request->vector_state = SP_STATE_READ_PROGHDR_XDATA;
+ goto sp_state_read_proghdr_xdata;
}
- case SP_STATE_READ_PROGHDR:
- if (priv->incoming.payload_vector.iov_base == NULL) {
+ __socket_proto_init_pending (priv, readsize);
- size = RPC_FRAGSIZE (priv->incoming.fraghdr) -
- priv->incoming.frag.bytes_read;
+ request->vector_state = SP_STATE_READING_PROGHDR_XDATA;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROGHDR_XDATA:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
+ /* check if the vector_sizer() has more to say */
+ goto sp_state_read_proghdr;
+
+ case SP_STATE_READ_PROGHDR_XDATA:
+sp_state_read_proghdr_xdata:
+ if (in->payload_vector.iov_base == NULL) {
+
+ size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (!iobuf) {
ret = -1;
break;
}
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
iobuf_unref (iobuf);
break;
}
}
- iobref_add (priv->incoming.iobref, iobuf);
+ iobref_add (in->iobref, iobuf);
iobuf_unref (iobuf);
- priv->incoming.payload_vector.iov_base
- = iobuf_ptr (iobuf);
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
- priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
+ frag->fragcurrent = iobuf_ptr (iobuf);
}
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_PROG;
+ request->vector_state = SP_STATE_READING_PROG;
/* fall through */
@@ -938,19 +1346,15 @@ sp_state_reading_proghdr:
ret = __socket_read_simple_msg (this);
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
- if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && RPC_LASTFRAG (priv->incoming.fraghdr))) {
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_VECTORED_REQUEST_INIT;
- priv->incoming.payload_vector.iov_len
- = (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)
- priv->incoming.payload_vector.iov_base;
+ if ((ret == -1) ||
+ ((ret == 0) && (remaining_size == 0)
+ && RPC_LASTFRAG (in->fraghdr))) {
+ request->vector_state = SP_STATE_VECTORED_REQUEST_INIT;
+ in->payload_vector.iov_len
+ = ((unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base);
}
break;
}
@@ -959,7 +1363,7 @@ out:
return ret;
}
-inline int
+static inline int
__socket_read_request (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -968,46 +1372,53 @@ __socket_read_request (rpc_transport_t *this)
int ret = -1;
char *buf = NULL;
rpcsvc_vector_sizer vector_sizer = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.request.header_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->header_state) {
case SP_STATE_REQUEST_HEADER_INIT:
__socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE);
- priv->incoming.frag.call_body.request.header_state
- = SP_STATE_READING_RPCHDR1;
+ request->header_state = SP_STATE_READING_RPCHDR1;
/* fall through */
case SP_STATE_READING_RPCHDR1:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.header_state =
- SP_STATE_READ_RPCHDR1;
+ request->header_state = SP_STATE_READ_RPCHDR1;
/* fall through */
case SP_STATE_READ_RPCHDR1:
- buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_prognum_addr (iobuf_ptr (in->iobuf));
prognum = ntoh32 (*((uint32_t *)buf));
- buf = rpc_progver_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_progver_addr (iobuf_ptr (in->iobuf));
progver = ntoh32 (*((uint32_t *)buf));
- buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
- if (this->listener) {
- /* this check is needed as rpcsvc and rpc-clnt actor structures are
- * not same */
- vector_sizer = rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
- prognum, progver, procnum);
+ if (priv->is_server) {
+ /* this check is needed as rpcsvc and rpc-clnt
+ * actor structures are not same */
+ vector_sizer =
+ rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
+ prognum, progver, procnum);
}
if (vector_sizer) {
@@ -1016,15 +1427,13 @@ __socket_read_request (rpc_transport_t *this)
ret = __socket_read_simple_request (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
|| ((ret == 0)
&& (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.request.header_state =
- SP_STATE_REQUEST_HEADER_INIT;
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ request->header_state = SP_STATE_REQUEST_HEADER_INIT;
}
break;
@@ -1035,36 +1444,40 @@ out:
}
-inline int
+static inline int
__socket_read_accepted_successful_reply (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- struct iobuf *iobuf = NULL;
- uint32_t gluster_read_rsp_hdr_len = 0;
- gfs3_read_rsp read_rsp = {0, };
- size_t size = 0;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct iobuf *iobuf = NULL;
+ gfs3_read_rsp read_rsp = {0, };
+ ssize_t size = 0;
+ ssize_t default_read_size = 0;
+ char *proghdr_buf = NULL;
+ XDR xdr;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.reply.accepted_success_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->call_body.reply.accepted_success_state) {
case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
- gluster_read_rsp_hdr_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
- &read_rsp);
+ default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
+ &read_rsp);
- if (gluster_read_rsp_hdr_len == 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "xdr_sizeof on gfs3_read_rsp failed");
- ret = -1;
- goto out;
- }
- __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len);
+ proghdr_buf = frag->fragcurrent;
+
+ __socket_proto_init_pending (priv, default_read_size);
- priv->incoming.frag.call_body.reply.accepted_success_state
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READING_PROC_HEADER;
/* fall through */
@@ -1072,13 +1485,42 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
case SP_STATE_READING_PROC_HEADER:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_success_state
- = SP_STATE_READ_PROC_HEADER;
+ /* there can be 'xdata' in read response, figure it out */
+ xdrmem_create (&xdr, proghdr_buf, default_read_size,
+ XDR_DECODE);
- if (priv->incoming.payload_vector.iov_base == NULL) {
+ /* This will fail if there is xdata sent from server, if not,
+ well and good, we don't need to worry about */
+ xdr_gfs3_read_rsp (&xdr, &read_rsp);
- size = (RPC_FRAGSIZE (priv->incoming.fraghdr) -
- priv->incoming.frag.bytes_read);
+ free (read_rsp.xdata.xdata_val);
+
+ /* need to round off to proper roof (%4), as XDR packing pads
+ the end of opaque object with '0' */
+ size = roof (read_rsp.xdata.xdata_len, 4);
+
+ if (!size) {
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ goto read_proc_opaque;
+ }
+
+ __socket_proto_init_pending (priv, size);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READING_PROC_OPAQUE;
+
+ case SP_STATE_READING_PROC_OPAQUE:
+ __socket_proto_read (priv, ret);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+
+ case SP_STATE_READ_PROC_OPAQUE:
+ read_proc_opaque:
+ if (in->payload_vector.iov_base == NULL) {
+
+ size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (iobuf == NULL) {
@@ -1086,26 +1528,27 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
goto out;
}
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
iobuf_unref (iobuf);
goto out;
}
}
- iobref_add (priv->incoming.iobref, iobuf);
+ iobref_add (in->iobref, iobuf);
iobuf_unref (iobuf);
- priv->incoming.payload_vector.iov_base
- = iobuf_ptr (iobuf);
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
- priv->incoming.payload_vector.iov_len = size;
+ in->payload_vector.iov_len = size;
}
- priv->incoming.frag.fragcurrent
- = priv->incoming.payload_vector.iov_base;
+ frag->fragcurrent = in->payload_vector.iov_base;
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_HEADER;
/* fall through */
@@ -1113,9 +1556,8 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg (this);
if ((ret == -1)
- || ((ret == 0)
- && RPC_LASTFRAG (priv->incoming.fraghdr))) {
- priv->incoming.frag.call_body.reply.accepted_success_state
+ || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
+ frag->call_body.reply.accepted_success_state
= SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
}
@@ -1129,7 +1571,7 @@ out:
#define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4)
#define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4)
-inline int
+static inline int
__socket_read_accepted_reply (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1137,19 +1579,24 @@ __socket_read_accepted_reply (rpc_transport_t *this)
char *buf = NULL;
uint32_t verflen = 0, len = 0;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.call_body.reply.accepted_state) {
+ switch (frag->call_body.reply.accepted_state) {
case SP_STATE_ACCEPTED_REPLY_INIT:
__socket_proto_init_pending (priv,
RPC_AUTH_FLAVOUR_N_LENGTH_SIZE);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READING_REPLY_VERFLEN;
/* fall through */
@@ -1157,13 +1604,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_VERFLEN:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READ_REPLY_VERFLEN;
/* fall through */
case SP_STATE_READ_REPLY_VERFLEN:
- buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_verflen_addr (frag->fragcurrent);
verflen = ntoh32 (*((uint32_t *) buf));
@@ -1172,7 +1619,7 @@ __socket_read_accepted_reply (rpc_transport_t *this)
__socket_proto_init_pending (priv, len);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READING_REPLY_VERFBYTES;
/* fall through */
@@ -1180,19 +1627,19 @@ __socket_read_accepted_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_VERFBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READ_REPLY_VERFBYTES;
- buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_accept_status_addr (frag->fragcurrent);
- priv->incoming.frag.call_body.reply.accept_status
+ frag->call_body.reply.accept_status
= ntoh32 (*(uint32_t *) buf);
/* fall through */
case SP_STATE_READ_REPLY_VERFBYTES:
- if (priv->incoming.frag.call_body.reply.accept_status
+ if (frag->call_body.reply.accept_status
== SUCCESS) {
ret = __socket_read_accepted_successful_reply (this);
} else {
@@ -1202,14 +1649,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)
ret = __socket_read_simple_msg (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr)
+ - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.reply.accepted_state
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.accepted_state
= SP_STATE_ACCEPTED_REPLY_INIT;
}
@@ -1221,7 +1667,7 @@ out:
}
-inline int
+static inline int
__socket_read_denied_reply (rpc_transport_t *this)
{
return __socket_read_simple_msg (this);
@@ -1231,25 +1677,29 @@ __socket_read_denied_reply (rpc_transport_t *this)
#define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4)
-inline int
+static inline int
__socket_read_vectored_reply (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = 0;
char *buf = NULL;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.call_body.reply.status_state) {
+ switch (frag->call_body.reply.status_state) {
case SP_STATE_ACCEPTED_REPLY_INIT:
__socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE);
- priv->incoming.frag.call_body.reply.status_state
+ frag->call_body.reply.status_state
= SP_STATE_READING_REPLY_STATUS;
/* fall through */
@@ -1257,37 +1707,33 @@ __socket_read_vectored_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_STATUS:
__socket_proto_read (priv, ret);
- buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_status_addr (frag->fragcurrent);
- priv->incoming.frag.call_body.reply.accept_status
+ frag->call_body.reply.accept_status
= ntoh32 (*((uint32_t *) buf));
- priv->incoming.frag.call_body.reply.status_state
+ frag->call_body.reply.status_state
= SP_STATE_READ_REPLY_STATUS;
/* fall through */
case SP_STATE_READ_REPLY_STATUS:
- if (priv->incoming.frag.call_body.reply.accept_status
- == MSG_ACCEPTED) {
+ if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {
ret = __socket_read_accepted_reply (this);
} else {
ret = __socket_read_denied_reply (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.reply.status_state
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.status_state
= SP_STATE_ACCEPTED_REPLY_INIT;
- priv->incoming.payload_vector.iov_len
- = (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)
- priv->incoming.payload_vector.iov_base;
+ in->payload_vector.iov_len
+ = (unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base;
}
break;
}
@@ -1297,7 +1743,7 @@ out:
}
-inline int
+static inline int
__socket_read_simple_reply (rpc_transport_t *this)
{
return __socket_read_simple_msg (this);
@@ -1305,7 +1751,7 @@ __socket_read_simple_reply (rpc_transport_t *this)
#define rpc_xid_addr(buf) (buf)
-inline int
+static inline int
__socket_read_reply (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1313,26 +1759,29 @@ __socket_read_reply (rpc_transport_t *this)
int32_t ret = -1;
rpc_request_info_t *request_info = NULL;
char map_xid = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
- buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_xid_addr (iobuf_ptr (in->iobuf));
- if (priv->incoming.request_info == NULL) {
- priv->incoming.request_info = GF_CALLOC (1,
- sizeof (*request_info),
- gf_common_mt_rpc_trans_reqinfo_t);
- if (priv->incoming.request_info == NULL) {
+ if (in->request_info == NULL) {
+ in->request_info = GF_CALLOC (1, sizeof (*request_info),
+ gf_common_mt_rpc_trans_reqinfo_t);
+ if (in->request_info == NULL) {
goto out;
}
map_xid = 1;
}
- request_info = priv->incoming.request_info;
+ request_info = in->request_info;
if (map_xid) {
request_info->xid = ntoh32 (*((uint32_t *) buf));
@@ -1344,7 +1793,7 @@ __socket_read_reply (rpc_transport_t *this)
{
ret = rpc_transport_notify (this,
RPC_TRANSPORT_MAP_XID_REQUEST,
- priv->incoming.request_info);
+ in->request_info);
}
pthread_mutex_lock (&priv->lock);
@@ -1355,13 +1804,11 @@ __socket_read_reply (rpc_transport_t *this)
}
}
- if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)
+ if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
&& (request_info->procnum == GF_FOP_READ)) {
if (map_xid && request_info->rsp.rsp_payload_count != 0) {
- priv->incoming.iobref
- = iobref_ref (request_info->rsp.rsp_iobref);
- priv->incoming.payload_vector
- = *request_info->rsp.rsp_payload;
+ in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
+ in->payload_vector = *request_info->rsp.rsp_payload;
}
ret = __socket_read_vectored_reply (this);
@@ -1374,42 +1821,47 @@ out:
/* returns the number of bytes yet to be read in a fragment */
-inline int
+static inline int
__socket_read_frag (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int32_t ret = 0;
char *buf = NULL;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.state) {
+ switch (frag->state) {
case SP_STATE_NADA:
__socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE);
- priv->incoming.frag.state = SP_STATE_READING_MSGTYPE;
+ frag->state = SP_STATE_READING_MSGTYPE;
/* fall through */
case SP_STATE_READING_MSGTYPE:
__socket_proto_read (priv, ret);
- priv->incoming.frag.state = SP_STATE_READ_MSGTYPE;
+ frag->state = SP_STATE_READ_MSGTYPE;
/* fall through */
case SP_STATE_READ_MSGTYPE:
- buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf));
- priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf));
+ buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf));
+ in->msg_type = ntoh32 (*((uint32_t *)buf));
- if (priv->incoming.msg_type == CALL) {
+ if (in->msg_type == CALL) {
ret = __socket_read_request (this);
- } else if (priv->incoming.msg_type == REPLY) {
+ } else if (in->msg_type == REPLY) {
ret = __socket_read_reply (this);
- } else if (priv->incoming.msg_type == GF_UNIVERSAL_ANSWER) {
+ } else if (in->msg_type == GF_UNIVERSAL_ANSWER) {
gf_log ("rpc", GF_LOG_ERROR,
"older version of protocol/process trying to "
"connect from %s. use newer version on that node",
@@ -1417,19 +1869,17 @@ __socket_read_frag (rpc_transport_t *this)
} else {
gf_log ("rpc", GF_LOG_ERROR,
"wrong MSG-TYPE (%d) received from %s",
- priv->incoming.msg_type,
+ in->msg_type,
this->peerinfo.identifier);
ret = -1;
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.state = SP_STATE_NADA;
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->state = SP_STATE_NADA;
}
break;
@@ -1440,31 +1890,36 @@ out:
}
-inline
+static inline
void __socket_reset_priv (socket_private_t *priv)
{
- if (priv->incoming.iobref) {
- iobref_unref (priv->incoming.iobref);
- priv->incoming.iobref = NULL;
+ struct gf_sock_incoming *in = NULL;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+
+ if (in->iobref) {
+ iobref_unref (in->iobref);
+ in->iobref = NULL;
}
- if (priv->incoming.iobuf) {
- iobuf_unref (priv->incoming.iobuf);
+ if (in->iobuf) {
+ iobuf_unref (in->iobuf);
}
- if (priv->incoming.request_info != NULL) {
- GF_FREE (priv->incoming.request_info);
- priv->incoming.request_info = NULL;
+ if (in->request_info != NULL) {
+ GF_FREE (in->request_info);
+ in->request_info = NULL;
}
- memset (&priv->incoming.payload_vector, 0,
- sizeof (priv->incoming.payload_vector));
+ memset (&in->payload_vector, 0,
+ sizeof (in->payload_vector));
- priv->incoming.iobuf = NULL;
+ in->iobuf = NULL;
}
-int
+static int
__socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
@@ -1473,46 +1928,40 @@ __socket_proto_state_machine (rpc_transport_t *this,
struct iobuf *iobuf = NULL;
struct iobref *iobref = NULL;
struct iovec vector[2];
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- while (priv->incoming.record_state != SP_STATE_COMPLETE) {
- switch (priv->incoming.record_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ while (in->record_state != SP_STATE_COMPLETE) {
+ switch (in->record_state) {
case SP_STATE_NADA:
- priv->incoming.total_bytes_read = 0;
- priv->incoming.payload_vector.iov_len = 0;
+ in->total_bytes_read = 0;
+ in->payload_vector.iov_len = 0;
- priv->incoming.pending_vector = priv->incoming.vector;
- priv->incoming.pending_vector->iov_base =
- &priv->incoming.fraghdr;
+ in->pending_vector = in->vector;
+ in->pending_vector->iov_base = &in->fraghdr;
- priv->incoming.pending_vector->iov_len =
- sizeof (priv->incoming.fraghdr);
+ in->pending_vector->iov_len = sizeof (in->fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAGHDR;
+ in->record_state = SP_STATE_READING_FRAGHDR;
/* fall through */
case SP_STATE_READING_FRAGHDR:
- ret = __socket_readv (this,
- priv->incoming.pending_vector, 1,
- &priv->incoming.pending_vector,
- &priv->incoming.pending_count,
+ ret = __socket_readv (this, in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
NULL);
- if (ret == -1) {
- if (priv->read_fail_log == 1) {
- gf_log (this->name,
- ((priv->connected == 1) ?
- GF_LOG_WARNING : GF_LOG_DEBUG),
- "reading from socket failed. Error (%s)"
- ", peer (%s)", strerror (errno),
- this->peerinfo.identifier);
- }
+ if (ret == -1)
goto out;
- }
if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE, "partial "
@@ -1521,44 +1970,40 @@ __socket_proto_state_machine (rpc_transport_t *this,
}
if (ret == 0) {
- priv->incoming.record_state =
- SP_STATE_READ_FRAGHDR;
+ in->record_state = SP_STATE_READ_FRAGHDR;
}
/* fall through */
case SP_STATE_READ_FRAGHDR:
- priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAG;
- priv->incoming.total_bytes_read
- += RPC_FRAGSIZE(priv->incoming.fraghdr);
+ in->fraghdr = ntoh32 (in->fraghdr);
+ in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
iobuf = iobuf_get2 (this->ctx->iobuf_pool,
- priv->incoming.total_bytes_read +
- sizeof (priv->incoming.fraghdr));
+ (in->total_bytes_read +
+ sizeof (in->fraghdr)));
if (!iobuf) {
ret = -ENOMEM;
goto out;
}
- priv->incoming.iobuf = iobuf;
- priv->incoming.iobuf_size = 0;
- priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
+ in->iobuf = iobuf;
+ in->iobuf_size = 0;
+ frag->fragcurrent = iobuf_ptr (iobuf);
+ in->record_state = SP_STATE_READING_FRAG;
/* fall through */
case SP_STATE_READING_FRAG:
ret = __socket_read_frag (this);
- if ((ret == -1)
- || (priv->incoming.frag.bytes_read !=
- RPC_FRAGSIZE (priv->incoming.fraghdr))) {
+ if ((ret == -1) ||
+ (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) {
goto out;
}
- priv->incoming.frag.bytes_read = 0;
+ frag->bytes_read = 0;
- if (!RPC_LASTFRAG (priv->incoming.fraghdr)) {
- priv->incoming.record_state =
- SP_STATE_READING_FRAGHDR;
+ if (!RPC_LASTFRAG (in->fraghdr)) {
+ in->record_state = SP_STATE_READING_FRAGHDR;
break;
}
@@ -1567,44 +2012,39 @@ __socket_proto_state_machine (rpc_transport_t *this,
*/
if (pollin != NULL) {
int count = 0;
- priv->incoming.iobuf_size
- = priv->incoming.total_bytes_read
- - priv->incoming.payload_vector.iov_len;
+ in->iobuf_size = (in->total_bytes_read -
+ in->payload_vector.iov_len);
memset (vector, 0, sizeof (vector));
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
goto out;
}
}
- vector[count].iov_base
- = iobuf_ptr (priv->incoming.iobuf);
- vector[count].iov_len
- = priv->incoming.iobuf_size;
+ vector[count].iov_base = iobuf_ptr (in->iobuf);
+ vector[count].iov_len = in->iobuf_size;
- iobref = priv->incoming.iobref;
+ iobref = in->iobref;
count++;
- if (priv->incoming.payload_vector.iov_base
- != NULL) {
- vector[count]
- = priv->incoming.payload_vector;
+ if (in->payload_vector.iov_base != NULL) {
+ vector[count] = in->payload_vector;
count++;
}
*pollin = rpc_transport_pollin_alloc (this,
vector,
count,
- priv->incoming.iobuf,
+ in->iobuf,
iobref,
- priv->incoming.request_info);
- iobuf_unref (priv->incoming.iobuf);
- priv->incoming.iobuf = NULL;
+ in->request_info);
+ iobuf_unref (in->iobuf);
+ in->iobuf = NULL;
if (*pollin == NULL) {
gf_log (this->name, GF_LOG_WARNING,
@@ -1612,12 +2052,12 @@ __socket_proto_state_machine (rpc_transport_t *this,
ret = -1;
goto out;
}
- if (priv->incoming.msg_type == REPLY)
+ if (in->msg_type == REPLY)
(*pollin)->is_reply = 1;
- priv->incoming.request_info = NULL;
+ in->request_info = NULL;
}
- priv->incoming.record_state = SP_STATE_COMPLETE;
+ in->record_state = SP_STATE_COMPLETE;
break;
case SP_STATE_COMPLETE:
@@ -1629,8 +2069,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
}
}
- if (priv->incoming.record_state == SP_STATE_COMPLETE) {
- priv->incoming.record_state = SP_STATE_NADA;
+ if (in->record_state == SP_STATE_COMPLETE) {
+ in->record_state = SP_STATE_NADA;
__socket_reset_priv (priv);
}
@@ -1642,7 +2082,7 @@ out:
}
-int
+static int
socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
@@ -1665,18 +2105,22 @@ out:
}
-int
+static int
socket_event_poll_in (rpc_transport_t *this)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
-
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
rpc_transport_pollin_destroy (pollin);
}
@@ -1684,7 +2128,7 @@ socket_event_poll_in (rpc_transport_t *this)
}
-int
+static int
socket_connect_finish (rpc_transport_t *this)
{
int ret = -1;
@@ -1699,9 +2143,11 @@ socket_connect_finish (rpc_transport_t *this)
pthread_mutex_lock (&priv->lock);
{
- if (priv->connected)
+ if (priv->connected != 0)
goto unlock;
+ get_transport_identifiers (this);
+
ret = __socket_connect_finish (priv->sock);
if (ret == -1 && errno == EINPROGRESS)
@@ -1716,8 +2162,6 @@ socket_connect_finish (rpc_transport_t *this)
priv->connect_finish_log = 1;
}
__socket_disconnect (this);
- notify_rpc = 1;
- event = RPC_TRANSPORT_DISCONNECT;
goto unlock;
}
@@ -1742,7 +2186,6 @@ socket_connect_finish (rpc_transport_t *this)
priv->connected = 1;
priv->connect_finish_log = 0;
event = RPC_TRANSPORT_CONNECT;
- get_transport_identifiers (this);
}
}
unlock:
@@ -1757,13 +2200,13 @@ out:
/* reads rpc_requests during pollin */
-int
+static int
socket_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
- rpc_transport_t *this = NULL;
+ rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
- int ret = 0;
+ int ret = -1;
this = data;
GF_VALIDATE_OR_GOTO ("socket", this, out);
@@ -1773,16 +2216,13 @@ socket_event_handler (int fd, int idx, void *data,
THIS = this->xl;
priv = this->private;
-
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
}
pthread_mutex_unlock (&priv->lock);
- if (!priv->connected) {
- ret = socket_connect_finish (this);
- }
+ ret = (priv->connected == 1) ? 0 : socket_connect_finish(this);
if (!ret && poll_out) {
ret = socket_event_poll_out (this);
@@ -1798,14 +2238,198 @@ socket_event_handler (int fd, int idx, void *data,
"disconnecting now");
socket_event_poll_err (this);
rpc_transport_unref (this);
- }
+ }
out:
- return 0;
+ return ret;
}
-int
+static void *
+socket_poller (void *ctx)
+{
+ rpc_transport_t *this = ctx;
+ socket_private_t *priv = this->private;
+ struct pollfd pfd[2] = {{0,},};
+ gf_boolean_t to_write = _gf_false;
+ int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
+
+ if (priv->use_ssl) {
+ if (ssl_setup_connection(this,priv->connected) < 0) {
+ gf_log (this->name,GF_LOG_ERROR, "%s setup failed",
+ priv->connected ? "server" : "client");
+ goto err;
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto err;
+ }
+ }
+
+ if (priv->connected == 0) {
+ THIS = this->xl;
+ ret = socket_connect_finish (this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous socket_connect_finish failed");
+ }
+ }
+
+ ret = rpc_transport_notify (this->listener,
+ RPC_TRANSPORT_ACCEPT, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous rpc_transport_notify failed");
+ }
+
+ gen = priv->ot_gen;
+ for (;;) {
+ pthread_mutex_lock(&priv->lock);
+ to_write = !list_empty(&priv->ioq);
+ pthread_mutex_unlock(&priv->lock);
+ pfd[0].fd = priv->pipe[0];
+ pfd[0].events = POLL_MASK_ERROR;
+ pfd[0].revents = 0;
+ pfd[1].fd = priv->sock;
+ pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+ pfd[1].revents = 0;
+ if (to_write) {
+ pfd[1].events |= POLL_MASK_OUTPUT;
+ }
+ else {
+ pfd[0].events |= POLL_MASK_INPUT;
+ }
+ if (poll(pfd,2,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll failed");
+ break;
+ }
+ if (pfd[0].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on pipe");
+ break;
+ }
+ /* Only glusterd actually seems to need this. */
+ THIS = this->xl;
+ if (pfd[1].revents & POLL_MASK_INPUT) {
+ ret = socket_event_poll_in(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
+ }
+ else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+ ret = socket_event_poll_out(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
+ }
+ else {
+ /*
+ * This usually means that we left poll() because
+ * somebody pushed a byte onto our pipe. That wakeup
+ * is why the pipe is there, but once awake we can do
+ * all the checking we need on the next iteration.
+ */
+ ret = 0;
+ }
+ if (pfd[1].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on socket");
+ break;
+ }
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "error in polling loop");
+ break;
+ }
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
+ }
+
+err:
+ /* All (and only) I/O errors should come here. */
+ pthread_mutex_lock(&priv->lock);
+ if (priv->ssl_ssl) {
+ /*
+ * We're always responsible for this part, but only actually
+ * have to do it if we got far enough for ssl_ssl to be valid
+ * (i.e. errors in ssl_setup_connection don't count).
+ */
+ ssl_teardown_connection(priv);
+ }
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
+ priv->ot_state = OT_IDLE;
+ pthread_mutex_unlock(&priv->lock);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
+ return NULL;
+}
+
+
+static void
+socket_spawn (rpc_transport_t *this)
+{
+ socket_private_t *priv = this->private;
+
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
+ gf_log (this->name, GF_LOG_WARNING,
+ "refusing to start redundant poller");
+ return;
+ }
+
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
+
+ if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create poll thread");
+ }
+}
+
+static int
socket_server_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
@@ -1843,20 +2467,7 @@ socket_server_event_handler (int fd, int idx, void *data,
goto unlock;
}
- if (!priv->bio) {
- ret = __socket_nonblock (new_sock);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "NBIO on %d failed (%s)",
- new_sock, strerror (errno));
-
- close (new_sock);
- goto unlock;
- }
- }
-
- if (priv->nodelay) {
+ if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
ret = __socket_nodelay (new_sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
@@ -1866,8 +2477,10 @@ socket_server_event_handler (int fd, int idx, void *data,
}
}
- if (priv->keepalive) {
+ if (priv->keepalive &&
+ new_sockaddr.ss_family != AF_UNIX) {
ret = __socket_keepalive (new_sock,
+ new_sockaddr.ss_family,
priv->keepaliveintvl,
priv->keepaliveidle);
if (ret == -1)
@@ -1881,6 +2494,15 @@ socket_server_event_handler (int fd, int idx, void *data,
if (!new_trans)
goto unlock;
+ ret = pthread_mutex_init(&new_trans->lock, NULL);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_mutex_init() failed: %s",
+ strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
new_trans->name = gf_strdup (this->name);
memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
@@ -1902,7 +2524,11 @@ socket_server_event_handler (int fd, int idx, void *data,
}
get_transport_identifiers (new_trans);
- socket_init (new_trans);
+ ret = socket_init(new_trans);
+ if (ret != 0) {
+ close(new_sock);
+ goto unlock;
+ }
new_trans->ops = this->ops;
new_trans->init = this->init;
new_trans->fini = this->fini;
@@ -1913,20 +2539,62 @@ socket_server_event_handler (int fd, int idx, void *data,
new_trans->listener = this;
new_priv = new_trans->private;
+ new_priv->use_ssl = priv->use_ssl;
+ new_priv->sock = new_sock;
+ new_priv->own_thread = priv->own_thread;
+
+ new_priv->ssl_ctx = priv->ssl_ctx;
+ if (priv->use_ssl && !priv->own_thread) {
+ if (ssl_setup_connection(new_trans,1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "server setup failed");
+ close(new_sock);
+ goto unlock;
+ }
+ }
+
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
pthread_mutex_lock (&new_priv->lock);
{
- new_priv->sock = new_sock;
+ /*
+ * In the own_thread case, this is used to
+ * indicate that we're initializing a server
+ * connection.
+ */
new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans, 1, 0);
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ socket_spawn(new_trans);
+ }
+ else {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
- if (new_priv->idx == -1)
- ret = -1;
}
pthread_mutex_unlock (&new_priv->lock);
if (ret == -1) {
@@ -1935,8 +2603,10 @@ socket_server_event_handler (int fd, int idx, void *data,
goto unlock;
}
- ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT,
- new_trans);
+ if (!priv->own_thread) {
+ ret = rpc_transport_notify (this,
+ RPC_TRANSPORT_ACCEPT, new_trans);
+ }
}
}
unlock:
@@ -1947,7 +2617,7 @@ out:
}
-int
+static int
socket_disconnect (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1969,7 +2639,7 @@ out:
}
-int
+static int
socket_connect (rpc_transport_t *this, int port)
{
int ret = -1;
@@ -1978,7 +2648,9 @@ socket_connect (rpc_transport_t *this, int port)
socklen_t sockaddr_len = 0;
glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
+ char *local_addr = NULL;
union gf_sock_union sock_union;
+ struct sockaddr_in *addr = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, err);
GF_VALIDATE_OR_GOTO ("socket", this->private, err);
@@ -2006,6 +2678,10 @@ socket_connect (rpc_transport_t *this, int port)
goto err;
}
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
if (ret == -1) {
@@ -2016,6 +2692,20 @@ socket_connect (rpc_transport_t *this, int port)
if (port > 0) {
sock_union.sin.sin_port = htons (port);
}
+ if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) {
+ if (priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "disabling SSL for portmapper connection");
+ priv->use_ssl = _gf_false;
+ }
+ }
+ else {
+ if (priv->ssl_enabled && !priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "re-enabling SSL for I/O connection");
+ priv->use_ssl = _gf_true;
+ }
+ }
pthread_mutex_lock (&priv->lock);
{
if (priv->sock != -1) {
@@ -2061,7 +2751,7 @@ socket_connect (rpc_transport_t *this, int port)
}
}
- if (priv->nodelay) {
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
ret = __socket_nodelay (priv->sock);
if (ret == -1) {
@@ -2071,21 +2761,9 @@ socket_connect (rpc_transport_t *this, int port)
}
}
- if (!priv->bio) {
- ret = __socket_nonblock (priv->sock);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "NBIO on %d failed (%s)",
- priv->sock, strerror (errno));
- close (priv->sock);
- priv->sock = -1;
- goto unlock;
- }
- }
-
- if (priv->keepalive) {
+ if (priv->keepalive && sa_family != AF_UNIX) {
ret = __socket_keepalive (priv->sock,
+ sa_family,
priv->keepaliveintvl,
priv->keepaliveidle);
if (ret == -1)
@@ -2097,6 +2775,15 @@ socket_connect (rpc_transport_t *this, int port)
SA (&this->myinfo.sockaddr)->sa_family =
SA (&this->peerinfo.sockaddr)->sa_family;
+ /* If a source addr is explicitly specified, use it */
+ ret = dict_get_str (this->options,
+ "transport.socket.source-addr",
+ &local_addr);
+ if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) {
+ addr = (struct sockaddr_in *)(&this->myinfo.sockaddr);
+ ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr));
+ }
+
ret = client_bind (this, SA (&this->myinfo.sockaddr),
&this->myinfo.sockaddr_len, priv->sock);
if (ret == -1) {
@@ -2107,29 +2794,86 @@ socket_connect (rpc_transport_t *this, int port)
goto unlock;
}
+ if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
this->peerinfo.sockaddr_len);
if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) {
- gf_log (this->name, GF_LOG_ERROR,
- "connection attempt failed (%s)",
- strerror (errno));
+ /* For unix path based sockets, the socket path is
+ * cryptic (md5sum of path) and may not be useful for
+ * the user in debugging so log it in DEBUG
+ */
+ gf_log (this->name, ((sa_family == AF_UNIX) ?
+ GF_LOG_DEBUG : GF_LOG_ERROR),
+ "connection attempt on %s failed, (%s)",
+ this->peerinfo.identifier, strerror (errno));
close (priv->sock);
priv->sock = -1;
goto unlock;
}
- priv->connected = 0;
+ if (priv->use_ssl && !priv->own_thread) {
+ ret = ssl_setup_connection(this,0);
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "client setup failed");
+ close(priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
- rpc_transport_ref (this);
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler, this, 1, 1);
- if (priv->idx == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "failed to register the event");
- ret = -1;
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
}
+
+ /*
+ * In the own_thread case, this is used to indicate that we're
+ * initializing a client connection.
+ */
+ priv->connected = 0;
+ priv->is_server = _gf_false;
+ rpc_transport_ref (this);
+
+ if (priv->own_thread) {
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+
+ this->listener = this;
+ socket_spawn(this);
+ }
+ else {
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ ret = -1;
+ }
+ }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2139,7 +2883,7 @@ err:
}
-int
+static int
socket_listen (rpc_transport_t *this)
{
socket_private_t * priv = NULL;
@@ -2221,7 +2965,7 @@ socket_listen (rpc_transport_t *this)
}
}
- if (priv->nodelay) {
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
ret = __socket_nodelay (priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2290,7 +3034,7 @@ out:
}
-int32_t
+static int32_t
socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
socket_private_t *priv = NULL;
@@ -2299,6 +3043,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'j';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2324,21 +3069,31 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
goto unlock;
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
need_append = 0;
-
- if (ret > 0)
+ }
+ if (ret > 0) {
need_poll_out = 1;
+ }
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
@@ -2353,7 +3108,7 @@ out:
}
-int32_t
+static int32_t
socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
{
socket_private_t *priv = NULL;
@@ -2362,6 +3117,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'd';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2380,33 +3136,44 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
}
goto unlock;
}
+
priv->submit_log = 0;
entry = __socket_ioq_new (this, &reply->msg);
if (!entry)
goto unlock;
+
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
need_append = 0;
-
- if (ret > 0)
+ }
+ if (ret > 0) {
need_poll_out = 1;
+ }
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
priv->idx, -1, 1);
}
}
-
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2415,7 +3182,7 @@ out:
}
-int32_t
+static int32_t
socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
@@ -2434,7 +3201,7 @@ out:
}
-int32_t
+static int32_t
socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, socklen_t salen)
{
@@ -2455,7 +3222,7 @@ out:
}
-int32_t
+static int32_t
socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
@@ -2474,7 +3241,7 @@ out:
}
-int32_t
+static int32_t
socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
struct sockaddr_storage *sa, socklen_t salen)
{
@@ -2494,6 +3261,25 @@ out:
}
+static int
+socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* The way we implement throttling is by taking off
+ POLLIN event from the polled flags. This way we
+ never get called with the POLLIN event and therefore
+ will never read() any more data until throttling
+ is turned off.
+ */
+ priv->idx = event_select_on (this->ctx->event_pool, priv->sock,
+ priv->idx, (int) !onoff, -1);
+ return 0;
+}
+
+
struct rpc_transport_ops tops = {
.listen = socket_listen,
.connect = socket_connect,
@@ -2504,6 +3290,7 @@ struct rpc_transport_ops tops = {
.get_peeraddr = socket_getpeeraddr,
.get_myname = socket_getmyname,
.get_myaddr = socket_getmyaddr,
+ .throttle = socket_throttle,
};
int
@@ -2560,7 +3347,7 @@ out:
}
-int
+static int
socket_init (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -2569,6 +3356,7 @@ socket_init (rpc_transport_t *this)
char *optstr = NULL;
uint32_t keepalive = 0;
uint32_t backlog = 0;
+ int session_id = 0;
if (this->private) {
gf_log_callingfn (this->name, GF_LOG_ERROR,
@@ -2580,6 +3368,7 @@ socket_init (rpc_transport_t *this)
if (!priv) {
return -1;
}
+ memset(priv,0,sizeof(*priv));
pthread_mutex_init (&priv->lock, NULL);
@@ -2697,11 +3486,134 @@ socket_init (rpc_transport_t *this)
}
}
- optstr = NULL;
+ priv->windowsize = (int)windowsize;
+
+ priv->ssl_enabled = _gf_false;
+ if (dict_get_str(this->options,SSL_ENABLED_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for ssl-enabled boolean");
+ }
+ }
+
+ priv->ssl_own_cert = DEFAULT_CERT_PATH;
+ if (dict_get_str(this->options,SSL_OWN_CERT_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_OWN_CERT_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_own_cert = optstr;
+ }
+ priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert);
+
+ priv->ssl_private_key = DEFAULT_KEY_PATH;
+ if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_private_key = optstr;
+ }
+ priv->ssl_private_key = gf_strdup(priv->ssl_private_key);
+
+ priv->ssl_ca_list = DEFAULT_CA_PATH;
+ if (dict_get_str(this->options,SSL_CA_LIST_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_CA_LIST_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_ca_list = optstr;
+ }
+ priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list);
+
+ gf_log(this->name,GF_LOG_INFO,"SSL support is %s",
+ priv->ssl_enabled ? "ENABLED" : "NOT enabled");
+ /*
+ * This might get overridden temporarily in socket_connect (q.v.)
+ * if we're using the glusterd portmapper.
+ */
+ priv->use_ssl = priv->ssl_enabled;
+
+ priv->own_thread = priv->use_ssl;
+ if (dict_get_str(this->options,OWN_THREAD_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for own-thread boolean");
+ }
+ }
+ gf_log(this->name,GF_LOG_INFO,"using %s polling thread",
+ priv->own_thread ? "private" : "system");
+
+ if (priv->use_ssl) {
+ SSL_library_init();
+ SSL_load_error_strings();
+ priv->ssl_meth = (SSL_METHOD *)TLSv1_method();
+ priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
+
+ if (SSL_CTX_set_cipher_list(priv->ssl_ctx,
+ "HIGH:-SSLv2") == 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "failed to find any valid ciphers");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx,
+ priv->ssl_own_cert)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load our cert");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx,
+ priv->ssl_private_key,
+ SSL_FILETYPE_PEM)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load private key");
+ goto err;
+ }
+
+ if (!SSL_CTX_load_verify_locations(priv->ssl_ctx,
+ priv->ssl_ca_list,0)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load CA list");
+ goto err;
+ }
+
+#if (OPENSSL_VERSION_NUMBER < 0x00905100L)
+ SSL_CTX_set_verify_depth(ctx,1);
+#endif
+
+ priv->ssl_session_id = ++session_id;
+ SSL_CTX_set_session_id_context(priv->ssl_ctx,
+ (void *)&priv->ssl_session_id,
+ sizeof(priv->ssl_session_id));
+
+ SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0);
+ }
+
+ if (priv->own_thread) {
+ priv->ot_state = OT_IDLE;
+ }
+
out:
this->private = priv;
-
return 0;
+
+err:
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
+ GF_FREE(priv);
+ return -1;
}
@@ -2727,6 +3639,15 @@ fini (rpc_transport_t *this)
"transport %p destroyed", this);
pthread_mutex_destroy (&priv->lock);
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
GF_FREE (priv);
}
@@ -2771,8 +3692,7 @@ struct volume_options options[] = {
},
{ .key = { "transport.address-family",
"address-family" },
- .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
- "unix", "inet-sdp" },
+ .value = {"inet", "inet6", "unix", "inet-sdp" },
.type = GF_OPTION_TYPE_STR
},
@@ -2805,5 +3725,20 @@ struct volume_options options[] = {
{ .key = {"transport.socket.read-fail-log"},
.type = GF_OPTION_TYPE_BOOL
},
+ { .key = {SSL_ENABLED_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {SSL_OWN_CERT_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_PRIVATE_KEY_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_CA_LIST_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {OWN_THREAD_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
{ .key = {NULL} }
};