diff options
author | Milind Changire <mchangir@redhat.com> | 2018-01-05 11:17:46 +0530 |
---|---|---|
committer | Raghavendra G <rgowdapp@redhat.com> | 2018-01-07 03:55:51 +0000 |
commit | 9b12157fea5ac054106ed14f9d91cdb8bad665c7 (patch) | |
tree | d7b6701996424111ce8e39c5f2d03239830fae3c /rpc/rpc-transport/socket/src/socket.c | |
parent | 515a832de0e761639b1d076a59bf918070ec3130 (diff) |
Revert "rpc: merge ssl infra with epoll infra"
This reverts commit 56e5fdae74845dfec0ff7ad0c8fee77695d36ad5.
Change-Id: Ia62cee5440bbe8e23f5da9cff692d792091d544a
Signed-off-by: Milind Changire <mchangir@redhat.com>
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 1569 |
1 files changed, 762 insertions, 807 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 7ffebad555f..65d0b641333 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -90,7 +90,6 @@ typedef int SSL_unary_func (SSL *); typedef int SSL_trinary_func (SSL *, void *, int); -static int ssl_setup_connection_params(rpc_transport_t *this); #define __socket_proto_reset_pending(priv) do { \ struct gf_sock_incoming_frag *frag; \ @@ -189,63 +188,18 @@ struct socket_connect_error_state_ { typedef struct socket_connect_error_state_ socket_connect_error_state_t; static int socket_init (rpc_transport_t *this); -static int __socket_nonblock (int fd); - -static void -socket_dump_info (struct sockaddr *sa, int is_server, int is_ssl, int sock, - char *log_domain, char *log_label) -{ - char addr_buf[INET6_ADDRSTRLEN+1] = {0, }; - char *addr = NULL; - char *peer_type = NULL; - int af = sa->sa_family; - int so_error = -1; - socklen_t slen = sizeof(so_error); - - if (af == AF_UNIX) { - addr = ((struct sockaddr_un *)(sa))->sun_path; - } else { - if (af == AF_INET6) { - struct sockaddr_in6 *sin6 = - (struct sockaddr_in6 *)(sa); - - inet_ntop (af, &sin6->sin6_addr, addr_buf, - sizeof (addr_buf)); - addr = addr_buf; - } else { - struct sockaddr_in *sin = - (struct sockaddr_in *)(sa); - - inet_ntop (af, &sin->sin_addr, addr_buf, - sizeof (addr_buf)); - addr = addr_buf; - } - } - if (is_server) - peer_type = "server"; - else - peer_type = "client"; - - getsockopt (sock, SOL_SOCKET, SO_ERROR, &so_error, &slen); - - gf_log (log_domain, GF_LOG_TRACE, - "$$$ %s: %s (af:%d,sock:%d) %s %s (errno:%d:%s)", - peer_type, log_label, af, sock, addr, - (is_ssl ? "SSL" : "non-SSL"), - so_error, strerror (so_error)); -} static void ssl_dump_error_stack (const char *caller) { unsigned long errnum = 0; - char errbuf[120] = {0,}; + char errbuf[120] = {0, }; /* OpenSSL docs explicitly give 120 as the error-string length. */ while ((errnum = ERR_get_error())) { - ERR_error_string(errnum,errbuf); - gf_log(caller,GF_LOG_ERROR," %s",errbuf); + ERR_error_string(errnum, errbuf); + gf_log(caller, GF_LOG_ERROR, " %s", errbuf); } } @@ -253,119 +207,128 @@ static int ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) { int r = (-1); + struct pollfd pfd = {-1, }; socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO(this->name,this->private,out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); priv = this->private; - if (buf) { - if (priv->connected == -1) { + for (;;) { + if (buf) { + if (priv->connected == -1) { + /* + * Fields in the SSL structure (especially + * the BIO pointers) are not valid at this + * point, so we'll segfault if we pass them + * to SSL_read/SSL_write. + */ + gf_log(this->name, GF_LOG_INFO, + "lost connection in %s", __func__); + break; + } + r = func(priv->ssl_ssl, buf, len); + } else { /* - * Fields in the SSL structure (especially - * the BIO pointers) are not valid at this - * point, so we'll segfault if we pass them - * to SSL_read/SSL_write. + * We actually need these functions to get to + * priv->connected == 1. */ - gf_log (this->name, GF_LOG_INFO, - "lost connection in %s", __func__); - return -1; - } - r = func (priv->ssl_ssl, buf, len); - } else { - /* - * We actually need these functions to get to - * priv->connected == 1. - */ - r = ((SSL_unary_func *)func)(priv->ssl_ssl); - } - switch (SSL_get_error (priv->ssl_ssl, r)) { - case SSL_ERROR_NONE: - /* fall through */ - case SSL_ERROR_WANT_READ: - /* fall through */ - case SSL_ERROR_WANT_WRITE: - errno = EAGAIN; - return r; + r = ((SSL_unary_func *)func)(priv->ssl_ssl); + } + switch (SSL_get_error(priv->ssl_ssl, r)) { + case SSL_ERROR_NONE: + return r; + case SSL_ERROR_WANT_READ: + /* If we are attempting to connect/accept then we + * should wait here on the poll, for the SSL + * (re)negotiation to complete, else we would error out + * on the accept/connect. + * If we are here when attempting to read/write + * then we return r (or -1) as the socket is always + * primed for the read event, and it would eventually + * call one of the SSL routines */ + /* NOTE: Only way to determine this is a accept/connect + * is to examine buf or func, which is not very + * clean */ + if ((func == (SSL_trinary_func *)SSL_read) + || (func == (SSL_trinary_func *) SSL_write)) { + return r; + } - case SSL_ERROR_SYSCALL: - /* Sometimes SSL_ERROR_SYSCALL returns errno as - * EAGAIN. In such a case we should reattempt operation - * So, for now, just return the return value and the - * errno as is. - */ - gf_log (this->name, GF_LOG_DEBUG, - "syscall error (probably remote disconnect) " - "errno:%d:%s", errno, strerror(errno)); - return r; - default: - errno = EIO; - goto out; /* "break" would just loop again */ + pfd.fd = priv->sock; + pfd.events = POLLIN; + if (poll(&pfd, 1, -1) < 0) { + gf_log(this->name, GF_LOG_ERROR, "poll error %d", + errno); + } + break; + case SSL_ERROR_WANT_WRITE: + if ((func == (SSL_trinary_func *)SSL_read) + || (func == (SSL_trinary_func *) SSL_write)) { + errno = EAGAIN; + return r; + } + pfd.fd = priv->sock; + pfd.events = POLLOUT; + if (poll(&pfd, 1, -1) < 0) { + gf_log(this->name, GF_LOG_ERROR, "poll error %d", + errno); + } + break; + case SSL_ERROR_SYSCALL: + /* This is what we get when remote disconnects. */ + gf_log(this->name, GF_LOG_DEBUG, + "syscall error (probably remote disconnect)"); + errno = ENODATA; + goto out; + default: + errno = EIO; + goto out; /* "break" would just loop again */ + } } out: return -1; } -#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) -#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) - +#define ssl_connect_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_connect) +#define ssl_accept_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_accept) +#define ssl_read_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_read) +#define ssl_write_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_write) -int -ssl_setup_connection_prefix (rpc_transport_t *this) +static char * +ssl_setup_connection (rpc_transport_t *this, int server) { + X509 *peer = NULL; + char peer_CN[256] = ""; int ret = -1; socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO(this->name,this->private,done); - + GF_VALIDATE_OR_GOTO(this->name, this->private, done); priv = this->private; - if (ssl_setup_connection_params (this) < 0) { - gf_log (this->name, GF_LOG_TRACE, - "+ ssl_setup_connection_params() failed!"); - goto done; - } else { - gf_log (this->name, GF_LOG_TRACE, - "+ ssl_setup_connection_params() done!"); - } - - priv->ssl_error_required = SSL_ERROR_NONE; - priv->ssl_connected = _gf_false; - priv->ssl_accepted = _gf_false; - priv->ssl_context_created = _gf_false; - priv->ssl_ssl = SSL_new(priv->ssl_ctx); if (!priv->ssl_ssl) { - gf_log(this->name,GF_LOG_ERROR,"SSL_new failed"); + gf_log(this->name, GF_LOG_ERROR, "SSL_new failed"); ssl_dump_error_stack(this->name); goto done; } - priv->ssl_sbio = BIO_new_socket(priv->sock, BIO_NOCLOSE); if (!priv->ssl_sbio) { - gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed"); + gf_log(this->name, GF_LOG_ERROR, "BIO_new_socket failed"); ssl_dump_error_stack(this->name); goto free_ssl; } - SSL_set_bio (priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio); - ret = 0; - goto done; + SSL_set_bio(priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio); -free_ssl: - SSL_free(priv->ssl_ssl); - priv->ssl_ssl = NULL; -done: - return ret; -} - -static char * -ssl_setup_connection_postfix (rpc_transport_t *this) -{ - X509 *peer = NULL; - char peer_CN[256] = ""; - socket_private_t *priv = NULL; + if (server) { + ret = ssl_accept_one(this); + } else { + ret = ssl_connect_one(this); + } - GF_VALIDATE_OR_GOTO(this->name, this->private, done); - priv = this->private; + /* Make sure _the call_ succeeded. */ + if (ret < 0) { + goto ssl_error; + } /* Make sure _SSL verification_ succeeded, yielding an identity. */ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) { @@ -376,8 +339,6 @@ ssl_setup_connection_postfix (rpc_transport_t *this) goto ssl_error; } - SSL_set_mode(priv->ssl_ssl, SSL_MODE_ENABLE_PARTIAL_WRITE); - /* Finally, everything seems OK. */ X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, sizeof(peer_CN)-1); @@ -394,7 +355,7 @@ ssl_error: "SSL connect error (client: %s) (server: %s)", this->peerinfo.identifier, this->myinfo.identifier); ssl_dump_error_stack(this->name); - +free_ssl: SSL_free(priv->ssl_ssl); priv->ssl_ssl = NULL; done: @@ -402,86 +363,6 @@ done: } -int -ssl_complete_connection (rpc_transport_t *this) -{ - int ret = -1; /* 1 : implies go back to epoll_wait() - * 0 : implies successful ssl connection - * -1: implies continue processing current event - * as if EPOLLERR has been encountered - */ - char *cname = NULL; - int r = -1; - int ssl_error = -1; - socket_private_t *priv = NULL; - - - priv = this->private; - - if (priv->is_server) { - r = SSL_accept (priv->ssl_ssl); - } else { - r = SSL_connect (priv->ssl_ssl); - } - - ssl_error = SSL_get_error (priv->ssl_ssl, r); - priv->ssl_error_required = ssl_error; - - switch (ssl_error) { - case SSL_ERROR_NONE: - cname = ssl_setup_connection_postfix (this); - if (!cname) { - /* we've failed to get the cname so - * we must close the connection - * - * treat this as EPOLLERR - */ - gf_log (this->name, GF_LOG_TRACE, - "error getting cname"); - errno = ECONNRESET; - ret = -1; - } else { - this->ssl_name = cname; - if (priv->is_server) { - priv->ssl_accepted = _gf_true; - gf_log (this->name, GF_LOG_TRACE, - "ssl_accepted!"); - } else { - priv->ssl_connected = _gf_true; - gf_log (this->name, GF_LOG_TRACE, - "ssl_connected!"); - } - ret = 0; - } - break; - - case SSL_ERROR_WANT_READ: - /* fall through */ - case SSL_ERROR_WANT_WRITE: - errno = EAGAIN; - break; - - case SSL_ERROR_SYSCALL: - /* Sometimes SSL_ERROR_SYSCALL returns with errno as EAGAIN - * So, we should retry the operation. - * So, for now, we just return the return value and errno as is. - */ - break; - - case SSL_ERROR_SSL: - /* treat this as EPOLLERR */ - ret = -1; - break; - - default: - /* treat this as EPOLLERR */ - errno = EIO; - ret = -1; - break; - } - return ret; -} - static void ssl_teardown_connection (socket_private_t *priv) { @@ -489,21 +370,7 @@ ssl_teardown_connection (socket_private_t *priv) SSL_shutdown(priv->ssl_ssl); SSL_clear(priv->ssl_ssl); SSL_free(priv->ssl_ssl); - SSL_CTX_free(priv->ssl_ctx); priv->ssl_ssl = NULL; - priv->ssl_ctx = NULL; - if (priv->ssl_private_key) { - GF_FREE (priv->ssl_private_key); - priv->ssl_private_key = NULL; - } - if (priv->ssl_own_cert) { - GF_FREE (priv->ssl_own_cert); - priv->ssl_own_cert = NULL; - } - if (priv->ssl_ca_list) { - GF_FREE (priv->ssl_ca_list); - priv->ssl_ca_list = NULL; - } } priv->use_ssl = _gf_false; } @@ -520,10 +387,8 @@ __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) sock = priv->sock; if (priv->use_ssl) { - gf_log (this->name, GF_LOG_TRACE, "***** reading over SSL"); ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len); } else { - gf_log (this->name, GF_LOG_TRACE, "***** reading over non-SSL"); ret = sys_readv (sock, opvector, IOV_MIN(opcount)); } @@ -661,7 +526,7 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, while (opcount > 0) { if (opvector->iov_len == 0) { - gf_log(this->name,GF_LOG_DEBUG, + gf_log(this->name, GF_LOG_DEBUG, "would have passed zero length to read/write"); ++opvector; --opcount; @@ -675,8 +540,6 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, * non-SSL might be insecure, so just fail it outright. */ ret = -1; - gf_log (this->name, GF_LOG_TRACE, - "### no priv->ssl_ssl yet; ret = -1;"); } else if (write) { if (priv->use_ssl) { ret = ssl_write_one (this, opvector->iov_base, @@ -692,10 +555,9 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, this->total_bytes_write += ret; } else { ret = __socket_cached_read (this, opvector, opcount); + if (ret == 0) { - gf_log (this->name, GF_LOG_DEBUG, - "EOF on socket (errno:%d:%s)", - errno, strerror (errno)); + gf_log(this->name, GF_LOG_DEBUG, "EOF on socket"); errno = ENODATA; ret = -1; } @@ -744,7 +606,7 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, while (moved < ret) { if (!opcount) { - gf_log(this->name,GF_LOG_DEBUG, + gf_log(this->name, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret); goto ran_out; @@ -855,8 +717,9 @@ __socket_disconnect (rpc_transport_t *this) priv = this->private; - gf_log (this->name, GF_LOG_TRACE, "disconnecting %p, sock=%d", - this, priv->sock); + gf_log (this->name, GF_LOG_TRACE, + "disconnecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); if (priv->sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, @@ -867,6 +730,16 @@ __socket_disconnect (rpc_transport_t *this) "__socket_teardown_connection () failed: %s", strerror (errno)); } + + if (priv->own_thread) { + /* + * Without this, reconnect (= disconnect + connect) + * won't work except by accident. + */ + gf_log (this->name, GF_LOG_TRACE, + "OT_PLEASE_DIE on %p", this); + priv->ot_state = OT_PLEASE_DIE; + } } out: @@ -1100,22 +973,7 @@ __socket_reset (rpc_transport_t *this) priv->sock = -1; priv->idx = -1; priv->connected = -1; - priv->ssl_connected = _gf_false; - priv->ssl_accepted = _gf_false; - priv->ssl_context_created = _gf_false; - if (priv->ssl_private_key) { - GF_FREE (priv->ssl_private_key); - priv->ssl_private_key = NULL; - } - if (priv->ssl_own_cert) { - GF_FREE (priv->ssl_own_cert); - priv->ssl_own_cert = NULL; - } - if (priv->ssl_ca_list) { - GF_FREE (priv->ssl_ca_list); - priv->ssl_ca_list = NULL; - } out: return; } @@ -1251,6 +1109,8 @@ static int __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) { int ret = -1; + socket_private_t *priv = NULL; + char a_byte = 0; ret = __socket_writev (this, entry->pending_vector, entry->pending_count, @@ -1261,6 +1121,18 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) /* current entry was completely written */ GF_ASSERT (entry->pending_count == 0); __socket_ioq_entry_free (entry); + priv = this->private; + if (priv->own_thread) { + /* + * The pipe should only remain readable if there are + * more entries after this, so drain the byte + * representing this entry. + */ + if (!direct && sys_read (priv->pipe[0], &a_byte, 1) < 1) { + gf_log(this->name, GF_LOG_WARNING, + "read error on pipe"); + } + } } return ret; @@ -1289,7 +1161,7 @@ __socket_ioq_churn (rpc_transport_t *this) break; } - if (list_empty (&priv->ioq)) { + if (!priv->own_thread && list_empty (&priv->ioq)) { /* all pending writes done, not interested in POLLOUT */ priv->idx = event_select_on (this->ctx->event_pool, priv->sock, priv->idx, -1, 0); @@ -2444,9 +2316,15 @@ socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled) priv->gen); if (pollin) { + priv->ot_state = OT_CALLBACK; + ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { + priv->ot_state = OT_RUNNING; + } + rpc_transport_pollin_destroy (pollin); pthread_mutex_lock (&priv->notify.lock); @@ -2539,355 +2417,303 @@ out: static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait); -/* socket_is_connected() is for use only in socket_event_handler() */ -static inline gf_boolean_t -socket_is_connected (rpc_transport_t *this) +/* reads rpc_requests during pollin */ +static int +socket_event_handler (int fd, int idx, int gen, void *data, + int poll_in, int poll_out, int poll_err) { + rpc_transport_t *this = NULL; socket_private_t *priv = NULL; + int ret = -1; + glusterfs_ctx_t *ctx = NULL; + gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false; - priv = this->private; - - if (priv->use_ssl) { - return priv->is_server ? priv->ssl_accepted : - priv->ssl_connected; - } else { - return priv->is_server ? priv->accepted : - priv->connected; - } -} - -static void -ssl_rearm_event_fd (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - glusterfs_ctx_t *ctx = NULL; - int idx = -1; - int gen = -1; - int fd = -1; - - priv = this->private; - ctx = this->ctx; - - idx = priv->idx; - gen = priv->gen; - fd = priv->sock; - - if (priv->ssl_error_required == SSL_ERROR_WANT_READ) - event_select_on (ctx->event_pool, fd, idx, 1, -1); - if (priv->ssl_error_required == SSL_ERROR_WANT_WRITE) - event_select_on (ctx->event_pool, fd, idx, -1, 1); - event_handled (ctx->event_pool, fd, idx, gen); -} + this = data; -static int -ssl_handle_server_connection_attempt (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - glusterfs_ctx_t *ctx = NULL; - int idx = -1; - int gen = -1; - int ret = -1; - int fd = -1; + GF_VALIDATE_OR_GOTO ("socket", this, out); + GF_VALIDATE_OR_GOTO ("socket", this->private, out); + GF_VALIDATE_OR_GOTO ("socket", this->xl, out); + THIS = this->xl; priv = this->private; ctx = this->ctx; - idx = priv->idx; - gen = priv->gen; - fd = priv->sock; + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); + { + priv->idx = idx; + priv->gen = gen; + } + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); - if (!priv->ssl_context_created) { - ret = ssl_setup_connection_prefix (this); - if (ret < 0) { - gf_log (this->name, GF_LOG_TRACE, - "> ssl_setup_connection_prefix() failed!"); + if (priv->connected != 1) { + if (priv->connect_failed) { + /* connect failed with some other error than + EINPROGRESS or ENOENT, so nothing more to do, fail + reading/writing anything even if poll_in or poll_out + is set */ + gf_log ("transport", GF_LOG_DEBUG, + "connect failed with some other error than " + "EINPROGRESS or ENOENT, so nothing more to " + "do; disconnecting socket"); + ret = socket_disconnect (this, _gf_false); + + /* Force ret to be -1, as we are officially done with + this socket */ ret = -1; - goto out; } else { - priv->ssl_context_created = _gf_true; + ret = socket_connect_finish (this); } - } - ret = ssl_complete_connection (this); - if (ret == 0) { - /* nothing to do */ - event_select_on (ctx->event_pool, fd, idx, 1, 0); - event_handled (ctx->event_pool, fd, idx, gen); - ret = 1; } else { - if (errno == EAGAIN) { - ssl_rearm_event_fd (this); - ret = 1; - } else { - ret = -1; - gf_log (this->name, GF_LOG_TRACE, - "ssl_complete_connection returned error"); - } + ret = 0; } -out: - return ret; -} -static int -ssl_handle_client_connection_attempt (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - glusterfs_ctx_t *ctx = NULL; - int idx = -1; - int ret = -1; - int fd = -1; + if (!ret && poll_out) { + ret = socket_event_poll_out (this); + } - priv = this->private; - ctx = this->ctx; + if (!ret && poll_in) { + ret = socket_event_poll_in (this, !poll_err); + notify_handled = _gf_true; + } - idx = priv->idx; - fd = priv->sock; + if ((ret < 0) || poll_err) { + /* Logging has happened already in earlier cases */ + gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), + "EPOLLERR - disconnecting now"); - /* SSL client */ - if (priv->connect_failed) { - gf_log (this->name, GF_LOG_TRACE, - ">>> disconnecting SSL socket"); - ret = socket_disconnect (this, _gf_false); - /* Force ret to be -1, as we are officially done with - this socket */ - ret = -1; - } else { - if (!priv->ssl_context_created) { - ret = ssl_setup_connection_prefix (this); - if (ret < 0) { - gf_log (this->name, GF_LOG_TRACE, - "> ssl_setup_connection_prefix() " - "failed!"); - ret = -1; - goto out; - } else { - priv->ssl_context_created = _gf_true; - } - } - ret = ssl_complete_connection (this); - if (ret == 0) { - ret = socket_connect_finish (this); - event_select_on (ctx->event_pool, fd, idx, 1, 0); - gf_log (this->name, GF_LOG_TRACE, - ">>> completed client connect"); - } else { - if (errno == EAGAIN) { - gf_log (this->name, GF_LOG_TRACE, - ">>> retrying client connect 2"); - ssl_rearm_event_fd (this); - ret = 1; - } else { - /* this is a connection failure */ - ret = socket_connect_finish (this); - gf_log (this->name, GF_LOG_TRACE, - "ssl_complete_connection " - "returned error"); - ret = -1; - } - } + socket_closed = socket_event_poll_err (this, gen, idx); + + if (socket_closed) + rpc_transport_unref (this); + + } else if (!notify_handled) { + event_handled (ctx->event_pool, fd, idx, gen); } + out: return ret; } -static int -socket_handle_client_connection_attempt (rpc_transport_t *this) +static int poll_err_cnt; +static void * +socket_poller (void *ctx) { - socket_private_t *priv = NULL; - glusterfs_ctx_t *ctx = NULL; - int idx = -1; - int gen = -1; - int ret = -1; - int fd = -1; + rpc_transport_t *this = ctx; + socket_private_t *priv = this->private; + struct pollfd pfd[2] = {{0, }, }; + gf_boolean_t to_write = _gf_false; + int ret = 0; + uint32_t gen = 0; + char *cname = NULL; - priv = this->private; - ctx = this->ctx; + GF_ASSERT (this); + /* Set THIS early on in the life of this thread, instead of setting it + * conditionally + */ + THIS = this->xl; - idx = priv->idx; - gen = priv->gen; - fd = priv->sock; - - /* non-SSL client */ - if (priv->connect_failed) { - /* connect failed with some other error than - EINPROGRESS or ENOENT, so nothing more to - do, fail reading/writing anything even if - poll_in or poll_out - is set - */ - gf_log ("transport", GF_LOG_DEBUG, - "connect failed with some other error " - "than EINPROGRESS or ENOENT, so " - "nothing more to do; disconnecting " - "socket"); - (void)socket_disconnect (this, _gf_false); - - /* Force ret to be -1, as we are officially - * done with this socket - */ - ret = -1; - } else { - ret = socket_connect_finish (this); - gf_log (this->name, GF_LOG_TRACE, - "socket_connect_finish() returned %d", - ret); - if (ret == 0 || ret == 1) { - /* we don't want to do any reads or - * writes on the connection yet in - * socket_event_handler, so just - * return 1 - */ - ret = 1; - event_handled (ctx->event_pool, fd, idx, gen); - } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting " + "because socket state is OT_PLEASE_DIE"); + goto err; } - return ret; -} -static int -socket_complete_connection (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - glusterfs_ctx_t *ctx = NULL; - int idx = -1; - int gen = -1; - int ret = -1; - int fd = -1; - - priv = this->private; - ctx = this->ctx; - - idx = priv->idx; - gen = priv->gen; - fd = priv->sock; + priv->ot_state = OT_RUNNING; if (priv->use_ssl) { - if (priv->is_server) { - ret = ssl_handle_server_connection_attempt (this); - } else { - ret = ssl_handle_client_connection_attempt (this); + cname = ssl_setup_connection(this, priv->connected); + if (!cname) { + gf_log (this->name, GF_LOG_ERROR, "%s setup failed", + priv->connected ? "server" : "client"); + goto err; } - } else { - if (priv->is_server) { - /* non-SSL server: nothing much to do - * connection has already been accepted in - * socket_server_event_handler() - */ - priv->accepted = _gf_true; - event_handled (ctx->event_pool, fd, idx, gen); - ret = 1; + if (priv->connected) { + this->ssl_name = cname; } else { - ret = socket_handle_client_connection_attempt (this); + GF_FREE(cname); } } - return ret; -} -/* reads rpc_requests during pollin */ -static int -socket_event_handler (int fd, int idx, int gen, void *data, - int poll_in, int poll_out, int poll_err) -{ - rpc_transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = -1; - glusterfs_ctx_t *ctx = NULL; - gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false; - - - this = data; - - GF_VALIDATE_OR_GOTO ("socket", this, out); - GF_VALIDATE_OR_GOTO ("socket", this->private, out); - GF_VALIDATE_OR_GOTO ("socket", this->xl, out); - - THIS = this->xl; - priv = this->private; - ctx = this->ctx; - - pthread_mutex_lock (&priv->in_lock); - pthread_mutex_lock (&priv->out_lock); - { - priv->idx = idx; - priv->gen = gen; + if (!priv->bio) { + ret = __socket_nonblock (priv->sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + goto err; + } } - pthread_mutex_unlock (&priv->out_lock); - pthread_mutex_unlock (&priv->in_lock); - gf_log (this->name, GF_LOG_TRACE, "%s (sock:%d) in:%d, out:%d, err:%d", - (priv->is_server ? "server" : "client"), - priv->sock, poll_in, poll_out, poll_err); + if (priv->connected == 0) { + ret = socket_connect_finish (this); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "asynchronous socket_connect_finish failed"); + } + } - if (!poll_err) { - if (!socket_is_connected (this)) { - gf_log (this->name, GF_LOG_TRACE, - "%s (sock:%d) socket is not connected, " - "completing connection", - (priv->is_server ? "server" : "client"), - priv->sock); + ret = rpc_transport_notify (this->listener, + RPC_TRANSPORT_ACCEPT, this); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "asynchronous rpc_transport_notify failed"); + } - ret = socket_complete_connection (this); + gen = priv->ot_gen; + for (;;) { + pthread_mutex_lock(&priv->out_lock); + to_write = !list_empty(&priv->ioq); + pthread_mutex_unlock(&priv->out_lock); + pfd[0].fd = priv->pipe[0]; + pfd[0].events = POLL_MASK_ERROR; + pfd[0].revents = 0; + pfd[1].fd = priv->sock; + pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR; + pfd[1].revents = 0; + if (to_write) { + pfd[1].events |= POLL_MASK_OUTPUT; + } else { + pfd[0].events |= POLL_MASK_INPUT; + } + if (poll(pfd, 2, -1) < 0) { + gf_log(this->name, GF_LOG_ERROR, "poll failed"); + break; + } + if (pfd[0].revents & POLL_MASK_ERROR) { + gf_log(this->name, GF_LOG_ERROR, + "poll error on pipe"); + break; + } - gf_log (this->name, GF_LOG_TRACE, "(sock:%d) " - "socket_complete_connection() returned %d", - priv->sock, ret); + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_DEBUG, + "OT_PLEASE_DIE on %p (exiting socket_poller)", + this); + break; + } - if (ret > 0) { + if (pfd[1].revents & POLL_MASK_INPUT) { + ret = socket_event_poll_in(this, 0); + if (ret >= 0) { + /* Suppress errors while making progress. */ + pfd[1].revents &= ~POLL_MASK_ERROR; + } else if (errno == ENOTCONN) { + ret = 0; + } + if (priv->ot_state == OT_PLEASE_DIE) { gf_log (this->name, GF_LOG_TRACE, - "(sock:%d) returning to wait on socket", - priv->sock); - return 0; + "OT_IDLE on %p (input request)", + this); + break; + } + } else if (pfd[1].revents & POLL_MASK_OUTPUT) { + ret = socket_event_poll_out(this); + if (ret >= 0) { + /* Suppress errors while making progress. */ + pfd[1].revents &= ~POLL_MASK_ERROR; + } else if (errno == ENOTCONN) { + ret = 0; + } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (output request)", + this); + break; } } else { - char *sock_type = (priv->is_server ? "Server" : - "Client"); - - gf_log (this->name, GF_LOG_TRACE, - "%s socket (%d) is already connected", - sock_type, priv->sock); + /* + * This usually means that we left poll() because + * somebody pushed a byte onto our pipe. That wakeup + * is why the pipe is there, but once awake we can do + * all the checking we need on the next iteration. + */ ret = 0; } + if (pfd[1].revents & POLL_MASK_ERROR) { + gf_log(this->name, GF_LOG_ERROR, + "poll error on socket"); + break; + } + if (ret < 0) { + GF_LOG_OCCASIONALLY (poll_err_cnt, this->name, + GF_LOG_ERROR, + "socket_poller %s failed (%s)", + this->peerinfo.identifier, + strerror (errno)); + break; + } + if (priv->ot_gen != gen) { + gf_log (this->name, GF_LOG_TRACE, + "generation mismatch, my %u != %u", + gen, priv->ot_gen); + return NULL; + } } - if (!ret && poll_out) { - ret = socket_event_poll_out (this); - gf_log (this->name, GF_LOG_TRACE, "(sock:%d) " - "socket_event_poll_out returned %d", priv->sock, ret); - } +err: + /* All (and only) I/O errors should come here. */ + pthread_mutex_lock(&priv->in_lock); + pthread_mutex_lock(&priv->out_lock); + { + gf_log (this->name, GF_LOG_TRACE, "disconnecting socket"); + __socket_teardown_connection (this); + sys_close (priv->sock); + priv->sock = -1; - if (!ret && poll_in) { - ret = socket_event_poll_in (this, !poll_err); - gf_log (this->name, GF_LOG_TRACE, "(sock:%d) " - "socket_event_poll_in returned %d", priv->sock, ret); - notify_handled = _gf_true; + sys_close (priv->pipe[0]); + sys_close (priv->pipe[1]); + priv->pipe[0] = -1; + priv->pipe[1] = -1; + + priv->ot_state = OT_IDLE; } + pthread_mutex_unlock(&priv->out_lock); + pthread_mutex_unlock(&priv->in_lock); - if ((ret < 0) || poll_err) { - struct sockaddr *sa = SA(&this->peerinfo.sockaddr); + rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); - if (priv->is_server && - SA(&this->myinfo.sockaddr)->sa_family == AF_UNIX) { - sa = SA(&this->myinfo.sockaddr); - } + GF_REF_PUT (priv); - socket_dump_info (sa, priv->is_server, priv->use_ssl, - priv->sock, this->name, - "disconnecting from"); + rpc_transport_unref (this); - /* Logging has happened already in earlier cases */ - gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), - "EPOLLERR - disconnecting (sock:%d) (%s)", - priv->sock, (priv->use_ssl ? "SSL" : "non-SSL")); + return NULL; +} - socket_closed = socket_event_poll_err (this, gen, idx); - if (socket_closed) - rpc_transport_unref (this); +static int +socket_spawn (rpc_transport_t *this) +{ + socket_private_t *priv = this->private; + int ret = -1; + switch (priv->ot_state) { + case OT_IDLE: + case OT_PLEASE_DIE: + break; + default: + gf_log (this->name, GF_LOG_WARNING, + "refusing to start redundant poller"); + return ret; + } - } else if (!notify_handled) { - event_handled (ctx->event_pool, fd, idx, gen); + priv->ot_gen += 7; + priv->ot_state = OT_SPAWNING; + gf_log (this->name, GF_LOG_TRACE, + "spawning %p with gen %u", this, priv->ot_gen); + + GF_REF_GET (priv); + + /* Create thread after enable detach flag */ + + ret = gf_thread_create_detached (&priv->thread, socket_poller, this, + "spoller"); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create poll thread"); + ret = -1; } -out: return ret; } @@ -2904,6 +2730,7 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, socklen_t addrlen = sizeof (new_sockaddr); socket_private_t *new_priv = NULL; glusterfs_ctx_t *ctx = NULL; + char *cname = NULL; this = data; GF_VALIDATE_OR_GOTO ("socket", this, out); @@ -2994,10 +2821,6 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, } get_transport_identifiers (new_trans); - gf_log (this->name, GF_LOG_TRACE, "XXX server:%s, client:%s", - new_trans->myinfo.identifier, - new_trans->peerinfo.identifier); - ret = socket_init(new_trans); if (ret != 0) { sys_close (new_sock); @@ -3033,16 +2856,23 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, } new_priv->sock = new_sock; + new_priv->own_thread = priv->own_thread; - new_priv->ssl_enabled = priv->ssl_enabled; new_priv->ssl_ctx = priv->ssl_ctx; - new_priv->connected = 1; - new_priv->is_server = _gf_true; + if (new_priv->use_ssl && !new_priv->own_thread) { + cname = ssl_setup_connection(new_trans, 1); + if (!cname) { + gf_log(this->name, GF_LOG_ERROR, + "server setup failed"); + sys_close (new_sock); + GF_FREE (new_trans->name); + GF_FREE (new_trans); + goto out; + } + this->ssl_name = cname; + } - /* set O_NONBLOCK for plain text as well as ssl connections */ - if (!priv->bio) { - gf_log (this->name, GF_LOG_TRACE, - "### use non-blocking IO"); + if (!priv->bio && !priv->own_thread) { ret = __socket_nonblock (new_sock); if (ret == -1) { @@ -3056,13 +2886,29 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, goto out; } } + /* - * This is the first ref on the newly accepted - * transport. + * In the own_thread case, this is used to + * indicate that we're initializing a server + * connection. */ + new_priv->connected = 1; + new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); - { + if (new_priv->own_thread) { + if (pipe(new_priv->pipe) < 0) { + gf_log(this->name, GF_LOG_ERROR, + "could not create pipe"); + } + ret = socket_spawn(new_trans); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "could not spawn thread"); + sys_close (new_priv->pipe[0]); + sys_close (new_priv->pipe[1]); + } + } else { /* Take a ref on the new_trans to avoid * getting deleted when event_register() * causes socket_event_handler() to race @@ -3131,6 +2977,9 @@ out: if (ctx) event_handled (ctx->event_pool, fd, idx, gen); + if (cname && (cname != this->ssl_name)) { + GF_FREE(cname); + } return ret; } @@ -3140,12 +2989,36 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait) { socket_private_t *priv = NULL; int ret = -1; + char a_byte = 'r'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + if (wait && priv->own_thread) { + GF_REF_PUT (priv); + + pthread_mutex_lock (&priv->cond_lock); + { + /* Change the state to OT_PLEASE_DIE so that + * socket_poller can exit. */ + priv->ot_state = OT_PLEASE_DIE; + /* Write something into the pipe so that poller + * thread can wake up.*/ + if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { + gf_log (this->name, GF_LOG_WARNING, + "write error on pipe"); + } + + /* Wait for socket_poller to exit */ + if (!priv->own_thread_done) + pthread_cond_wait (&priv->cond, + &priv->cond_lock); + } + pthread_mutex_unlock (&priv->cond_lock); + } + pthread_mutex_lock (&priv->in_lock); pthread_mutex_lock (&priv->out_lock); { @@ -3186,9 +3059,8 @@ socket_fix_ssl_opts (rpc_transport_t *this, socket_private_t *priv, "%s SSL for portmapper connection", priv->mgmt_ssl ? "enabling" : "disabling"); priv->use_ssl = priv->mgmt_ssl; - } - else if (priv->ssl_enabled && !priv->use_ssl) { - gf_log(this->name,GF_LOG_DEBUG, + } else if (priv->ssl_enabled && !priv->use_ssl) { + gf_log(this->name, GF_LOG_DEBUG, "re-enabling SSL for I/O connection"); priv->use_ssl = _gf_true; } @@ -3237,8 +3109,8 @@ socket_connect (rpc_transport_t *this, int port) gf_boolean_t refd = _gf_false; socket_connect_error_state_t *arg = NULL; pthread_t th_id = {0, }; + char *cname = NULL; gf_boolean_t ign_enoent = _gf_false; - gf_boolean_t connect_attempted = _gf_false; GF_VALIDATE_OR_GOTO ("socket", this, err); GF_VALIDATE_OR_GOTO ("socket", this->private, err); @@ -3255,6 +3127,7 @@ socket_connect (rpc_transport_t *this, int port) pthread_mutex_lock (&priv->in_lock); pthread_mutex_lock (&priv->out_lock); { + priv->own_thread_done = _gf_false; if (priv->sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, "connect () called on transport " @@ -3265,7 +3138,8 @@ socket_connect (rpc_transport_t *this, int port) } gf_log (this->name, GF_LOG_TRACE, - "connecting %p, sock=%d", this, priv->sock); + "connecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); @@ -3277,8 +3151,7 @@ socket_connect (rpc_transport_t *this, int port) if (sa_family == AF_UNIX) { priv->ssl_enabled = _gf_false; priv->mgmt_ssl = _gf_false; - } - else { + } else { if (port > 0) { sock_union.sin.sin_port = htons (port); } @@ -3376,9 +3249,8 @@ socket_connect (rpc_transport_t *this, int port) } /* If client wants ENOENT to be ignored */ - ign_enoent = dict_get_str_boolean (this->options, - "transport.socket.ignore-enoent", - _gf_false); + ign_enoent = dict_get_str_boolean (this->options, + "transport.socket.ignore-enoent", _gf_false); ret = client_bind (this, SA (&this->myinfo.sockaddr), &this->myinfo.sockaddr_len, priv->sock); @@ -3388,27 +3260,17 @@ socket_connect (rpc_transport_t *this, int port) goto handler; } - /* make socket non-blocking for all types of sockets */ - if (!priv->bio) { + if (!priv->use_ssl && !priv->bio && !priv->own_thread) { ret = __socket_nonblock (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); goto handler; - } else { - gf_log (this->name, GF_LOG_TRACE, - ">>> connect() with non-blocking IO for ALL"); } } - this->connect_failed = _gf_false; - priv->connect_failed = 0; - priv->connected = 0; - - socket_dump_info (SA(&this->peerinfo.sockaddr), priv->is_server, - priv->use_ssl, priv->sock, this->name, - "connecting to"); + this->connect_failed = _gf_false; if (ign_enoent) { ret = connect_loop (priv->sock, SA (&this->peerinfo.sockaddr), @@ -3419,8 +3281,6 @@ socket_connect (rpc_transport_t *this, int port) this->peerinfo.sockaddr_len); } - connect_attempted = _gf_true; - if (ret == -1 && errno == ENOENT && ign_enoent) { gf_log (this->name, GF_LOG_WARNING, "Ignore failed connection attempt on %s, (%s) ", @@ -3459,41 +3319,94 @@ socket_connect (rpc_transport_t *this, int port) priv->connect_failed = 1; goto handler; - } - else { + } else { /* reset connect_failed so that any previous attempts state is not carried forward */ priv->connect_failed = 0; ret = 0; } + if (priv->use_ssl && !priv->own_thread) { + cname = ssl_setup_connection(this, 0); + if (!cname) { + errno = ENOTCONN; + ret = -1; + gf_log(this->name, GF_LOG_ERROR, + "client setup failed"); + goto handler; + } + if (priv->connected) { + this->ssl_name = cname; + } else { + GF_FREE(cname); + } + } + + if (!priv->bio && !priv->own_thread) { + ret = __socket_nonblock (priv->sock); + + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + goto handler; + } + } + handler: - if (ret < 0 && !connect_attempted) { + if (ret < 0) { /* Ignore error from connect. epoll events should be handled in the socket handler. shutdown(2) will result in EPOLLERR, so cleanup is done in socket_event_handler or socket_poller */ shutdown (priv->sock, SHUT_RDWR); - gf_log (this->name, GF_LOG_TRACE, - "@@@ client shutdown(%d, SHUT_RDWR)", - priv->sock); } + /* + * In the own_thread case, this is used to indicate that we're + * initializing a client connection. + */ priv->connected = 0; priv->is_server = _gf_false; rpc_transport_ref (this); refd = _gf_true; - this->listener = this; - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_event_handler, - this, 1, 1); - if (priv->idx == -1) { - gf_log ("", GF_LOG_WARNING, - "failed to register the event"); - sys_close (priv->sock); - priv->sock = -1; - ret = -1; + if (priv->own_thread) { + if (priv->connect_failed) { + gf_msg_debug (this->name, 0, + "socket connect is failed so close it"); + sys_close (priv->sock); + priv->sock = -1; + ret = -1; + goto unlock; + } + + if (pipe(priv->pipe) < 0) { + gf_log(this->name, GF_LOG_ERROR, + "could not create pipe"); + } + + this->listener = this; + ret = socket_spawn(this); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "could not spawn thread"); + sys_close (priv->pipe[0]); + sys_close (priv->pipe[1]); + sys_close (priv->sock); + priv->sock = -1; + } + } else { + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_event_handler, + this, 1, 1); + if (priv->idx == -1) { + gf_log ("", GF_LOG_WARNING, + "failed to register the event"); + sys_close (priv->sock); + priv->sock = -1; + ret = -1; + } } unlock: @@ -3647,10 +3560,6 @@ socket_listen (rpc_transport_t *this) goto unlock; } - socket_dump_info (SA(&this->myinfo.sockaddr), priv->is_server, - priv->use_ssl, priv->sock, this->name, - "listening on"); - ret = listen (priv->sock, priv->backlog); if (ret == -1) { @@ -3692,11 +3601,11 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) { socket_private_t *priv = NULL; int ret = -1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; char need_poll_out = 0; char need_append = 1; - + struct ioq *entry = NULL; + glusterfs_ctx_t *ctx = NULL; + char a_byte = 'j'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -3734,9 +3643,19 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) if (need_append) { list_add_tail (&entry->list, &priv->ioq); + if (priv->own_thread) { + /* + * Make sure the polling thread wakes up, by + * writing a byte to represent this entry. + */ + if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { + gf_log(this->name, GF_LOG_WARNING, + "write error on pipe"); + } + } ret = 0; } - if (need_poll_out) { + if (!priv->own_thread && need_poll_out) { /* first entry to wait. continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, priv->sock, @@ -3756,11 +3675,11 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) { socket_private_t *priv = NULL; int ret = -1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; char need_poll_out = 0; char need_append = 1; - + struct ioq *entry = NULL; + glusterfs_ctx_t *ctx = NULL; + char a_byte = 'd'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -3782,7 +3701,6 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) priv->submit_log = 0; entry = __socket_ioq_new (this, &reply->msg); - if (!entry) goto unlock; @@ -3799,9 +3717,19 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) if (need_append) { list_add_tail (&entry->list, &priv->ioq); + if (priv->own_thread) { + /* + * Make sure the polling thread wakes up, by + * writing a byte to represent this entry. + */ + if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { + gf_log(this->name, GF_LOG_WARNING, + "write error on pipe"); + } + } + ret = 0; } - - if (need_poll_out) { + if (!priv->own_thread && need_poll_out) { /* first entry to wait. continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, priv->sock, @@ -3978,8 +3906,7 @@ reconfigure (rpc_transport_t *this, dict_t *options) gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive"); priv->keepalive = tmp_bool; - } - else + } else priv->keepalive = 1; if (dict_get_int32 (options, "transport.tcp-user-timeout", @@ -4154,25 +4081,204 @@ fini_openssl_mt (void) ERR_free_strings(); } +static void +socket_poller_mayday (socket_private_t *priv) +{ + if (priv == NULL) + return; + + pthread_mutex_lock (&priv->cond_lock); + { + /* Signal waiting threads before exiting from socket_poller */ + if (!priv->own_thread_done) { + gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED"); + pthread_cond_signal (&priv->cond); + priv->own_thread_done = _gf_true; + } + } + pthread_mutex_unlock (&priv->cond_lock); +} + static int -ssl_setup_connection_params(rpc_transport_t *this) +socket_init (rpc_transport_t *this) { socket_private_t *priv = NULL; + gf_boolean_t tmp_bool = 0; + uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; char *optstr = NULL; - static int session_id = 1; + uint32_t timeout = 0; + int keepaliveidle = GF_KEEPALIVE_TIME; + int keepaliveintvl = GF_KEEPALIVE_INTERVAL; + int keepalivecnt = GF_KEEPALIVE_COUNT; + uint32_t backlog = 0; + int session_id = 0; int32_t cert_depth = DEFAULT_VERIFY_DEPTH; char *cipher_list = DEFAULT_CIPHER_LIST; char *dh_param = DEFAULT_DH_PARAM; char *ec_curve = DEFAULT_EC_CURVE; char *crl_path = NULL; - priv = this->private; + if (this->private) { + gf_log_callingfn (this->name, GF_LOG_ERROR, + "double init attempted"); + return -1; + } + + priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); + if (!priv) { + return -1; + } + memset(priv, 0, sizeof(*priv)); + + pthread_mutex_init (&priv->in_lock, NULL); + pthread_mutex_init (&priv->out_lock, NULL); + pthread_mutex_init (&priv->cond_lock, NULL); + pthread_cond_init (&priv->cond, NULL); + + GF_REF_INIT (priv, socket_poller_mayday); + + priv->sock = -1; + priv->idx = -1; + priv->connected = -1; + priv->nodelay = 1; + priv->bio = 0; + priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; + INIT_LIST_HEAD (&priv->ioq); + pthread_mutex_init (&priv->notify.lock, NULL); + pthread_cond_init (&priv->notify.cond, NULL); + + /* All the below section needs 'this->options' to be present */ + if (!this->options) + goto out; + + if (dict_get (this->options, "non-blocking-io")) { + optstr = data_to_str (dict_get (this->options, + "non-blocking-io")); + + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'non-blocking-io' takes only boolean options," + " not taking any action"); + tmp_bool = 1; + } + + if (!tmp_bool) { + priv->bio = 1; + gf_log (this->name, GF_LOG_WARNING, + "disabling non-blocking IO"); + } + } - if (priv->ssl_ctx != NULL) { - gf_log (this->name, GF_LOG_TRACE, "found old SSL context!"); - return 0; + optstr = NULL; + + /* By default, we enable NODELAY */ + if (dict_get (this->options, "transport.socket.nodelay")) { + optstr = data_to_str (dict_get (this->options, + "transport.socket.nodelay")); + + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.nodelay' takes only " + "boolean options, not taking any action"); + tmp_bool = 1; + } + if (!tmp_bool) { + priv->nodelay = 0; + gf_log (this->name, GF_LOG_DEBUG, + "disabling nodelay"); + } } + optstr = NULL; + if (dict_get_str (this->options, "tcp-window-size", + &optstr) == 0) { + if (gf_string2uint64 (optstr, &windowsize) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid number format: %s", optstr); + return -1; + } + } + + priv->windowsize = (int)windowsize; + + optstr = NULL; + /* Enable Keep-alive by default. */ + priv->keepalive = 1; + priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; + priv->keepaliveidle = GF_KEEPALIVE_TIME; + priv->keepalivecnt = GF_KEEPALIVE_COUNT; + if (dict_get_str (this->options, "transport.socket.keepalive", + &optstr) == 0) { + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.keepalive' takes only " + "boolean options, not taking any action"); + tmp_bool = 1; + } + + if (!tmp_bool) + priv->keepalive = 0; + } + + if (dict_get_int32 (this->options, "transport.tcp-user-timeout", + &(priv->timeout)) != 0) + priv->timeout = timeout; + gf_log (this->name, GF_LOG_DEBUG, "Configued " + "transport.tcp-user-timeout=%d", priv->timeout); + + if (dict_get_int32 (this->options, + "transport.socket.keepalive-time", + &(priv->keepaliveidle)) != 0) { + priv->keepaliveidle = keepaliveidle; + } + + if (dict_get_int32 (this->options, + "transport.socket.keepalive-interval", + &(priv->keepaliveintvl)) != 0) { + priv->keepaliveintvl = keepaliveintvl; + } + + if (dict_get_int32 (this->options, "transport.socket.keepalive-count", + &(priv->keepalivecnt)) != 0) + priv->keepalivecnt = keepalivecnt; + gf_log (this->name, GF_LOG_DEBUG, "Reconfigued " + "transport.keepalivecnt=%d", keepalivecnt); + + if (dict_get_uint32 (this->options, + "transport.listen-backlog", + &backlog) != 0) { + + backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG; + } + priv->backlog = backlog; + + optstr = NULL; + + /* Check if socket read failures are to be logged */ + priv->read_fail_log = 1; + if (dict_get (this->options, "transport.socket.read-fail-log")) { + optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log")); + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_WARNING, + "'transport.socket.read-fail-log' takes only " + "boolean options; logging socket read fails"); + } else if (tmp_bool == _gf_false) { + priv->read_fail_log = 0; + } + } + + priv->windowsize = (int)windowsize; + + priv->ssl_enabled = _gf_false; + if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) { + if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid value given for ssl-enabled boolean"); + } + } + priv->mgmt_ssl = this->ctx->secure_mgmt; + priv->srvr_ssl = this->ctx->secure_srvr; + priv->ssl_own_cert = DEFAULT_CERT_PATH; if (dict_get_str(this->options, SSL_OWN_CERT_OPT, &optstr) == 0) { if (!priv->ssl_enabled) { @@ -4224,6 +4330,23 @@ ssl_setup_connection_params(rpc_transport_t *this) gf_log(this->name, priv->mgmt_ssl ? GF_LOG_INFO: GF_LOG_DEBUG, "SSL support for glusterd is %s", priv->mgmt_ssl ? "ENABLED" : "NOT enabled"); + /* + * This might get overridden temporarily in socket_connect (q.v.) + * if we're using the glusterd portmapper. + */ + priv->use_ssl = priv->ssl_enabled; + + priv->own_thread = priv->use_ssl; + if (dict_get_str(this->options, OWN_THREAD_OPT, &optstr) == 0) { + gf_log (this->name, GF_LOG_INFO, "OWN_THREAD_OPT found"); + if (gf_string2boolean (optstr, &priv->own_thread) != 0) { + gf_log (this->name, GF_LOG_WARNING, + "invalid value given for own-thread boolean"); + } + } + gf_log(this->name, priv->own_thread ? GF_LOG_INFO : GF_LOG_DEBUG, + "using %s polling thread", + priv->own_thread ? "private" : "system"); if (!dict_get_int32 (this->options, SSL_CERT_DEPTH_OPT, &cert_depth)) { gf_log (this->name, GF_LOG_INFO, @@ -4258,7 +4381,7 @@ ssl_setup_connection_params(rpc_transport_t *this) #error Old and insecure OpenSSL, use -DUSE_INSECURE_OPENSSL to use it anyway #endif /* SSLv23_method uses highest available protocol */ - priv->ssl_meth = SSLv23_method(); + priv->ssl_meth = (SSL_METHOD *)SSLv23_method(); #endif priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth); @@ -4333,7 +4456,7 @@ ssl_setup_connection_params(rpc_transport_t *this) /* This must be done after DH and ECDH setups */ if (SSL_CTX_set_cipher_list(priv->ssl_ctx, cipher_list) == 0) { - gf_log(this->name,GF_LOG_ERROR, + gf_log(this->name, GF_LOG_ERROR, "failed to find any valid ciphers"); goto err; } @@ -4343,7 +4466,7 @@ ssl_setup_connection_params(rpc_transport_t *this) if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx, priv->ssl_own_cert)) { - gf_log(this->name,GF_LOG_ERROR, + gf_log(this->name, GF_LOG_ERROR, "could not load our cert"); goto err; } @@ -4374,15 +4497,14 @@ ssl_setup_connection_params(rpc_transport_t *this) x509store = SSL_CTX_get_cert_store(priv->ssl_ctx); X509_STORE_set_flags(x509store, - X509_V_FLAG_CRL_CHECK | - X509_V_FLAG_CRL_CHECK_ALL); + X509_V_FLAG_CRL_CHECK|X509_V_FLAG_CRL_CHECK_ALL); #else gf_log(this->name, GF_LOG_ERROR, "OpenSSL version does not support CRL"); #endif } - priv->ssl_session_id = session_id++; + priv->ssl_session_id = ++session_id; SSL_CTX_set_session_id_context(priv->ssl_ctx, (void *)&priv->ssl_session_id, sizeof(priv->ssl_session_id)); @@ -4397,194 +4519,27 @@ ssl_setup_connection_params(rpc_transport_t *this) */ SSL_CTX_set_purpose(priv->ssl_ctx, X509_PURPOSE_ANY); } - return 0; -err: - return -1; -} - -static int -socket_init (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - gf_boolean_t tmp_bool = 0; - uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; - char *optstr = NULL; - uint32_t timeout = 0; - int keepaliveidle = GF_KEEPALIVE_TIME; - int keepaliveintvl = GF_KEEPALIVE_INTERVAL; - int keepalivecnt = GF_KEEPALIVE_COUNT; - uint32_t backlog = 0; - - - if (this->private) { - gf_log_callingfn (this->name, GF_LOG_ERROR, - "double init attempted"); - return -1; - } - - priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); - if (!priv) { - return -1; + if (priv->own_thread) { + priv->ot_state = OT_IDLE; } - memset(priv, 0, sizeof(*priv)); +out: this->private = priv; - pthread_mutex_init (&priv->in_lock, NULL); - pthread_mutex_init (&priv->out_lock, NULL); - pthread_mutex_init (&priv->cond_lock, NULL); - pthread_cond_init (&priv->cond, NULL); - - /*GF_REF_INIT (priv, socket_poller_mayday);*/ - - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; - priv->nodelay = 1; - priv->bio = 0; - priv->ssl_accepted = _gf_false; - priv->ssl_connected = _gf_false; - priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; - INIT_LIST_HEAD (&priv->ioq); - pthread_mutex_init (&priv->notify.lock, NULL); - pthread_cond_init (&priv->notify.cond, NULL); - - /* All the below section needs 'this->options' to be present */ - if (!this->options) - goto out; - - if (dict_get (this->options, "non-blocking-io")) { - optstr = data_to_str (dict_get (this->options, - "non-blocking-io")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'non-blocking-io' takes only boolean options," - " not taking any action"); - tmp_bool = 1; - } - - if (!tmp_bool) { - priv->bio = 1; - gf_log (this->name, GF_LOG_WARNING, - "disabling non-blocking IO"); - } - } - - optstr = NULL; - - /* By default, we enable NODELAY */ - if (dict_get (this->options, "transport.socket.nodelay")) { - optstr = data_to_str (dict_get (this->options, - "transport.socket.nodelay")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.nodelay' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - if (!tmp_bool) { - priv->nodelay = 0; - gf_log (this->name, GF_LOG_DEBUG, - "disabling nodelay"); - } - } - - optstr = NULL; - if (dict_get_str (this->options, "tcp-window-size", - &optstr) == 0) { - if (gf_string2uint64 (optstr, &windowsize) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "invalid number format: %s", optstr); - return -1; - } - } - - priv->windowsize = (int)windowsize; - - optstr = NULL; - /* Enable Keep-alive by default. */ - priv->keepalive = 1; - priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; - priv->keepaliveidle = GF_KEEPALIVE_TIME; - priv->keepalivecnt = GF_KEEPALIVE_COUNT; - if (dict_get_str (this->options, "transport.socket.keepalive", - &optstr) == 0) { - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.keepalive' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - - if (!tmp_bool) - priv->keepalive = 0; - } - - if (dict_get_int32 (this->options, "transport.tcp-user-timeout", - &(priv->timeout)) != 0) - priv->timeout = timeout; - gf_log (this->name, GF_LOG_DEBUG, "Configued " - "transport.tcp-user-timeout=%d", priv->timeout); - - if (dict_get_int32 (this->options, - "transport.socket.keepalive-time", - &(priv->keepaliveidle)) != 0) { - priv->keepaliveidle = keepaliveidle; - } - - if (dict_get_int32 (this->options, - "transport.socket.keepalive-interval", - &(priv->keepaliveintvl)) != 0) { - priv->keepaliveintvl = keepaliveintvl; - } - - if (dict_get_int32 (this->options, "transport.socket.keepalive-count", - &(priv->keepalivecnt)) != 0) - priv->keepalivecnt = keepalivecnt; - gf_log (this->name, GF_LOG_DEBUG, "Reconfigued " - "transport.keepalivecnt=%d", keepalivecnt); + return 0; - if (dict_get_uint32 (this->options, - "transport.listen-backlog", - &backlog) != 0) { - backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG; +err: + if (priv->ssl_own_cert) { + GF_FREE(priv->ssl_own_cert); } - priv->backlog = backlog; - - optstr = NULL; - - /* Check if socket read failures are to be logged */ - priv->read_fail_log = 1; - if (dict_get (this->options, "transport.socket.read-fail-log")) { - optstr = data_to_str (dict_get (this->options, - "transport.socket.read-fail-log")); - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_WARNING, - "'transport.socket.read-fail-log' takes only " - "boolean options; logging socket read fails"); - } else if (tmp_bool == _gf_false) { - priv->read_fail_log = 0; - } + if (priv->ssl_private_key) { + GF_FREE(priv->ssl_private_key); } - - priv->windowsize = (int)windowsize; - - priv->ssl_enabled = _gf_false; - if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) { - if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "invalid value given for ssl-enabled boolean"); - } + if (priv->ssl_ca_list) { + GF_FREE(priv->ssl_ca_list); } - priv->mgmt_ssl = this->ctx->secure_mgmt; - priv->srvr_ssl = this->ctx->secure_srvr; - - ssl_setup_connection_params(this); -out: - this->private = priv; - return 0; + GF_FREE(priv); + return -1; } |