summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2012-07-17 10:50:43 -0400
committerAnand Avati <avati@redhat.com>2012-07-17 13:18:32 -0700
commitaea7759f1240b1e97684273b9369472695173a66 (patch)
tree2e019059c2f79a159e5c5d5bf56d943be1eff16e /rpc/rpc-transport
parentea0a0937a0524b8a449e470fbaea772a349d40fb (diff)
rpc-transport/socket: Add SSL support.
Based on OpenSSL. Key/certificate management is still manual. Enabling SSL also enables multi-threading, though multi-threading can be forced on or off using a separate option. Change-Id: Icd9f256bb2fd8c6266a7abefdff16936b4f8922d BUG: 764731 Signed-off-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-on: http://review.gluster.com/362 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r--rpc/rpc-transport/socket/src/Makefile.am2
-rw-r--r--rpc/rpc-transport/socket/src/socket.c729
-rw-r--r--rpc/rpc-transport/socket/src/socket.h19
3 files changed, 667 insertions, 83 deletions
diff --git a/rpc/rpc-transport/socket/src/Makefile.am b/rpc/rpc-transport/socket/src/Makefile.am
index 2c918c7e3..3d1631a34 100644
--- a/rpc/rpc-transport/socket/src/Makefile.am
+++ b/rpc/rpc-transport/socket/src/Makefile.am
@@ -3,7 +3,7 @@ noinst_HEADERS = socket.h name.h
rpctransport_LTLIBRARIES = socket.la
rpctransportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport
-socket_la_LDFLAGS = -module -avoidversion
+socket_la_LDFLAGS = -module -avoidversion -lssl
socket_la_SOURCES = socket.c name.c
socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 9db09d77e..088ceb460 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -35,9 +35,33 @@
#include <errno.h>
#include <netinet/tcp.h>
#include <rpc/xdr.h>
+#include <sys/ioctl.h>
#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
#define SA(ptr) ((struct sockaddr *)ptr)
+#define SSL_ENABLED_OPT "transport.socket.ssl-enabled"
+#define SSL_OWN_CERT_OPT "transport.socket.ssl-own-cert"
+#define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
+#define SSL_CA_LIST_OPT "transport.socket.ssl-ca-list"
+#define OWN_THREAD_OPT "transport.socket.own-thread"
+
+/* TBD: do automake substitutions etc. (ick) to set these. */
+#if !defined(DEFAULT_CERT_PATH)
+#define DEFAULT_CERT_PATH "/etc/ssl/glusterfs.pem"
+#endif
+#if !defined(DEFAULT_KEY_PATH)
+#define DEFAULT_KEY_PATH "/etc/ssl/glusterfs.key"
+#endif
+#if !defined(DEFAULT_CA_PATH)
+#define DEFAULT_CA_PATH "/etc/ssl/glusterfs.ca"
+#endif
+
+#define POLL_MASK_INPUT (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR (POLLERR | POLLHUP | POLLNVAL)
+
+typedef int SSL_unary_func (SSL *);
+typedef int SSL_trinary_func (SSL *, void *, int);
#define __socket_proto_reset_pending(priv) do { \
memset (&priv->incoming.frag.vector, 0, \
@@ -124,9 +148,156 @@
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
}
-
int socket_init (rpc_transport_t *this);
+void
+ssl_dump_error_stack (const char *caller)
+{
+ unsigned long errnum = 0;
+ char errbuf[120] = {0,};
+
+ /* OpenSSL docs explicitly give 120 as the error-string length. */
+
+ while ((errnum = ERR_get_error())) {
+ ERR_error_string(errnum,errbuf);
+ gf_log(caller,GF_LOG_ERROR," %s",errbuf);
+ }
+}
+
+int
+ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
+{
+ int r = (-1);
+ struct pollfd pfd = {-1,};
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,out);
+ priv = this->private;
+
+ for (;;) {
+ if (buf) {
+ if (priv->connected == -1) {
+ /*
+ * Fields in the SSL structure (especially
+ * the BIO pointers) are not valid at this
+ * point, so we'll segfault if we pass them
+ * to SSL_read/SSL_write.
+ */
+ gf_log(this->name,GF_LOG_INFO,
+ "lost connection in %s", __func__);
+ break;
+ }
+ r = func(priv->ssl_ssl,buf,len);
+ }
+ else {
+ /*
+ * We actually need these functions to get to
+ * priv->connected == 1.
+ */
+ r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ }
+ switch (SSL_get_error(priv->ssl_ssl,r)) {
+ case SSL_ERROR_NONE:
+ return r;
+ case SSL_ERROR_WANT_READ:
+ pfd.fd = priv->sock;
+ pfd.events = POLLIN;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ pfd.fd = priv->sock;
+ pfd.events = POLLOUT;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_SYSCALL:
+ /* This is what we get when remote disconnects. */
+ gf_log(this->name,GF_LOG_DEBUG,
+ "syscall error (probably remote disconnect)");
+ errno = ENODATA;
+ goto out;
+ default:
+ errno = EIO;
+ goto out; /* "break" would just loop again */
+ }
+ }
+out:
+ return -1;
+}
+
+#define ssl_connect_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_connect)
+#define ssl_accept_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_accept)
+#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
+#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
+
+int
+ssl_setup_connection (rpc_transport_t *this, int server)
+{
+ X509 *peer = NULL;
+ char peer_CN[256] = "";
+ int ret = -1;
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,done);
+ priv = this->private;
+
+ priv->ssl_ssl = SSL_new(priv->ssl_ctx);
+ if (!priv->ssl_ssl) {
+ gf_log(this->name,GF_LOG_ERROR,"SSL_new failed");
+ ssl_dump_error_stack(this->name);
+ goto done;
+ }
+ priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE);
+ if (!priv->ssl_sbio) {
+ gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed");
+ ssl_dump_error_stack(this->name);
+ goto free_ssl;
+ }
+ SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio);
+
+ if (server) {
+ ret = ssl_accept_one(this);
+ }
+ else {
+ ret = ssl_connect_one(this);
+ }
+
+ /* Make sure _the call_ succeeded. */
+ if (ret < 0) {
+ goto ssl_error;
+ }
+
+ /* Make sure _SSL verification_ succeeded, yielding an identity. */
+ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) {
+ goto ssl_error;
+ }
+ peer = SSL_get_peer_certificate(priv->ssl_ssl);
+ if (!peer) {
+ goto ssl_error;
+ }
+
+ /* Finally, everything seems OK. */
+ X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
+ NID_commonName, peer_CN, sizeof(peer_CN)-1);
+ peer_CN[sizeof(peer_CN)-1] = '\0';
+ gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN);
+ return 0;
+
+ /* Error paths. */
+ssl_error:
+ gf_log(this->name,GF_LOG_ERROR,"SSL connect error");
+ ssl_dump_error_stack(this->name);
+free_ssl:
+ SSL_free(priv->ssl_ssl);
+done:
+ return ret;
+}
+
/*
* return value:
* 0 = success (completed)
@@ -159,9 +330,22 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
*bytes = 0;
}
- while (opcount) {
+ while (opcount > 0) {
+ if (opvector->iov_len == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "would have passed zero length to read/write");
+ ++opvector;
+ --opcount;
+ continue;
+ }
if (write) {
- ret = writev (sock, opvector, opcount);
+ if (priv->use_ssl) {
+ ret = ssl_write_one(this,
+ opvector->iov_base, opvector->iov_len);
+ }
+ else {
+ ret = writev (sock, opvector, opcount);
+ }
if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
/* done for now */
@@ -169,7 +353,18 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
}
this->total_bytes_write += ret;
} else {
- ret = readv (sock, opvector, opcount);
+ if (priv->use_ssl) {
+ ret = ssl_read_one(this,
+ opvector->iov_base, opvector->iov_len);
+ }
+ else {
+ ret = readv (sock, opvector, opcount);
+ }
+ if (ret == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,"EOF on socket");
+ errno = ENODATA;
+ ret = -1;
+ }
if (ret == -1 && errno == EAGAIN) {
/* done for now */
break;
@@ -193,6 +388,9 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
gf_log (this->name, GF_LOG_WARNING,
"%s failed (%s)", write ? "writev" : "readv",
strerror (errno));
+ if (priv->use_ssl) {
+ ssl_dump_error_stack(this->name);
+ }
opcount = -1;
break;
}
@@ -204,6 +402,17 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
moved = 0;
while (moved < ret) {
+ if (!opcount) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "ran out of iov, moved %d/%d",
+ moved, ret);
+ goto ran_out;
+ }
+ if (!opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ continue;
+ }
if ((ret - moved) >= opvector[0].iov_len) {
moved += opvector[0].iov_len;
opvector++;
@@ -213,13 +422,11 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
opvector[0].iov_base += (ret - moved);
moved += (ret - moved);
}
- while (opcount && !opvector[0].iov_len) {
- opvector++;
- opcount--;
- }
}
}
+ran_out:
+
if (pending_vector)
*pending_vector = opvector;
@@ -279,6 +486,20 @@ __socket_disconnect (rpc_transport_t *this)
"shutdown() returned %d. %s",
ret, strerror (errno));
}
+ if (priv->use_ssl) {
+ SSL_shutdown(priv->ssl_ssl);
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ }
+ if (priv->own_thread) {
+ /*
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
+ */
+ close(priv->sock);
+ priv->sock = -1;
+ ++(priv->socket_gen);
+ }
}
out:
@@ -356,7 +577,6 @@ __socket_nonblock (int fd)
return ret;
}
-
int
__socket_nodelay (int fd)
{
@@ -610,9 +830,11 @@ out:
int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
- int ret = -1;
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ char a_byte = 0;
ret = __socket_writev (this, entry->pending_vector,
entry->pending_count,
@@ -623,6 +845,18 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
/* current entry was completely written */
GF_ASSERT (entry->pending_count == 0);
__socket_ioq_entry_free (entry);
+ priv = this->private;
+ if (priv->own_thread) {
+ /*
+ * The pipe should only remain readable if there are
+ * more entries after this, so drain the byte
+ * representing this entry.
+ */
+ if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "read error on pipe");
+ }
+ }
}
return ret;
@@ -645,13 +879,13 @@ __socket_ioq_churn (rpc_transport_t *this)
/* pick next entry */
entry = priv->ioq_next;
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 0);
if (ret != 0)
break;
}
- if (list_empty (&priv->ioq)) {
+ if (!priv->own_thread && list_empty (&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
priv->idx = event_select_on (this->ctx->event_pool,
priv->sock, priv->idx, -1, 0);
@@ -1729,7 +1963,6 @@ socket_event_poll_in (rpc_transport_t *this)
if (pollin != NULL) {
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
-
rpc_transport_pollin_destroy (pollin);
}
@@ -1752,7 +1985,7 @@ socket_connect_finish (rpc_transport_t *this)
pthread_mutex_lock (&priv->lock);
{
- if (priv->connected)
+ if (priv->connected != 0)
goto unlock;
ret = __socket_connect_finish (priv->sock);
@@ -1814,9 +2047,9 @@ int
socket_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
- rpc_transport_t *this = NULL;
+ rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
- int ret = 0;
+ int ret = -1;
this = data;
GF_VALIDATE_OR_GOTO ("socket", this, out);
@@ -1826,16 +2059,13 @@ socket_event_handler (int fd, int idx, void *data,
THIS = this->xl;
priv = this->private;
-
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
}
pthread_mutex_unlock (&priv->lock);
- if (!priv->connected) {
- ret = socket_connect_finish (this);
- }
+ ret = (priv->connected == 1) ? 0 : socket_connect_finish(this);
if (!ret && poll_out) {
ret = socket_event_poll_out (this);
@@ -1851,13 +2081,112 @@ socket_event_handler (int fd, int idx, void *data,
"disconnecting now");
socket_event_poll_err (this);
rpc_transport_unref (this);
- }
+ }
out:
- return 0;
+ return ret;
}
+void *
+socket_poller (void *ctx)
+{
+ rpc_transport_t *this = ctx;
+ socket_private_t *priv = this->private;
+ struct pollfd pfd[2] = {{0,},};
+ gf_boolean_t to_write = _gf_false;
+ int ret = 0;
+ int orig_gen;
+
+ orig_gen = ++(priv->socket_gen);
+
+ if (priv->connected == 0) {
+ THIS = this->xl;
+ ret = socket_connect_finish (this);
+ }
+
+ for (;;) {
+ if (priv->socket_gen != orig_gen) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "redundant poller exiting");
+ return NULL;
+ }
+ pthread_mutex_lock(&priv->lock);
+ to_write = !list_empty(&priv->ioq);
+ pthread_mutex_unlock(&priv->lock);
+ pfd[0].fd = priv->pipe[0];
+ pfd[0].events = POLL_MASK_ERROR;
+ pfd[0].revents = 0;
+ pfd[1].fd = priv->sock;
+ pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+ pfd[1].revents = 0;
+ if (to_write) {
+ pfd[1].events |= POLL_MASK_OUTPUT;
+ }
+ else {
+ pfd[0].events |= POLL_MASK_INPUT;
+ }
+ if (poll(pfd,2,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll failed");
+ break;
+ }
+ if (pfd[0].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on pipe");
+ break;
+ }
+ /* Only glusterd actually seems to need this. */
+ THIS = this->xl;
+ if (pfd[1].revents & POLL_MASK_INPUT) {
+ ret = socket_event_poll_in(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ }
+ else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+ ret = socket_event_poll_out(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ }
+ else {
+ /*
+ * This usually means that we left poll() because
+ * somebody pushed a byte onto our pipe. That wakeup
+ * is why the pipe is there, but once awake we can do
+ * all the checking we need on the next iteration.
+ */
+ ret = 0;
+ }
+ if (pfd[1].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on socket");
+ break;
+ }
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "error in polling loop");
+ break;
+ }
+ }
+
+ /* All (and only) I/O errors should come here. */
+ __socket_disconnect (this);
+ rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ rpc_transport_unref (this);
+ return NULL;
+}
+
+
+
int
socket_server_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
@@ -1896,19 +2225,6 @@ socket_server_event_handler (int fd, int idx, void *data,
goto unlock;
}
- if (!priv->bio) {
- ret = __socket_nonblock (new_sock);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "NBIO on %d failed (%s)",
- new_sock, strerror (errno));
-
- close (new_sock);
- goto unlock;
- }
- }
-
if (priv->nodelay) {
ret = __socket_nodelay (new_sock);
if (ret == -1) {
@@ -1955,7 +2271,11 @@ socket_server_event_handler (int fd, int idx, void *data,
}
get_transport_identifiers (new_trans);
- socket_init (new_trans);
+ ret = socket_init(new_trans);
+ if (ret != 0) {
+ close(new_sock);
+ goto unlock;
+ }
new_trans->ops = this->ops;
new_trans->init = this->init;
new_trans->fini = this->fini;
@@ -1966,20 +2286,61 @@ socket_server_event_handler (int fd, int idx, void *data,
new_trans->listener = this;
new_priv = new_trans->private;
+ new_priv->use_ssl = priv->use_ssl;
+ new_priv->sock = new_sock;
+ new_priv->own_thread = priv->own_thread;
+
+ if (priv->use_ssl) {
+ new_priv->ssl_ctx = priv->ssl_ctx;
+ if (ssl_setup_connection(new_trans,1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "server setup failed");
+ close(new_sock);
+ goto unlock;
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
pthread_mutex_lock (&new_priv->lock);
{
- new_priv->sock = new_sock;
new_priv->connected = 1;
rpc_transport_ref (new_trans);
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans, 1, 0);
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ if (pthread_create(&new_priv->thread,
+ NULL, socket_poller,
+ new_trans) != 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create poll thread");
+ }
+ }
+ else {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
- if (new_priv->idx == -1)
- ret = -1;
}
pthread_mutex_unlock (&new_priv->lock);
if (ret == -1) {
@@ -2069,6 +2430,20 @@ socket_connect (rpc_transport_t *this, int port)
if (port > 0) {
sock_union.sin.sin_port = htons (port);
}
+ if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) {
+ if (priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "disabling SSL for portmapper connection");
+ priv->use_ssl = _gf_false;
+ }
+ }
+ else {
+ if (priv->ssl_enabled && !priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "re-enabling SSL for I/O connection");
+ priv->use_ssl = _gf_true;
+ }
+ }
pthread_mutex_lock (&priv->lock);
{
if (priv->sock != -1) {
@@ -2124,19 +2499,6 @@ socket_connect (rpc_transport_t *this, int port)
}
}
- if (!priv->bio) {
- ret = __socket_nonblock (priv->sock);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "NBIO on %d failed (%s)",
- priv->sock, strerror (errno));
- close (priv->sock);
- priv->sock = -1;
- goto unlock;
- }
- }
-
if (priv->keepalive) {
ret = __socket_keepalive (priv->sock,
priv->keepaliveintvl,
@@ -2172,17 +2534,55 @@ socket_connect (rpc_transport_t *this, int port)
goto unlock;
}
- priv->connected = 0;
+ if (priv->use_ssl) {
+ ret = ssl_setup_connection(this,0);
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "client setup failed");
+ close(priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
- rpc_transport_ref (this);
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler, this, 1, 1);
- if (priv->idx == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "failed to register the event");
- ret = -1;
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
}
+
+ priv->connected = 0;
+ rpc_transport_ref (this);
+
+ if (priv->own_thread) {
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+
+ if (pthread_create(&priv->thread,NULL,
+ socket_poller, this) != 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create poll thread");
+ }
+ }
+ else {
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ ret = -1;
+ }
+ }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2352,6 +2752,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'j';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2377,21 +2778,31 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
goto unlock;
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
need_append = 0;
-
- if (ret > 0)
+ }
+ if (ret > 0) {
need_poll_out = 1;
+ }
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
@@ -2415,6 +2826,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'd';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2433,33 +2845,44 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
}
goto unlock;
}
+
priv->submit_log = 0;
entry = __socket_ioq_new (this, &reply->msg);
if (!entry)
goto unlock;
+
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
need_append = 0;
-
- if (ret > 0)
+ }
+ if (ret > 0) {
need_poll_out = 1;
+ }
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
priv->idx, -1, 1);
}
}
-
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2622,6 +3045,7 @@ socket_init (rpc_transport_t *this)
char *optstr = NULL;
uint32_t keepalive = 0;
uint32_t backlog = 0;
+ int session_id = 0;
if (this->private) {
gf_log_callingfn (this->name, GF_LOG_ERROR,
@@ -2750,11 +3174,130 @@ socket_init (rpc_transport_t *this)
}
}
- optstr = NULL;
+ priv->windowsize = (int)windowsize;
+
+ priv->ssl_enabled = _gf_false;
+ if (dict_get_str(this->options,SSL_ENABLED_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for ssl-enabled boolean");
+ }
+ }
+
+ priv->ssl_own_cert = DEFAULT_CERT_PATH;
+ if (dict_get_str(this->options,SSL_OWN_CERT_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_OWN_CERT_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_own_cert = optstr;
+ }
+ priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert);
+
+ priv->ssl_private_key = DEFAULT_KEY_PATH;
+ if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_private_key = optstr;
+ }
+ priv->ssl_private_key = gf_strdup(priv->ssl_private_key);
+
+ priv->ssl_ca_list = DEFAULT_CA_PATH;
+ if (dict_get_str(this->options,SSL_CA_LIST_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_CA_LIST_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_ca_list = optstr;
+ }
+ priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list);
+
+ gf_log(this->name,GF_LOG_INFO,"SSL support is %s",
+ priv->ssl_enabled ? "ENABLED" : "NOT enabled");
+ /*
+ * This might get overridden temporarily in socket_connect (q.v.)
+ * if we're using the glusterd portmapper.
+ */
+ priv->use_ssl = priv->ssl_enabled;
+
+ priv->own_thread = priv->use_ssl;
+ if (dict_get_str(this->options,OWN_THREAD_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for own-thread boolean");
+ }
+ }
+ gf_log(this->name,GF_LOG_INFO,"using %s polling thread",
+ priv->own_thread ? "private" : "system");
+
+ if (priv->use_ssl) {
+ SSL_library_init();
+ SSL_load_error_strings();
+ priv->ssl_meth = (SSL_METHOD *)TLSv1_method();
+ priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
+
+ if (SSL_CTX_set_cipher_list(priv->ssl_ctx,
+ "HIGH:-SSLv2") == 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "failed to find any valid ciphers");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx,
+ priv->ssl_own_cert)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load our cert");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx,
+ priv->ssl_private_key,
+ SSL_FILETYPE_PEM)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load private key");
+ goto err;
+ }
+
+ if (!SSL_CTX_load_verify_locations(priv->ssl_ctx,
+ priv->ssl_ca_list,0)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load CA list");
+ goto err;
+ }
+
+#if (OPENSSL_VERSION_NUMBER < 0x00905100L)
+ SSL_CTX_set_verify_depth(ctx,1);
+#endif
+
+ priv->ssl_session_id = ++session_id;
+ SSL_CTX_set_session_id_context(priv->ssl_ctx,
+ (void *)&priv->ssl_session_id,
+ sizeof(priv->ssl_session_id));
+
+ SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0);
+ }
+
out:
this->private = priv;
-
return 0;
+
+err:
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
+ GF_FREE(priv);
+ return -1;
}
@@ -2780,6 +3323,15 @@ fini (rpc_transport_t *this)
"transport %p destroyed", this);
pthread_mutex_destroy (&priv->lock);
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
GF_FREE (priv);
}
@@ -2857,5 +3409,20 @@ struct volume_options options[] = {
{ .key = {"transport.socket.read-fail-log"},
.type = GF_OPTION_TYPE_BOOL
},
+ { .key = {SSL_ENABLED_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {SSL_OWN_CERT_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_PRIVATE_KEY_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_CA_LIST_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {OWN_THREAD_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
{ .key = {NULL} }
};
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 0304f1db1..0a407cc1a 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -11,6 +11,8 @@
#ifndef _SOCKET_H
#define _SOCKET_H
+#include <openssl/ssl.h>
+#include <openssl/err.h>
#ifndef _CONFIG_H
#define _CONFIG_H
@@ -144,7 +146,8 @@ typedef struct {
typedef struct {
int32_t sock;
int32_t idx;
- unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected
+ /* -1 = not connected. 0 = in progress. 1 = connected */
+ char connected;
char bio;
char connect_finish_log;
char submit_log;
@@ -195,6 +198,20 @@ typedef struct {
int keepaliveintvl;
uint32_t backlog;
gf_boolean_t read_fail_log;
+ gf_boolean_t ssl_enabled;
+ gf_boolean_t use_ssl;
+ SSL_METHOD *ssl_meth;
+ SSL_CTX *ssl_ctx;
+ int ssl_session_id;
+ BIO *ssl_sbio;
+ SSL *ssl_ssl;
+ char *ssl_own_cert;
+ char *ssl_private_key;
+ char *ssl_ca_list;
+ pthread_t thread;
+ int pipe[2];
+ gf_boolean_t own_thread;
+ volatile int socket_gen;
} socket_private_t;