summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport
diff options
context:
space:
mode:
authorMilind Changire <mchangir@redhat.com>2018-01-29 13:35:34 +0530
committerRaghavendra G <rgowdapp@redhat.com>2018-07-27 08:59:09 +0000
commit1739f7e0b2af6245d54b130e87d805944e9f7e63 (patch)
tree9f589eda9803e104dc3a1a1e0a36cf663ca3c987 /rpc/rpc-transport
parent9400b6f2c8aa219a493961e0ab9770b7f12e80d2 (diff)
rpc: merge ssl infra with epoll infra
Patch attempts to use the epoll infra for handling SSL connections as well instead of the socket_poller() thread func. This essentially makes priv->own_thread flag redundant. SSL_connect()/SSL_accept() is now non-blocking which has done away with the localised poll() in ssl_do(). So, ssl_do() has been updated appropriately. own_thread and coincidently socket_poller() thread for SSL processing is now deprecated. Change-Id: I1ce54c06ddb643c16baa143598e7e4fbf16bae0a fixes: bz#1561332 Signed-off-by: Milind Changire <mchangir@redhat.com>
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c1530
-rw-r--r--rpc/rpc-transport/socket/src/socket.h32
2 files changed, 814 insertions, 748 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 3068b879802..fbf5d349b93 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -91,6 +91,7 @@
typedef int SSL_unary_func (SSL *);
typedef int SSL_trinary_func (SSL *, void *, int);
+static int ssl_setup_connection_params(rpc_transport_t *this);
#define __socket_proto_reset_pending(priv) do { \
struct gf_sock_incoming_frag *frag; \
@@ -189,6 +190,51 @@ struct socket_connect_error_state_ {
typedef struct socket_connect_error_state_ socket_connect_error_state_t;
static int socket_init (rpc_transport_t *this);
+static int __socket_nonblock (int fd);
+
+static void
+socket_dump_info (struct sockaddr *sa, int is_server, int is_ssl, int sock,
+ char *log_domain, char *log_label)
+{
+ char addr_buf[INET6_ADDRSTRLEN+1] = {0, };
+ char *addr = NULL;
+ char *peer_type = NULL;
+ int af = sa->sa_family;
+ int so_error = -1;
+ socklen_t slen = sizeof(so_error);
+
+ if (af == AF_UNIX) {
+ addr = ((struct sockaddr_un *)(sa))->sun_path;
+ } else {
+ if (af == AF_INET6) {
+ struct sockaddr_in6 *sin6 =
+ (struct sockaddr_in6 *)(sa);
+
+ inet_ntop (af, &sin6->sin6_addr, addr_buf,
+ sizeof (addr_buf));
+ addr = addr_buf;
+ } else {
+ struct sockaddr_in *sin =
+ (struct sockaddr_in *)(sa);
+
+ inet_ntop (af, &sin->sin_addr, addr_buf,
+ sizeof (addr_buf));
+ addr = addr_buf;
+ }
+ }
+ if (is_server)
+ peer_type = "server";
+ else
+ peer_type = "client";
+
+ getsockopt (sock, SOL_SOCKET, SO_ERROR, &so_error, &slen);
+
+ gf_log (log_domain, GF_LOG_TRACE,
+ "$$$ %s: %s (af:%d,sock:%d) %s %s (errno:%d:%s)",
+ peer_type, log_label, af, sock, addr,
+ (is_ssl ? "SSL" : "non-SSL"),
+ so_error, strerror (so_error));
+}
static void
ssl_dump_error_stack (const char *caller)
@@ -208,128 +254,120 @@ static int
ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
{
int r = (-1);
- struct pollfd pfd = {-1, };
socket_private_t *priv = NULL;
GF_VALIDATE_OR_GOTO(this->name, this->private, out);
priv = this->private;
- for (;;) {
- if (buf) {
- if (priv->connected == -1) {
- /*
- * Fields in the SSL structure (especially
- * the BIO pointers) are not valid at this
- * point, so we'll segfault if we pass them
- * to SSL_read/SSL_write.
- */
- gf_log(this->name, GF_LOG_INFO,
- "lost connection in %s", __func__);
- break;
- }
- r = func(priv->ssl_ssl, buf, len);
- } else {
+ if (buf) {
+ if (priv->connected == -1) {
/*
- * We actually need these functions to get to
- * priv->connected == 1.
+ * Fields in the SSL structure (especially
+ * the BIO pointers) are not valid at this
+ * point, so we'll segfault if we pass them
+ * to SSL_read/SSL_write.
*/
- r = ((SSL_unary_func *)func)(priv->ssl_ssl);
- }
- switch (SSL_get_error(priv->ssl_ssl, r)) {
- case SSL_ERROR_NONE:
- return r;
- case SSL_ERROR_WANT_READ:
- /* If we are attempting to connect/accept then we
- * should wait here on the poll, for the SSL
- * (re)negotiation to complete, else we would error out
- * on the accept/connect.
- * If we are here when attempting to read/write
- * then we return r (or -1) as the socket is always
- * primed for the read event, and it would eventually
- * call one of the SSL routines */
- /* NOTE: Only way to determine this is a accept/connect
- * is to examine buf or func, which is not very
- * clean */
- if ((func == (SSL_trinary_func *)SSL_read)
- || (func == (SSL_trinary_func *) SSL_write)) {
- return r;
- }
-
- pfd.fd = priv->sock;
- pfd.events = POLLIN;
- if (poll(&pfd, 1, -1) < 0) {
- gf_log(this->name, GF_LOG_ERROR, "poll error %d",
- errno);
- }
- break;
- case SSL_ERROR_WANT_WRITE:
- if ((func == (SSL_trinary_func *)SSL_read)
- || (func == (SSL_trinary_func *) SSL_write)) {
- errno = EAGAIN;
- return r;
- }
- pfd.fd = priv->sock;
- pfd.events = POLLOUT;
- if (poll(&pfd, 1, -1) < 0) {
- gf_log(this->name, GF_LOG_ERROR, "poll error %d",
- errno);
- }
- break;
- case SSL_ERROR_SYSCALL:
- /* This is what we get when remote disconnects. */
- gf_log(this->name, GF_LOG_DEBUG,
- "syscall error (probably remote disconnect)");
- errno = ENODATA;
- goto out;
- default:
- errno = EIO;
- goto out; /* "break" would just loop again */
+ gf_log (this->name, GF_LOG_INFO,
+ "lost connection in %s", __func__);
+ return -1;
}
+ r = func (priv->ssl_ssl, buf, len);
+ } else {
+ /*
+ * We actually need these functions to get to
+ * priv->connected == 1.
+ */
+ r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ }
+ switch (SSL_get_error (priv->ssl_ssl, r)) {
+ case SSL_ERROR_NONE:
+ /* fall through */
+ case SSL_ERROR_WANT_READ:
+ /* fall through */
+ case SSL_ERROR_WANT_WRITE:
+ errno = EAGAIN;
+ return r;
+
+ case SSL_ERROR_SYSCALL:
+ /* Sometimes SSL_ERROR_SYSCALL returns errno as
+ * EAGAIN. In such a case we should reattempt operation
+ * So, for now, just return the return value and the
+ * errno as is.
+ */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "syscall error (probably remote disconnect) "
+ "errno:%d:%s", errno, strerror(errno));
+ return r;
+ default:
+ errno = EIO;
+ goto out; /* "break" would just loop again */
}
out:
return -1;
}
-#define ssl_connect_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_connect)
-#define ssl_accept_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_accept)
#define ssl_read_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_read)
#define ssl_write_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_write)
-static char *
-ssl_setup_connection (rpc_transport_t *this, int server)
+
+int
+ssl_setup_connection_prefix (rpc_transport_t *this)
{
- X509 *peer = NULL;
- char peer_CN[256] = "";
int ret = -1;
socket_private_t *priv = NULL;
GF_VALIDATE_OR_GOTO(this->name, this->private, done);
+
priv = this->private;
+ if (ssl_setup_connection_params (this) < 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "+ ssl_setup_connection_params() failed!");
+ goto done;
+ } else {
+ gf_log (this->name, GF_LOG_TRACE,
+ "+ ssl_setup_connection_params() done!");
+ }
+
+ priv->ssl_error_required = SSL_ERROR_NONE;
+ priv->ssl_connected = _gf_false;
+ priv->ssl_accepted = _gf_false;
+ priv->ssl_context_created = _gf_false;
+
priv->ssl_ssl = SSL_new(priv->ssl_ctx);
if (!priv->ssl_ssl) {
gf_log(this->name, GF_LOG_ERROR, "SSL_new failed");
ssl_dump_error_stack(this->name);
goto done;
}
+
priv->ssl_sbio = BIO_new_socket(priv->sock, BIO_NOCLOSE);
if (!priv->ssl_sbio) {
gf_log(this->name, GF_LOG_ERROR, "BIO_new_socket failed");
ssl_dump_error_stack(this->name);
goto free_ssl;
}
- SSL_set_bio(priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio);
- if (server) {
- ret = ssl_accept_one(this);
- } else {
- ret = ssl_connect_one(this);
- }
+ SSL_set_bio (priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio);
+ ret = 0;
+ goto done;
- /* Make sure _the call_ succeeded. */
- if (ret < 0) {
- goto ssl_error;
- }
+free_ssl:
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+done:
+ return ret;
+}
+
+static char *
+ssl_setup_connection_postfix (rpc_transport_t *this)
+{
+ X509 *peer = NULL;
+ char peer_CN[256] = "";
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name, this->private, done);
+ priv = this->private;
/* Make sure _SSL verification_ succeeded, yielding an identity. */
if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) {
@@ -340,6 +378,8 @@ ssl_setup_connection (rpc_transport_t *this, int server)
goto ssl_error;
}
+ SSL_set_mode(priv->ssl_ssl, SSL_MODE_ENABLE_PARTIAL_WRITE);
+
/* Finally, everything seems OK. */
X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
NID_commonName, peer_CN, sizeof(peer_CN)-1);
@@ -356,7 +396,7 @@ ssl_error:
"SSL connect error (client: %s) (server: %s)",
this->peerinfo.identifier, this->myinfo.identifier);
ssl_dump_error_stack(this->name);
-free_ssl:
+
SSL_free(priv->ssl_ssl);
priv->ssl_ssl = NULL;
done:
@@ -364,6 +404,86 @@ done:
}
+int
+ssl_complete_connection (rpc_transport_t *this)
+{
+ int ret = -1; /* 1 : implies go back to epoll_wait()
+ * 0 : implies successful ssl connection
+ * -1: implies continue processing current event
+ * as if EPOLLERR has been encountered
+ */
+ char *cname = NULL;
+ int r = -1;
+ int ssl_error = -1;
+ socket_private_t *priv = NULL;
+
+
+ priv = this->private;
+
+ if (priv->is_server) {
+ r = SSL_accept (priv->ssl_ssl);
+ } else {
+ r = SSL_connect (priv->ssl_ssl);
+ }
+
+ ssl_error = SSL_get_error (priv->ssl_ssl, r);
+ priv->ssl_error_required = ssl_error;
+
+ switch (ssl_error) {
+ case SSL_ERROR_NONE:
+ cname = ssl_setup_connection_postfix (this);
+ if (!cname) {
+ /* we've failed to get the cname so
+ * we must close the connection
+ *
+ * treat this as EPOLLERR
+ */
+ gf_log (this->name, GF_LOG_TRACE,
+ "error getting cname");
+ errno = ECONNRESET;
+ ret = -1;
+ } else {
+ this->ssl_name = cname;
+ if (priv->is_server) {
+ priv->ssl_accepted = _gf_true;
+ gf_log (this->name, GF_LOG_TRACE,
+ "ssl_accepted!");
+ } else {
+ priv->ssl_connected = _gf_true;
+ gf_log (this->name, GF_LOG_TRACE,
+ "ssl_connected!");
+ }
+ ret = 0;
+ }
+ break;
+
+ case SSL_ERROR_WANT_READ:
+ /* fall through */
+ case SSL_ERROR_WANT_WRITE:
+ errno = EAGAIN;
+ break;
+
+ case SSL_ERROR_SYSCALL:
+ /* Sometimes SSL_ERROR_SYSCALL returns with errno as EAGAIN
+ * So, we should retry the operation.
+ * So, for now, we just return the return value and errno as is.
+ */
+ break;
+
+ case SSL_ERROR_SSL:
+ /* treat this as EPOLLERR */
+ ret = -1;
+ break;
+
+ default:
+ /* treat this as EPOLLERR */
+ errno = EIO;
+ ret = -1;
+ break;
+ }
+ return ret;
+}
+
static void
ssl_teardown_connection (socket_private_t *priv)
{
@@ -371,7 +491,21 @@ ssl_teardown_connection (socket_private_t *priv)
SSL_shutdown(priv->ssl_ssl);
SSL_clear(priv->ssl_ssl);
SSL_free(priv->ssl_ssl);
+ SSL_CTX_free(priv->ssl_ctx);
priv->ssl_ssl = NULL;
+ priv->ssl_ctx = NULL;
+ if (priv->ssl_private_key) {
+ GF_FREE (priv->ssl_private_key);
+ priv->ssl_private_key = NULL;
+ }
+ if (priv->ssl_own_cert) {
+ GF_FREE (priv->ssl_own_cert);
+ priv->ssl_own_cert = NULL;
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE (priv->ssl_ca_list);
+ priv->ssl_ca_list = NULL;
+ }
}
priv->use_ssl = _gf_false;
}
@@ -388,8 +522,10 @@ __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
sock = priv->sock;
if (priv->use_ssl) {
+ gf_log (this->name, GF_LOG_TRACE, "***** reading over SSL");
ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len);
} else {
+ gf_log (this->name, GF_LOG_TRACE, "***** reading over non-SSL");
ret = sys_readv (sock, opvector, IOV_MIN(opcount));
}
@@ -541,6 +677,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
* non-SSL might be insecure, so just fail it outright.
*/
ret = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "### no priv->ssl_ssl yet; ret = -1;");
} else if (write) {
if (priv->use_ssl) {
ret = ssl_write_one (this, opvector->iov_base,
@@ -556,9 +694,10 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
this->total_bytes_write += ret;
} else {
ret = __socket_cached_read (this, opvector, opcount);
-
if (ret == 0) {
- gf_log(this->name, GF_LOG_DEBUG, "EOF on socket");
+ gf_log (this->name, GF_LOG_DEBUG,
+ "EOF on socket (errno:%d:%s)",
+ errno, strerror (errno));
errno = ENODATA;
ret = -1;
}
@@ -718,9 +857,8 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
- gf_log (this->name, GF_LOG_TRACE,
- "disconnecting %p, state=%u gen=%u sock=%d", this,
- priv->ot_state, priv->ot_gen, priv->sock);
+ gf_log (this->name, GF_LOG_TRACE, "disconnecting %p, sock=%d",
+ this, priv->sock);
if (priv->sock != -1) {
gf_log_callingfn (this->name, GF_LOG_TRACE,
@@ -731,16 +869,6 @@ __socket_disconnect (rpc_transport_t *this)
"__socket_teardown_connection () failed: %s",
strerror (errno));
}
-
- if (priv->own_thread) {
- /*
- * Without this, reconnect (= disconnect + connect)
- * won't work except by accident.
- */
- gf_log (this->name, GF_LOG_TRACE,
- "OT_PLEASE_DIE on %p", this);
- priv->ot_state = OT_PLEASE_DIE;
- }
}
out:
@@ -974,7 +1102,22 @@ __socket_reset (rpc_transport_t *this)
priv->sock = -1;
priv->idx = -1;
priv->connected = -1;
+ priv->ssl_connected = _gf_false;
+ priv->ssl_accepted = _gf_false;
+ priv->ssl_context_created = _gf_false;
+ if (priv->ssl_private_key) {
+ GF_FREE (priv->ssl_private_key);
+ priv->ssl_private_key = NULL;
+ }
+ if (priv->ssl_own_cert) {
+ GF_FREE (priv->ssl_own_cert);
+ priv->ssl_own_cert = NULL;
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE (priv->ssl_ca_list);
+ priv->ssl_ca_list = NULL;
+ }
out:
return;
}
@@ -1110,8 +1253,6 @@ static int
__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
int ret = -1;
- socket_private_t *priv = NULL;
- char a_byte = 0;
ret = __socket_writev (this, entry->pending_vector,
entry->pending_count,
@@ -1122,18 +1263,6 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
/* current entry was completely written */
GF_ASSERT (entry->pending_count == 0);
__socket_ioq_entry_free (entry);
- priv = this->private;
- if (priv->own_thread) {
- /*
- * The pipe should only remain readable if there are
- * more entries after this, so drain the byte
- * representing this entry.
- */
- if (!direct && sys_read (priv->pipe[0], &a_byte, 1) < 1) {
- gf_log(this->name, GF_LOG_WARNING,
- "read error on pipe");
- }
- }
}
return ret;
@@ -1162,7 +1291,7 @@ __socket_ioq_churn (rpc_transport_t *this)
break;
}
- if (!priv->own_thread && list_empty (&priv->ioq)) {
+ if (list_empty (&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
priv->idx = event_select_on (this->ctx->event_pool,
priv->sock, priv->idx, -1, 0);
@@ -2457,15 +2586,9 @@ socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)
priv->gen);
if (pollin) {
- priv->ot_state = OT_CALLBACK;
-
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
- if (priv->ot_state == OT_CALLBACK) {
- priv->ot_state = OT_RUNNING;
- }
-
rpc_transport_pollin_destroy (pollin);
pthread_mutex_lock (&priv->notify.lock);
@@ -2558,303 +2681,355 @@ out:
static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
-/* reads rpc_requests during pollin */
-static int
-socket_event_handler (int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err)
+/* socket_is_connected() is for use only in socket_event_handler() */
+static inline gf_boolean_t
+socket_is_connected (rpc_transport_t *this)
{
- rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
- int ret = -1;
- glusterfs_ctx_t *ctx = NULL;
- gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
- this = data;
+ priv = this->private;
- GF_VALIDATE_OR_GOTO ("socket", this, out);
- GF_VALIDATE_OR_GOTO ("socket", this->private, out);
- GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
+ if (priv->use_ssl) {
+ return priv->is_server ? priv->ssl_accepted :
+ priv->ssl_connected;
+ } else {
+ return priv->is_server ? priv->accepted :
+ priv->connected;
+ }
+}
+
+static void
+ssl_rearm_event_fd (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ int idx = -1;
+ int gen = -1;
+ int fd = -1;
- THIS = this->xl;
priv = this->private;
ctx = this->ctx;
- pthread_mutex_lock (&priv->in_lock);
- pthread_mutex_lock (&priv->out_lock);
- {
- priv->idx = idx;
- priv->gen = gen;
- }
- pthread_mutex_unlock (&priv->out_lock);
- pthread_mutex_unlock (&priv->in_lock);
+ idx = priv->idx;
+ gen = priv->gen;
+ fd = priv->sock;
- if (priv->connected != 1) {
- if (priv->connect_failed) {
- /* connect failed with some other error than
- EINPROGRESS or ENOENT, so nothing more to do, fail
- reading/writing anything even if poll_in or poll_out
- is set */
- gf_log ("transport", GF_LOG_DEBUG,
- "connect failed with some other error than "
- "EINPROGRESS or ENOENT, so nothing more to "
- "do; disconnecting socket");
- ret = socket_disconnect (this, _gf_false);
-
- /* Force ret to be -1, as we are officially done with
- this socket */
+ if (priv->ssl_error_required == SSL_ERROR_WANT_READ)
+ event_select_on (ctx->event_pool, fd, idx, 1, -1);
+ if (priv->ssl_error_required == SSL_ERROR_WANT_WRITE)
+ event_select_on (ctx->event_pool, fd, idx, -1, 1);
+ event_handled (ctx->event_pool, fd, idx, gen);
+}
+
+static int
+ssl_handle_server_connection_attempt (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ int idx = -1;
+ int gen = -1;
+ int ret = -1;
+ int fd = -1;
+
+ priv = this->private;
+ ctx = this->ctx;
+
+ idx = priv->idx;
+ gen = priv->gen;
+ fd = priv->sock;
+
+ if (!priv->ssl_context_created) {
+ ret = ssl_setup_connection_prefix (this);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "> ssl_setup_connection_prefix() failed!");
ret = -1;
+ goto out;
} else {
- ret = socket_connect_finish (this);
+ priv->ssl_context_created = _gf_true;
}
- } else {
- ret = 0;
}
-
- if (!ret && poll_out) {
- ret = socket_event_poll_out (this);
- }
-
- if (!ret && poll_in) {
- ret = socket_event_poll_in (this, !poll_err);
- notify_handled = _gf_true;
+ ret = ssl_complete_connection (this);
+ if (ret == 0) {
+ /* nothing to do */
+ event_select_on (ctx->event_pool, fd, idx, 1, 0);
+ event_handled (ctx->event_pool, fd, idx, gen);
+ ret = 1;
+ } else {
+ if (errno == EAGAIN) {
+ ssl_rearm_event_fd (this);
+ ret = 1;
+ } else {
+ ret = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "ssl_complete_connection returned error");
+ }
}
+out:
+ return ret;
+}
- if ((ret < 0) || poll_err) {
- /* Logging has happened already in earlier cases */
- gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
- "EPOLLERR - disconnecting now");
+static int
+ssl_handle_client_connection_attempt (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ int idx = -1;
+ int ret = -1;
+ int fd = -1;
- socket_closed = socket_event_poll_err (this, gen, idx);
+ priv = this->private;
+ ctx = this->ctx;
- if (socket_closed)
- rpc_transport_unref (this);
+ idx = priv->idx;
+ fd = priv->sock;
- } else if (!notify_handled) {
- event_handled (ctx->event_pool, fd, idx, gen);
+ /* SSL client */
+ if (priv->connect_failed) {
+ gf_log (this->name, GF_LOG_TRACE,
+ ">>> disconnecting SSL socket");
+ ret = socket_disconnect (this, _gf_false);
+ /* Force ret to be -1, as we are officially done with
+ this socket */
+ ret = -1;
+ } else {
+ if (!priv->ssl_context_created) {
+ ret = ssl_setup_connection_prefix (this);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "> ssl_setup_connection_prefix() "
+ "failed!");
+ ret = -1;
+ goto out;
+ } else {
+ priv->ssl_context_created = _gf_true;
+ }
+ }
+ ret = ssl_complete_connection (this);
+ if (ret == 0) {
+ ret = socket_connect_finish (this);
+ event_select_on (ctx->event_pool, fd, idx, 1, 0);
+ gf_log (this->name, GF_LOG_TRACE,
+ ">>> completed client connect");
+ } else {
+ if (errno == EAGAIN) {
+ gf_log (this->name, GF_LOG_TRACE,
+ ">>> retrying client connect 2");
+ ssl_rearm_event_fd (this);
+ ret = 1;
+ } else {
+ /* this is a connection failure */
+ ret = socket_connect_finish (this);
+ gf_log (this->name, GF_LOG_TRACE,
+ "ssl_complete_connection "
+ "returned error");
+ ret = -1;
+ }
+ }
}
-
out:
return ret;
}
-static int poll_err_cnt;
-static void *
-socket_poller (void *ctx)
+static int
+socket_handle_client_connection_attempt (rpc_transport_t *this)
{
- rpc_transport_t *this = ctx;
- socket_private_t *priv = this->private;
- struct pollfd pfd[2] = {{0, }, };
- gf_boolean_t to_write = _gf_false;
- int ret = 0;
- uint32_t gen = 0;
- char *cname = NULL;
+ socket_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ int idx = -1;
+ int gen = -1;
+ int ret = -1;
+ int fd = -1;
- GF_ASSERT (this);
- /* Set THIS early on in the life of this thread, instead of setting it
- * conditionally
- */
- THIS = this->xl;
+ priv = this->private;
+ ctx = this->ctx;
- if (priv->ot_state == OT_PLEASE_DIE) {
- gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting "
- "because socket state is OT_PLEASE_DIE");
- goto err;
+ idx = priv->idx;
+ gen = priv->gen;
+ fd = priv->sock;
+
+ /* non-SSL client */
+ if (priv->connect_failed) {
+ /* connect failed with some other error than
+ EINPROGRESS or ENOENT, so nothing more to
+ do, fail reading/writing anything even if
+ poll_in or poll_out
+ is set
+ */
+ gf_log ("transport", GF_LOG_DEBUG,
+ "connect failed with some other error "
+ "than EINPROGRESS or ENOENT, so "
+ "nothing more to do; disconnecting "
+ "socket");
+ (void)socket_disconnect (this, _gf_false);
+
+ /* Force ret to be -1, as we are officially
+ * done with this socket
+ */
+ ret = -1;
+ } else {
+ ret = socket_connect_finish (this);
+ gf_log (this->name, GF_LOG_TRACE,
+ "socket_connect_finish() returned %d",
+ ret);
+ if (ret == 0 || ret == 1) {
+ /* we don't want to do any reads or
+ * writes on the connection yet in
+ * socket_event_handler, so just
+ * return 1
+ */
+ ret = 1;
+ event_handled (ctx->event_pool, fd, idx, gen);
+ }
}
+ return ret;
+}
- priv->ot_state = OT_RUNNING;
+static int
+socket_complete_connection (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ int idx = -1;
+ int gen = -1;
+ int ret = -1;
+ int fd = -1;
+
+ priv = this->private;
+ ctx = this->ctx;
+
+ idx = priv->idx;
+ gen = priv->gen;
+ fd = priv->sock;
if (priv->use_ssl) {
- cname = ssl_setup_connection(this, priv->connected);
- if (!cname) {
- gf_log (this->name, GF_LOG_ERROR, "%s setup failed",
- priv->connected ? "server" : "client");
- goto err;
+ if (priv->is_server) {
+ ret = ssl_handle_server_connection_attempt (this);
+ } else {
+ ret = ssl_handle_client_connection_attempt (this);
}
- if (priv->connected) {
- this->ssl_name = cname;
+ } else {
+ if (priv->is_server) {
+ /* non-SSL server: nothing much to do
+ * connection has already been accepted in
+ * socket_server_event_handler()
+ */
+ priv->accepted = _gf_true;
+ event_handled (ctx->event_pool, fd, idx, gen);
+ ret = 1;
} else {
- GF_FREE(cname);
+ ret = socket_handle_client_connection_attempt (this);
}
}
+ return ret;
+}
- if (!priv->bio) {
- ret = __socket_nonblock (priv->sock);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "NBIO on %d failed (%s)",
- priv->sock, strerror (errno));
- goto err;
- }
- }
+/* reads rpc_requests during pollin */
+static int
+socket_event_handler (int fd, int idx, int gen, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ rpc_transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ glusterfs_ctx_t *ctx = NULL;
+ gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
- if (priv->connected == 0) {
- ret = socket_connect_finish (this);
- if (ret != 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "asynchronous socket_connect_finish failed");
- }
- }
- ret = rpc_transport_notify (this->listener,
- RPC_TRANSPORT_ACCEPT, this);
- if (ret != 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "asynchronous rpc_transport_notify failed");
+ this = data;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
+
+ THIS = this->xl;
+ priv = this->private;
+ ctx = this->ctx;
+
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
+ {
+ priv->idx = idx;
+ priv->gen = gen;
}
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
- gen = priv->ot_gen;
- for (;;) {
- pthread_mutex_lock(&priv->out_lock);
- to_write = !list_empty(&priv->ioq);
- pthread_mutex_unlock(&priv->out_lock);
- pfd[0].fd = priv->pipe[0];
- pfd[0].events = POLL_MASK_ERROR;
- pfd[0].revents = 0;
- pfd[1].fd = priv->sock;
- pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
- pfd[1].revents = 0;
- if (to_write) {
- pfd[1].events |= POLL_MASK_OUTPUT;
- } else {
- pfd[0].events |= POLL_MASK_INPUT;
- }
- if (poll(pfd, 2, -1) < 0) {
- gf_log(this->name, GF_LOG_ERROR, "poll failed");
- break;
- }
- if (pfd[0].revents & POLL_MASK_ERROR) {
- gf_log(this->name, GF_LOG_ERROR,
- "poll error on pipe");
- break;
- }
+ gf_log (this->name, GF_LOG_TRACE, "%s (sock:%d) in:%d, out:%d, err:%d",
+ (priv->is_server ? "server" : "client"),
+ priv->sock, poll_in, poll_out, poll_err);
- if (priv->ot_state == OT_PLEASE_DIE) {
- gf_log (this->name, GF_LOG_DEBUG,
- "OT_PLEASE_DIE on %p (exiting socket_poller)",
- this);
- break;
- }
+ if (!poll_err) {
+ if (!socket_is_connected (this)) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "%s (sock:%d) socket is not connected, "
+ "completing connection",
+ (priv->is_server ? "server" : "client"),
+ priv->sock);
- if (pfd[1].revents & POLL_MASK_INPUT) {
- ret = socket_event_poll_in(this, 0);
- if (ret >= 0) {
- /* Suppress errors while making progress. */
- pfd[1].revents &= ~POLL_MASK_ERROR;
- } else if (errno == ENOTCONN) {
- ret = 0;
- }
- if (priv->ot_state == OT_PLEASE_DIE) {
- gf_log (this->name, GF_LOG_TRACE,
- "OT_IDLE on %p (input request)",
- this);
- break;
- }
- } else if (pfd[1].revents & POLL_MASK_OUTPUT) {
- ret = socket_event_poll_out(this);
- if (ret >= 0) {
- /* Suppress errors while making progress. */
- pfd[1].revents &= ~POLL_MASK_ERROR;
- } else if (errno == ENOTCONN) {
- ret = 0;
- }
- if (priv->ot_state == OT_PLEASE_DIE) {
+ ret = socket_complete_connection (this);
+
+ gf_log (this->name, GF_LOG_TRACE, "(sock:%d) "
+ "socket_complete_connection() returned %d",
+ priv->sock, ret);
+
+ if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE,
- "OT_IDLE on %p (output request)",
- this);
- break;
+ "(sock:%d) returning to wait on socket",
+ priv->sock);
+ return 0;
}
} else {
- /*
- * This usually means that we left poll() because
- * somebody pushed a byte onto our pipe. That wakeup
- * is why the pipe is there, but once awake we can do
- * all the checking we need on the next iteration.
- */
- ret = 0;
- }
- if (pfd[1].revents & POLL_MASK_ERROR) {
- gf_log(this->name, GF_LOG_ERROR,
- "poll error on socket");
- break;
- }
- if (ret < 0) {
- GF_LOG_OCCASIONALLY (poll_err_cnt, this->name,
- GF_LOG_ERROR,
- "socket_poller %s failed (%s)",
- this->peerinfo.identifier,
- strerror (errno));
- break;
- }
- if (priv->ot_gen != gen) {
+ char *sock_type = (priv->is_server ? "Server" :
+ "Client");
+
gf_log (this->name, GF_LOG_TRACE,
- "generation mismatch, my %u != %u",
- gen, priv->ot_gen);
- return NULL;
+ "%s socket (%d) is already connected",
+ sock_type, priv->sock);
+ ret = 0;
}
}
-err:
- /* All (and only) I/O errors should come here. */
- pthread_mutex_lock(&priv->in_lock);
- pthread_mutex_lock(&priv->out_lock);
- {
- gf_log (this->name, GF_LOG_TRACE, "disconnecting socket");
- __socket_teardown_connection (this);
- sys_close (priv->sock);
- priv->sock = -1;
-
- sys_close (priv->pipe[0]);
- sys_close (priv->pipe[1]);
- priv->pipe[0] = -1;
- priv->pipe[1] = -1;
-
- priv->ot_state = OT_IDLE;
+ if (!ret && poll_out) {
+ ret = socket_event_poll_out (this);
+ gf_log (this->name, GF_LOG_TRACE, "(sock:%d) "
+ "socket_event_poll_out returned %d", priv->sock, ret);
}
- pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
-
- rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
- GF_REF_PUT (priv);
-
- rpc_transport_unref (this);
+ if (!ret && poll_in) {
+ ret = socket_event_poll_in (this, !poll_err);
+ gf_log (this->name, GF_LOG_TRACE, "(sock:%d) "
+ "socket_event_poll_in returned %d", priv->sock, ret);
+ notify_handled = _gf_true;
+ }
- return NULL;
-}
+ if ((ret < 0) || poll_err) {
+ struct sockaddr *sa = SA(&this->peerinfo.sockaddr);
+ if (priv->is_server &&
+ SA(&this->myinfo.sockaddr)->sa_family == AF_UNIX) {
+ sa = SA(&this->myinfo.sockaddr);
+ }
-static int
-socket_spawn (rpc_transport_t *this)
-{
- socket_private_t *priv = this->private;
- int ret = -1;
- switch (priv->ot_state) {
- case OT_IDLE:
- case OT_PLEASE_DIE:
- break;
- default:
- gf_log (this->name, GF_LOG_WARNING,
- "refusing to start redundant poller");
- return ret;
- }
+ socket_dump_info (sa, priv->is_server, priv->use_ssl,
+ priv->sock, this->name,
+ "disconnecting from");
- priv->ot_gen += 7;
- priv->ot_state = OT_SPAWNING;
- gf_log (this->name, GF_LOG_TRACE,
- "spawning %p with gen %u", this, priv->ot_gen);
+ /* Logging has happened already in earlier cases */
+ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
+ "EPOLLERR - disconnecting (sock:%d) (%s)",
+ priv->sock, (priv->use_ssl ? "SSL" : "non-SSL"));
- GF_REF_GET (priv);
+ socket_closed = socket_event_poll_err (this, gen, idx);
- /* Create thread after enable detach flag */
+ if (socket_closed)
+ rpc_transport_unref (this);
- ret = gf_thread_create_detached (&priv->thread, socket_poller, this,
- "spoller");
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not create poll thread");
- ret = -1;
+ } else if (!notify_handled) {
+ event_handled (ctx->event_pool, fd, idx, gen);
}
+out:
return ret;
}
@@ -2871,7 +3046,6 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
socklen_t addrlen = sizeof (new_sockaddr);
socket_private_t *new_priv = NULL;
glusterfs_ctx_t *ctx = NULL;
- char *cname = NULL;
this = data;
GF_VALIDATE_OR_GOTO ("socket", this, out);
@@ -2965,6 +3139,10 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
}
get_transport_identifiers (new_trans);
+ gf_log (this->name, GF_LOG_TRACE, "XXX server:%s, client:%s",
+ new_trans->myinfo.identifier,
+ new_trans->peerinfo.identifier);
+
ret = socket_init(new_trans);
if (ret != 0) {
sys_close (new_sock);
@@ -3000,23 +3178,16 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
}
new_priv->sock = new_sock;
- new_priv->own_thread = priv->own_thread;
+ new_priv->ssl_enabled = priv->ssl_enabled;
new_priv->ssl_ctx = priv->ssl_ctx;
- if (new_priv->use_ssl && !new_priv->own_thread) {
- cname = ssl_setup_connection(new_trans, 1);
- if (!cname) {
- gf_log(this->name, GF_LOG_ERROR,
- "server setup failed");
- sys_close (new_sock);
- GF_FREE (new_trans->name);
- GF_FREE (new_trans);
- goto out;
- }
- this->ssl_name = cname;
- }
+ new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
- if (!priv->bio && !priv->own_thread) {
+ /* set O_NONBLOCK for plain text as well as ssl connections */
+ if (!priv->bio) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "### use non-blocking IO");
ret = __socket_nonblock (new_sock);
if (ret == -1) {
@@ -3030,29 +3201,13 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
goto out;
}
}
-
/*
- * In the own_thread case, this is used to
- * indicate that we're initializing a server
- * connection.
+ * This is the first ref on the newly accepted
+ * transport.
*/
- new_priv->connected = 1;
- new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
- if (new_priv->own_thread) {
- if (pipe(new_priv->pipe) < 0) {
- gf_log(this->name, GF_LOG_ERROR,
- "could not create pipe");
- }
- ret = socket_spawn(new_trans);
- if (ret) {
- gf_log(this->name, GF_LOG_ERROR,
- "could not spawn thread");
- sys_close (new_priv->pipe[0]);
- sys_close (new_priv->pipe[1]);
- }
- } else {
+ {
/* Take a ref on the new_trans to avoid
* getting deleted when event_register()
* causes socket_event_handler() to race
@@ -3118,9 +3273,6 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
}
}
out:
- if (cname && (cname != this->ssl_name)) {
- GF_FREE(cname);
- }
return ret;
}
@@ -3130,36 +3282,12 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
socket_private_t *priv = NULL;
int ret = -1;
- char a_byte = 'r';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- if (wait && priv->own_thread) {
- GF_REF_PUT (priv);
-
- pthread_mutex_lock (&priv->cond_lock);
- {
- /* Change the state to OT_PLEASE_DIE so that
- * socket_poller can exit. */
- priv->ot_state = OT_PLEASE_DIE;
- /* Write something into the pipe so that poller
- * thread can wake up.*/
- if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
- gf_log (this->name, GF_LOG_WARNING,
- "write error on pipe");
- }
-
- /* Wait for socket_poller to exit */
- if (!priv->own_thread_done)
- pthread_cond_wait (&priv->cond,
- &priv->cond_lock);
- }
- pthread_mutex_unlock (&priv->cond_lock);
- }
-
pthread_mutex_lock (&priv->in_lock);
pthread_mutex_lock (&priv->out_lock);
{
@@ -3250,8 +3378,8 @@ socket_connect (rpc_transport_t *this, int port)
gf_boolean_t refd = _gf_false;
socket_connect_error_state_t *arg = NULL;
pthread_t th_id = {0, };
- char *cname = NULL;
gf_boolean_t ign_enoent = _gf_false;
+ gf_boolean_t connect_attempted = _gf_false;
GF_VALIDATE_OR_GOTO ("socket", this, err);
GF_VALIDATE_OR_GOTO ("socket", this->private, err);
@@ -3268,7 +3396,6 @@ socket_connect (rpc_transport_t *this, int port)
pthread_mutex_lock (&priv->in_lock);
pthread_mutex_lock (&priv->out_lock);
{
- priv->own_thread_done = _gf_false;
if (priv->sock != -1) {
gf_log_callingfn (this->name, GF_LOG_TRACE,
"connect () called on transport "
@@ -3279,8 +3406,7 @@ socket_connect (rpc_transport_t *this, int port)
}
gf_log (this->name, GF_LOG_TRACE,
- "connecting %p, state=%u gen=%u sock=%d", this,
- priv->ot_state, priv->ot_gen, priv->sock);
+ "connecting %p, sock=%d", this, priv->sock);
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
@@ -3390,8 +3516,9 @@ socket_connect (rpc_transport_t *this, int port)
}
/* If client wants ENOENT to be ignored */
- ign_enoent = dict_get_str_boolean (this->options,
- "transport.socket.ignore-enoent", _gf_false);
+ ign_enoent = dict_get_str_boolean (this->options,
+ "transport.socket.ignore-enoent",
+ _gf_false);
ret = client_bind (this, SA (&this->myinfo.sockaddr),
&this->myinfo.sockaddr_len, priv->sock);
@@ -3401,17 +3528,27 @@ socket_connect (rpc_transport_t *this, int port)
goto handler;
}
- if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
+ /* make socket non-blocking for all types of sockets */
+ if (!priv->bio) {
ret = __socket_nonblock (priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
"NBIO on %d failed (%s)",
priv->sock, strerror (errno));
goto handler;
+ } else {
+ gf_log (this->name, GF_LOG_TRACE,
+ ">>> connect() with non-blocking IO for ALL");
}
}
-
this->connect_failed = _gf_false;
+ priv->connect_failed = 0;
+ priv->connected = 0;
+
+ socket_dump_info (SA(&this->peerinfo.sockaddr), priv->is_server,
+ priv->use_ssl, priv->sock, this->name,
+ "connecting to");
+
if (ign_enoent) {
ret = connect_loop (priv->sock,
SA (&this->peerinfo.sockaddr),
@@ -3422,6 +3559,8 @@ socket_connect (rpc_transport_t *this, int port)
this->peerinfo.sockaddr_len);
}
+ connect_attempted = _gf_true;
+
if (ret == -1 && errno == ENOENT && ign_enoent) {
gf_log (this->name, GF_LOG_WARNING,
"Ignore failed connection attempt on %s, (%s) ",
@@ -3467,87 +3606,33 @@ socket_connect (rpc_transport_t *this, int port)
ret = 0;
}
- if (priv->use_ssl && !priv->own_thread) {
- cname = ssl_setup_connection(this, 0);
- if (!cname) {
- errno = ENOTCONN;
- ret = -1;
- gf_log(this->name, GF_LOG_ERROR,
- "client setup failed");
- goto handler;
- }
- if (priv->connected) {
- this->ssl_name = cname;
- } else {
- GF_FREE(cname);
- }
- }
-
- if (!priv->bio && !priv->own_thread) {
- ret = __socket_nonblock (priv->sock);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "NBIO on %d failed (%s)",
- priv->sock, strerror (errno));
- goto handler;
- }
- }
-
handler:
- if (ret < 0) {
+ if (ret < 0 && !connect_attempted) {
/* Ignore error from connect. epoll events
should be handled in the socket handler. shutdown(2)
will result in EPOLLERR, so cleanup is done in
socket_event_handler or socket_poller */
shutdown (priv->sock, SHUT_RDWR);
+ gf_log (this->name, GF_LOG_TRACE,
+ "@@@ client shutdown(%d, SHUT_RDWR)",
+ priv->sock);
}
- /*
- * In the own_thread case, this is used to indicate that we're
- * initializing a client connection.
- */
priv->connected = 0;
priv->is_server = _gf_false;
rpc_transport_ref (this);
refd = _gf_true;
- if (priv->own_thread) {
- if (priv->connect_failed) {
- gf_msg_debug (this->name, 0,
- "socket connect is failed so close it");
- sys_close (priv->sock);
- priv->sock = -1;
- ret = -1;
- goto unlock;
- }
-
- if (pipe(priv->pipe) < 0) {
- gf_log(this->name, GF_LOG_ERROR,
- "could not create pipe");
- }
-
- this->listener = this;
- ret = socket_spawn(this);
- if (ret) {
- gf_log(this->name, GF_LOG_ERROR,
- "could not spawn thread");
- sys_close (priv->pipe[0]);
- sys_close (priv->pipe[1]);
- sys_close (priv->sock);
- priv->sock = -1;
- }
- } else {
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler,
- this, 1, 1);
- if (priv->idx == -1) {
- gf_log ("", GF_LOG_WARNING,
- "failed to register the event");
- sys_close (priv->sock);
- priv->sock = -1;
- ret = -1;
- }
+ this->listener = this;
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ sys_close (priv->sock);
+ priv->sock = -1;
+ ret = -1;
}
unlock:
@@ -3701,6 +3786,10 @@ socket_listen (rpc_transport_t *this)
goto unlock;
}
+ socket_dump_info (SA(&this->myinfo.sockaddr), priv->is_server,
+ priv->use_ssl, priv->sock, this->name,
+ "listening on");
+
ret = listen (priv->sock, priv->backlog);
if (ret == -1) {
@@ -3742,11 +3831,11 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
socket_private_t *priv = NULL;
int ret = -1;
- char need_poll_out = 0;
- char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
- char a_byte = 'j';
+ char need_poll_out = 0;
+ char need_append = 1;
+
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -3784,19 +3873,9 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
- if (priv->own_thread) {
- /*
- * Make sure the polling thread wakes up, by
- * writing a byte to represent this entry.
- */
- if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
- gf_log(this->name, GF_LOG_WARNING,
- "write error on pipe");
- }
- }
ret = 0;
}
- if (!priv->own_thread && need_poll_out) {
+ if (need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
@@ -3816,11 +3895,11 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
{
socket_private_t *priv = NULL;
int ret = -1;
- char need_poll_out = 0;
- char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
- char a_byte = 'd';
+ char need_poll_out = 0;
+ char need_append = 1;
+
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -3842,6 +3921,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
priv->submit_log = 0;
entry = __socket_ioq_new (this, &reply->msg);
+
if (!entry)
goto unlock;
@@ -3858,19 +3938,9 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
- if (priv->own_thread) {
- /*
- * Make sure the polling thread wakes up, by
- * writing a byte to represent this entry.
- */
- if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
- gf_log(this->name, GF_LOG_WARNING,
- "write error on pipe");
- }
- }
- ret = 0;
}
- if (!priv->own_thread && need_poll_out) {
+
+ if (need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
@@ -4231,203 +4301,24 @@ fini_openssl_mt (void)
ERR_free_strings();
}
-static void
-socket_poller_mayday (socket_private_t *priv)
-{
- if (priv == NULL)
- return;
-
- pthread_mutex_lock (&priv->cond_lock);
- {
- /* Signal waiting threads before exiting from socket_poller */
- if (!priv->own_thread_done) {
- gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED");
- pthread_cond_signal (&priv->cond);
- priv->own_thread_done = _gf_true;
- }
- }
- pthread_mutex_unlock (&priv->cond_lock);
-}
-
static int
-socket_init (rpc_transport_t *this)
+ssl_setup_connection_params(rpc_transport_t *this)
{
socket_private_t *priv = NULL;
- gf_boolean_t tmp_bool = 0;
- uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
char *optstr = NULL;
- uint32_t timeout = 0;
- int keepaliveidle = GF_KEEPALIVE_TIME;
- int keepaliveintvl = GF_KEEPALIVE_INTERVAL;
- int keepalivecnt = GF_KEEPALIVE_COUNT;
- uint32_t backlog = 0;
- int session_id = 0;
+ static int session_id = 1;
int32_t cert_depth = DEFAULT_VERIFY_DEPTH;
char *cipher_list = DEFAULT_CIPHER_LIST;
char *dh_param = DEFAULT_DH_PARAM;
char *ec_curve = DEFAULT_EC_CURVE;
char *crl_path = NULL;
- if (this->private) {
- gf_log_callingfn (this->name, GF_LOG_ERROR,
- "double init attempted");
- return -1;
- }
-
- priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t);
- if (!priv) {
- return -1;
- }
- memset(priv, 0, sizeof(*priv));
-
- pthread_mutex_init (&priv->in_lock, NULL);
- pthread_mutex_init (&priv->out_lock, NULL);
- pthread_mutex_init (&priv->cond_lock, NULL);
- pthread_cond_init (&priv->cond, NULL);
-
- GF_REF_INIT (priv, socket_poller_mayday);
-
- priv->sock = -1;
- priv->idx = -1;
- priv->connected = -1;
- priv->nodelay = 1;
- priv->bio = 0;
- priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
- INIT_LIST_HEAD (&priv->ioq);
- pthread_mutex_init (&priv->notify.lock, NULL);
- pthread_cond_init (&priv->notify.cond, NULL);
-
- /* All the below section needs 'this->options' to be present */
- if (!this->options)
- goto out;
-
- if (dict_get (this->options, "non-blocking-io")) {
- optstr = data_to_str (dict_get (this->options,
- "non-blocking-io"));
-
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'non-blocking-io' takes only boolean options,"
- " not taking any action");
- tmp_bool = 1;
- }
-
- if (!tmp_bool) {
- priv->bio = 1;
- gf_log (this->name, GF_LOG_WARNING,
- "disabling non-blocking IO");
- }
- }
-
- optstr = NULL;
-
- /* By default, we enable NODELAY */
- if (dict_get (this->options, "transport.socket.nodelay")) {
- optstr = data_to_str (dict_get (this->options,
- "transport.socket.nodelay"));
-
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'transport.socket.nodelay' takes only "
- "boolean options, not taking any action");
- tmp_bool = 1;
- }
- if (!tmp_bool) {
- priv->nodelay = 0;
- gf_log (this->name, GF_LOG_DEBUG,
- "disabling nodelay");
- }
- }
-
- optstr = NULL;
- if (dict_get_str (this->options, "tcp-window-size",
- &optstr) == 0) {
- if (gf_string2uint64 (optstr, &windowsize) != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "invalid number format: %s", optstr);
- return -1;
- }
- }
-
- priv->windowsize = (int)windowsize;
-
- optstr = NULL;
- /* Enable Keep-alive by default. */
- priv->keepalive = 1;
- priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
- priv->keepaliveidle = GF_KEEPALIVE_TIME;
- priv->keepalivecnt = GF_KEEPALIVE_COUNT;
- if (dict_get_str (this->options, "transport.socket.keepalive",
- &optstr) == 0) {
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'transport.socket.keepalive' takes only "
- "boolean options, not taking any action");
- tmp_bool = 1;
- }
-
- if (!tmp_bool)
- priv->keepalive = 0;
- }
-
- if (dict_get_int32 (this->options, "transport.tcp-user-timeout",
- &(priv->timeout)) != 0)
- priv->timeout = timeout;
- gf_log (this->name, GF_LOG_DEBUG, "Configued "
- "transport.tcp-user-timeout=%d", priv->timeout);
-
- if (dict_get_int32 (this->options,
- "transport.socket.keepalive-time",
- &(priv->keepaliveidle)) != 0) {
- priv->keepaliveidle = keepaliveidle;
- }
-
- if (dict_get_int32 (this->options,
- "transport.socket.keepalive-interval",
- &(priv->keepaliveintvl)) != 0) {
- priv->keepaliveintvl = keepaliveintvl;
- }
-
- if (dict_get_int32 (this->options, "transport.socket.keepalive-count",
- &(priv->keepalivecnt)) != 0)
- priv->keepalivecnt = keepalivecnt;
- gf_log (this->name, GF_LOG_DEBUG, "Reconfigued "
- "transport.keepalivecnt=%d", keepalivecnt);
-
- if (dict_get_uint32 (this->options,
- "transport.listen-backlog",
- &backlog) != 0) {
-
- backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
- }
- priv->backlog = backlog;
-
- optstr = NULL;
-
- /* Check if socket read failures are to be logged */
- priv->read_fail_log = 1;
- if (dict_get (this->options, "transport.socket.read-fail-log")) {
- optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log"));
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "'transport.socket.read-fail-log' takes only "
- "boolean options; logging socket read fails");
- } else if (tmp_bool == _gf_false) {
- priv->read_fail_log = 0;
- }
- }
-
- priv->windowsize = (int)windowsize;
+ priv = this->private;
- priv->ssl_enabled = _gf_false;
- if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) {
- if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "invalid value given for ssl-enabled boolean");
- }
+ if (priv->ssl_ctx != NULL) {
+ gf_log (this->name, GF_LOG_TRACE, "found old SSL context!");
+ return 0;
}
- priv->mgmt_ssl = this->ctx->secure_mgmt;
- priv->srvr_ssl = this->ctx->secure_srvr;
priv->ssl_own_cert = DEFAULT_CERT_PATH;
if (dict_get_str(this->options, SSL_OWN_CERT_OPT, &optstr) == 0) {
@@ -4480,23 +4371,6 @@ socket_init (rpc_transport_t *this)
gf_log(this->name, priv->mgmt_ssl ? GF_LOG_INFO: GF_LOG_DEBUG,
"SSL support for glusterd is %s",
priv->mgmt_ssl ? "ENABLED" : "NOT enabled");
- /*
- * This might get overridden temporarily in socket_connect (q.v.)
- * if we're using the glusterd portmapper.
- */
- priv->use_ssl = priv->ssl_enabled;
-
- priv->own_thread = priv->use_ssl;
- if (dict_get_str(this->options, OWN_THREAD_OPT, &optstr) == 0) {
- gf_log (this->name, GF_LOG_INFO, "OWN_THREAD_OPT found");
- if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "invalid value given for own-thread boolean");
- }
- }
- gf_log(this->name, priv->own_thread ? GF_LOG_INFO : GF_LOG_DEBUG,
- "using %s polling thread",
- priv->own_thread ? "private" : "system");
if (!priv->mgmt_ssl) {
if (!dict_get_int32 (this->options, SSL_CERT_DEPTH_OPT, &cert_depth)) {
@@ -4537,7 +4411,7 @@ socket_init (rpc_transport_t *this)
#error Old and insecure OpenSSL, use -DUSE_INSECURE_OPENSSL to use it anyway
#endif
/* SSLv23_method uses highest available protocol */
- priv->ssl_meth = (SSL_METHOD *)SSLv23_method();
+ priv->ssl_meth = SSLv23_method();
#endif
priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
@@ -4655,14 +4529,15 @@ socket_init (rpc_transport_t *this)
x509store = SSL_CTX_get_cert_store(priv->ssl_ctx);
X509_STORE_set_flags(x509store,
- X509_V_FLAG_CRL_CHECK|X509_V_FLAG_CRL_CHECK_ALL);
+ X509_V_FLAG_CRL_CHECK |
+ X509_V_FLAG_CRL_CHECK_ALL);
#else
gf_log(this->name, GF_LOG_ERROR,
"OpenSSL version does not support CRL");
#endif
}
- priv->ssl_session_id = ++session_id;
+ priv->ssl_session_id = session_id++;
SSL_CTX_set_session_id_context(priv->ssl_ctx,
(void *)&priv->ssl_session_id,
sizeof(priv->ssl_session_id));
@@ -4677,27 +4552,194 @@ socket_init (rpc_transport_t *this)
*/
SSL_CTX_set_purpose(priv->ssl_ctx, X509_PURPOSE_ANY);
}
+ return 0;
+
+err:
+ return -1;
+}
+
+static int
+socket_init (rpc_transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ gf_boolean_t tmp_bool = 0;
+ uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
+ char *optstr = NULL;
+ uint32_t timeout = 0;
+ int keepaliveidle = GF_KEEPALIVE_TIME;
+ int keepaliveintvl = GF_KEEPALIVE_INTERVAL;
+ int keepalivecnt = GF_KEEPALIVE_COUNT;
+ uint32_t backlog = 0;
+
+
+ if (this->private) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR,
+ "double init attempted");
+ return -1;
+ }
- if (priv->own_thread) {
- priv->ot_state = OT_IDLE;
+ priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t);
+ if (!priv) {
+ return -1;
}
+ memset(priv, 0, sizeof(*priv));
-out:
this->private = priv;
- return 0;
+ pthread_mutex_init (&priv->in_lock, NULL);
+ pthread_mutex_init (&priv->out_lock, NULL);
+ pthread_mutex_init (&priv->cond_lock, NULL);
+ pthread_cond_init (&priv->cond, NULL);
-err:
- if (priv->ssl_own_cert) {
- GF_FREE(priv->ssl_own_cert);
+ /*GF_REF_INIT (priv, socket_poller_mayday);*/
+
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+ priv->nodelay = 1;
+ priv->bio = 0;
+ priv->ssl_accepted = _gf_false;
+ priv->ssl_connected = _gf_false;
+ priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
+ INIT_LIST_HEAD (&priv->ioq);
+ pthread_mutex_init (&priv->notify.lock, NULL);
+ pthread_cond_init (&priv->notify.cond, NULL);
+
+ /* All the below section needs 'this->options' to be present */
+ if (!this->options)
+ goto out;
+
+ if (dict_get (this->options, "non-blocking-io")) {
+ optstr = data_to_str (dict_get (this->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean options,"
+ " not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool) {
+ priv->bio = 1;
+ gf_log (this->name, GF_LOG_WARNING,
+ "disabling non-blocking IO");
+ }
}
- if (priv->ssl_private_key) {
- GF_FREE(priv->ssl_private_key);
+
+ optstr = NULL;
+
+ /* By default, we enable NODELAY */
+ if (dict_get (this->options, "transport.socket.nodelay")) {
+ optstr = data_to_str (dict_get (this->options,
+ "transport.socket.nodelay"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.nodelay' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+ if (!tmp_bool) {
+ priv->nodelay = 0;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "disabling nodelay");
+ }
}
- if (priv->ssl_ca_list) {
- GF_FREE(priv->ssl_ca_list);
+
+ optstr = NULL;
+ if (dict_get_str (this->options, "tcp-window-size",
+ &optstr) == 0) {
+ if (gf_string2uint64 (optstr, &windowsize) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format: %s", optstr);
+ return -1;
+ }
}
- GF_FREE(priv);
- return -1;
+
+ priv->windowsize = (int)windowsize;
+
+ optstr = NULL;
+ /* Enable Keep-alive by default. */
+ priv->keepalive = 1;
+ priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
+ priv->keepaliveidle = GF_KEEPALIVE_TIME;
+ priv->keepalivecnt = GF_KEEPALIVE_COUNT;
+ if (dict_get_str (this->options, "transport.socket.keepalive",
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool)
+ priv->keepalive = 0;
+ }
+
+ if (dict_get_int32 (this->options, "transport.tcp-user-timeout",
+ &(priv->timeout)) != 0)
+ priv->timeout = timeout;
+ gf_log (this->name, GF_LOG_DEBUG, "Configued "
+ "transport.tcp-user-timeout=%d", priv->timeout);
+
+ if (dict_get_int32 (this->options,
+ "transport.socket.keepalive-time",
+ &(priv->keepaliveidle)) != 0) {
+ priv->keepaliveidle = keepaliveidle;
+ }
+
+ if (dict_get_int32 (this->options,
+ "transport.socket.keepalive-interval",
+ &(priv->keepaliveintvl)) != 0) {
+ priv->keepaliveintvl = keepaliveintvl;
+ }
+
+ if (dict_get_int32 (this->options, "transport.socket.keepalive-count",
+ &(priv->keepalivecnt)) != 0)
+ priv->keepalivecnt = keepalivecnt;
+ gf_log (this->name, GF_LOG_DEBUG, "Reconfigued "
+ "transport.keepalivecnt=%d", keepalivecnt);
+
+ if (dict_get_uint32 (this->options,
+ "transport.listen-backlog",
+ &backlog) != 0) {
+ backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
+ }
+ priv->backlog = backlog;
+
+ optstr = NULL;
+
+ /* Check if socket read failures are to be logged */
+ priv->read_fail_log = 1;
+ if (dict_get (this->options, "transport.socket.read-fail-log")) {
+ optstr = data_to_str (dict_get (this->options,
+ "transport.socket.read-fail-log"));
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "'transport.socket.read-fail-log' takes only "
+ "boolean options; logging socket read fails");
+ } else if (tmp_bool == _gf_false) {
+ priv->read_fail_log = 0;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ priv->ssl_enabled = _gf_false;
+ if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for ssl-enabled boolean");
+ }
+ }
+ priv->mgmt_ssl = this->ctx->secure_mgmt;
+ priv->srvr_ssl = this->ctx->secure_srvr;
+
+ ssl_setup_connection_params(this);
+out:
+ this->private = priv;
+ return 0;
}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index ccc2a84cb35..fdfc20774a8 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -248,12 +248,36 @@ typedef struct {
char *ssl_ca_list;
pthread_t thread;
int pipe[2];
- gf_boolean_t own_thread;
- gf_boolean_t own_thread_done;
- ot_state_t ot_state;
- uint32_t ot_gen;
gf_boolean_t is_server;
int log_ctr;
+ gf_boolean_t ssl_accepted; /* To indicate SSL_accept() */
+ gf_boolean_t ssl_connected;/* or SSL_connect() has been
+ * been completed on this socket.
+ * These are valid only when
+ * use_ssl is true.
+ */
+ /* SSL_CTX is created for each transport. Since we are now using non-
+ * blocking mechanism for SSL_accept() and SSL_connect(), the SSL
+ * context is created on the first EPOLLIN event which may lead to
+ * SSL_ERROR_WANT_READ/SSL_ERROR_WANT_WRITE and may not complete the
+ * SSL connection at the first attempt.
+ * ssl_context_created is a flag to note that we've created the SSL
+ * context for the connection so that we don't blindly create any more
+ * while !ssl_accepted or !ssl_connected.
+ */
+ gf_boolean_t ssl_context_created;
+ gf_boolean_t accepted; /* explicit flag to be set in
+ * socket_event_handler() for
+ * newly accepted socket
+ */
+
+ /* ssl_error_required is used only during the SSL connection setup
+ * phase.
+ * It holds the error code returned by SSL_get_error() and is used to
+ * arm the epoll event set for the required event for the specific fd.
+ */
+ int ssl_error_required;
+
GF_REF_DECL; /* refcount to keep track of socket_poller
threads */
struct {