diff options
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 1530 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 32 |
2 files changed, 814 insertions, 748 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 3068b879802..fbf5d349b93 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -91,6 +91,7 @@ typedef int SSL_unary_func (SSL *); typedef int SSL_trinary_func (SSL *, void *, int); +static int ssl_setup_connection_params(rpc_transport_t *this); #define __socket_proto_reset_pending(priv) do { \ struct gf_sock_incoming_frag *frag; \ @@ -189,6 +190,51 @@ struct socket_connect_error_state_ { typedef struct socket_connect_error_state_ socket_connect_error_state_t; static int socket_init (rpc_transport_t *this); +static int __socket_nonblock (int fd); + +static void +socket_dump_info (struct sockaddr *sa, int is_server, int is_ssl, int sock, + char *log_domain, char *log_label) +{ + char addr_buf[INET6_ADDRSTRLEN+1] = {0, }; + char *addr = NULL; + char *peer_type = NULL; + int af = sa->sa_family; + int so_error = -1; + socklen_t slen = sizeof(so_error); + + if (af == AF_UNIX) { + addr = ((struct sockaddr_un *)(sa))->sun_path; + } else { + if (af == AF_INET6) { + struct sockaddr_in6 *sin6 = + (struct sockaddr_in6 *)(sa); + + inet_ntop (af, &sin6->sin6_addr, addr_buf, + sizeof (addr_buf)); + addr = addr_buf; + } else { + struct sockaddr_in *sin = + (struct sockaddr_in *)(sa); + + inet_ntop (af, &sin->sin_addr, addr_buf, + sizeof (addr_buf)); + addr = addr_buf; + } + } + if (is_server) + peer_type = "server"; + else + peer_type = "client"; + + getsockopt (sock, SOL_SOCKET, SO_ERROR, &so_error, &slen); + + gf_log (log_domain, GF_LOG_TRACE, + "$$$ %s: %s (af:%d,sock:%d) %s %s (errno:%d:%s)", + peer_type, log_label, af, sock, addr, + (is_ssl ? "SSL" : "non-SSL"), + so_error, strerror (so_error)); +} static void ssl_dump_error_stack (const char *caller) @@ -208,128 +254,120 @@ static int ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) { int r = (-1); - struct pollfd pfd = {-1, }; socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO(this->name, this->private, out); priv = this->private; - for (;;) { - if (buf) { - if (priv->connected == -1) { - /* - * Fields in the SSL structure (especially - * the BIO pointers) are not valid at this - * point, so we'll segfault if we pass them - * to SSL_read/SSL_write. - */ - gf_log(this->name, GF_LOG_INFO, - "lost connection in %s", __func__); - break; - } - r = func(priv->ssl_ssl, buf, len); - } else { + if (buf) { + if (priv->connected == -1) { /* - * We actually need these functions to get to - * priv->connected == 1. + * Fields in the SSL structure (especially + * the BIO pointers) are not valid at this + * point, so we'll segfault if we pass them + * to SSL_read/SSL_write. */ - r = ((SSL_unary_func *)func)(priv->ssl_ssl); - } - switch (SSL_get_error(priv->ssl_ssl, r)) { - case SSL_ERROR_NONE: - return r; - case SSL_ERROR_WANT_READ: - /* If we are attempting to connect/accept then we - * should wait here on the poll, for the SSL - * (re)negotiation to complete, else we would error out - * on the accept/connect. - * If we are here when attempting to read/write - * then we return r (or -1) as the socket is always - * primed for the read event, and it would eventually - * call one of the SSL routines */ - /* NOTE: Only way to determine this is a accept/connect - * is to examine buf or func, which is not very - * clean */ - if ((func == (SSL_trinary_func *)SSL_read) - || (func == (SSL_trinary_func *) SSL_write)) { - return r; - } - - pfd.fd = priv->sock; - pfd.events = POLLIN; - if (poll(&pfd, 1, -1) < 0) { - gf_log(this->name, GF_LOG_ERROR, "poll error %d", - errno); - } - break; - case SSL_ERROR_WANT_WRITE: - if ((func == (SSL_trinary_func *)SSL_read) - || (func == (SSL_trinary_func *) SSL_write)) { - errno = EAGAIN; - return r; - } - pfd.fd = priv->sock; - pfd.events = POLLOUT; - if (poll(&pfd, 1, -1) < 0) { - gf_log(this->name, GF_LOG_ERROR, "poll error %d", - errno); - } - break; - case SSL_ERROR_SYSCALL: - /* This is what we get when remote disconnects. */ - gf_log(this->name, GF_LOG_DEBUG, - "syscall error (probably remote disconnect)"); - errno = ENODATA; - goto out; - default: - errno = EIO; - goto out; /* "break" would just loop again */ + gf_log (this->name, GF_LOG_INFO, + "lost connection in %s", __func__); + return -1; } + r = func (priv->ssl_ssl, buf, len); + } else { + /* + * We actually need these functions to get to + * priv->connected == 1. + */ + r = ((SSL_unary_func *)func)(priv->ssl_ssl); + } + switch (SSL_get_error (priv->ssl_ssl, r)) { + case SSL_ERROR_NONE: + /* fall through */ + case SSL_ERROR_WANT_READ: + /* fall through */ + case SSL_ERROR_WANT_WRITE: + errno = EAGAIN; + return r; + + case SSL_ERROR_SYSCALL: + /* Sometimes SSL_ERROR_SYSCALL returns errno as + * EAGAIN. In such a case we should reattempt operation + * So, for now, just return the return value and the + * errno as is. + */ + gf_log (this->name, GF_LOG_DEBUG, + "syscall error (probably remote disconnect) " + "errno:%d:%s", errno, strerror(errno)); + return r; + default: + errno = EIO; + goto out; /* "break" would just loop again */ } out: return -1; } -#define ssl_connect_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_connect) -#define ssl_accept_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_accept) #define ssl_read_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_read) #define ssl_write_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_write) -static char * -ssl_setup_connection (rpc_transport_t *this, int server) + +int +ssl_setup_connection_prefix (rpc_transport_t *this) { - X509 *peer = NULL; - char peer_CN[256] = ""; int ret = -1; socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO(this->name, this->private, done); + priv = this->private; + if (ssl_setup_connection_params (this) < 0) { + gf_log (this->name, GF_LOG_TRACE, + "+ ssl_setup_connection_params() failed!"); + goto done; + } else { + gf_log (this->name, GF_LOG_TRACE, + "+ ssl_setup_connection_params() done!"); + } + + priv->ssl_error_required = SSL_ERROR_NONE; + priv->ssl_connected = _gf_false; + priv->ssl_accepted = _gf_false; + priv->ssl_context_created = _gf_false; + priv->ssl_ssl = SSL_new(priv->ssl_ctx); if (!priv->ssl_ssl) { gf_log(this->name, GF_LOG_ERROR, "SSL_new failed"); ssl_dump_error_stack(this->name); goto done; } + priv->ssl_sbio = BIO_new_socket(priv->sock, BIO_NOCLOSE); if (!priv->ssl_sbio) { gf_log(this->name, GF_LOG_ERROR, "BIO_new_socket failed"); ssl_dump_error_stack(this->name); goto free_ssl; } - SSL_set_bio(priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio); - if (server) { - ret = ssl_accept_one(this); - } else { - ret = ssl_connect_one(this); - } + SSL_set_bio (priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio); + ret = 0; + goto done; - /* Make sure _the call_ succeeded. */ - if (ret < 0) { - goto ssl_error; - } +free_ssl: + SSL_free(priv->ssl_ssl); + priv->ssl_ssl = NULL; +done: + return ret; +} + +static char * +ssl_setup_connection_postfix (rpc_transport_t *this) +{ + X509 *peer = NULL; + char peer_CN[256] = ""; + socket_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO(this->name, this->private, done); + priv = this->private; /* Make sure _SSL verification_ succeeded, yielding an identity. */ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) { @@ -340,6 +378,8 @@ ssl_setup_connection (rpc_transport_t *this, int server) goto ssl_error; } + SSL_set_mode(priv->ssl_ssl, SSL_MODE_ENABLE_PARTIAL_WRITE); + /* Finally, everything seems OK. */ X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, sizeof(peer_CN)-1); @@ -356,7 +396,7 @@ ssl_error: "SSL connect error (client: %s) (server: %s)", this->peerinfo.identifier, this->myinfo.identifier); ssl_dump_error_stack(this->name); -free_ssl: + SSL_free(priv->ssl_ssl); priv->ssl_ssl = NULL; done: @@ -364,6 +404,86 @@ done: } +int +ssl_complete_connection (rpc_transport_t *this) +{ + int ret = -1; /* 1 : implies go back to epoll_wait() + * 0 : implies successful ssl connection + * -1: implies continue processing current event + * as if EPOLLERR has been encountered + */ + char *cname = NULL; + int r = -1; + int ssl_error = -1; + socket_private_t *priv = NULL; + + + priv = this->private; + + if (priv->is_server) { + r = SSL_accept (priv->ssl_ssl); + } else { + r = SSL_connect (priv->ssl_ssl); + } + + ssl_error = SSL_get_error (priv->ssl_ssl, r); + priv->ssl_error_required = ssl_error; + + switch (ssl_error) { + case SSL_ERROR_NONE: + cname = ssl_setup_connection_postfix (this); + if (!cname) { + /* we've failed to get the cname so + * we must close the connection + * + * treat this as EPOLLERR + */ + gf_log (this->name, GF_LOG_TRACE, + "error getting cname"); + errno = ECONNRESET; + ret = -1; + } else { + this->ssl_name = cname; + if (priv->is_server) { + priv->ssl_accepted = _gf_true; + gf_log (this->name, GF_LOG_TRACE, + "ssl_accepted!"); + } else { + priv->ssl_connected = _gf_true; + gf_log (this->name, GF_LOG_TRACE, + "ssl_connected!"); + } + ret = 0; + } + break; + + case SSL_ERROR_WANT_READ: + /* fall through */ + case SSL_ERROR_WANT_WRITE: + errno = EAGAIN; + break; + + case SSL_ERROR_SYSCALL: + /* Sometimes SSL_ERROR_SYSCALL returns with errno as EAGAIN + * So, we should retry the operation. + * So, for now, we just return the return value and errno as is. + */ + break; + + case SSL_ERROR_SSL: + /* treat this as EPOLLERR */ + ret = -1; + break; + + default: + /* treat this as EPOLLERR */ + errno = EIO; + ret = -1; + break; + } + return ret; +} + static void ssl_teardown_connection (socket_private_t *priv) { @@ -371,7 +491,21 @@ ssl_teardown_connection (socket_private_t *priv) SSL_shutdown(priv->ssl_ssl); SSL_clear(priv->ssl_ssl); SSL_free(priv->ssl_ssl); + SSL_CTX_free(priv->ssl_ctx); priv->ssl_ssl = NULL; + priv->ssl_ctx = NULL; + if (priv->ssl_private_key) { + GF_FREE (priv->ssl_private_key); + priv->ssl_private_key = NULL; + } + if (priv->ssl_own_cert) { + GF_FREE (priv->ssl_own_cert); + priv->ssl_own_cert = NULL; + } + if (priv->ssl_ca_list) { + GF_FREE (priv->ssl_ca_list); + priv->ssl_ca_list = NULL; + } } priv->use_ssl = _gf_false; } @@ -388,8 +522,10 @@ __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) sock = priv->sock; if (priv->use_ssl) { + gf_log (this->name, GF_LOG_TRACE, "***** reading over SSL"); ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len); } else { + gf_log (this->name, GF_LOG_TRACE, "***** reading over non-SSL"); ret = sys_readv (sock, opvector, IOV_MIN(opcount)); } @@ -541,6 +677,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, * non-SSL might be insecure, so just fail it outright. */ ret = -1; + gf_log (this->name, GF_LOG_TRACE, + "### no priv->ssl_ssl yet; ret = -1;"); } else if (write) { if (priv->use_ssl) { ret = ssl_write_one (this, opvector->iov_base, @@ -556,9 +694,10 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, this->total_bytes_write += ret; } else { ret = __socket_cached_read (this, opvector, opcount); - if (ret == 0) { - gf_log(this->name, GF_LOG_DEBUG, "EOF on socket"); + gf_log (this->name, GF_LOG_DEBUG, + "EOF on socket (errno:%d:%s)", + errno, strerror (errno)); errno = ENODATA; ret = -1; } @@ -718,9 +857,8 @@ __socket_disconnect (rpc_transport_t *this) priv = this->private; - gf_log (this->name, GF_LOG_TRACE, - "disconnecting %p, state=%u gen=%u sock=%d", this, - priv->ot_state, priv->ot_gen, priv->sock); + gf_log (this->name, GF_LOG_TRACE, "disconnecting %p, sock=%d", + this, priv->sock); if (priv->sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, @@ -731,16 +869,6 @@ __socket_disconnect (rpc_transport_t *this) "__socket_teardown_connection () failed: %s", strerror (errno)); } - - if (priv->own_thread) { - /* - * Without this, reconnect (= disconnect + connect) - * won't work except by accident. - */ - gf_log (this->name, GF_LOG_TRACE, - "OT_PLEASE_DIE on %p", this); - priv->ot_state = OT_PLEASE_DIE; - } } out: @@ -974,7 +1102,22 @@ __socket_reset (rpc_transport_t *this) priv->sock = -1; priv->idx = -1; priv->connected = -1; + priv->ssl_connected = _gf_false; + priv->ssl_accepted = _gf_false; + priv->ssl_context_created = _gf_false; + if (priv->ssl_private_key) { + GF_FREE (priv->ssl_private_key); + priv->ssl_private_key = NULL; + } + if (priv->ssl_own_cert) { + GF_FREE (priv->ssl_own_cert); + priv->ssl_own_cert = NULL; + } + if (priv->ssl_ca_list) { + GF_FREE (priv->ssl_ca_list); + priv->ssl_ca_list = NULL; + } out: return; } @@ -1110,8 +1253,6 @@ static int __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) { int ret = -1; - socket_private_t *priv = NULL; - char a_byte = 0; ret = __socket_writev (this, entry->pending_vector, entry->pending_count, @@ -1122,18 +1263,6 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) /* current entry was completely written */ GF_ASSERT (entry->pending_count == 0); __socket_ioq_entry_free (entry); - priv = this->private; - if (priv->own_thread) { - /* - * The pipe should only remain readable if there are - * more entries after this, so drain the byte - * representing this entry. - */ - if (!direct && sys_read (priv->pipe[0], &a_byte, 1) < 1) { - gf_log(this->name, GF_LOG_WARNING, - "read error on pipe"); - } - } } return ret; @@ -1162,7 +1291,7 @@ __socket_ioq_churn (rpc_transport_t *this) break; } - if (!priv->own_thread && list_empty (&priv->ioq)) { + if (list_empty (&priv->ioq)) { /* all pending writes done, not interested in POLLOUT */ priv->idx = event_select_on (this->ctx->event_pool, priv->sock, priv->idx, -1, 0); @@ -2457,15 +2586,9 @@ socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled) priv->gen); if (pollin) { - priv->ot_state = OT_CALLBACK; - ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); - if (priv->ot_state == OT_CALLBACK) { - priv->ot_state = OT_RUNNING; - } - rpc_transport_pollin_destroy (pollin); pthread_mutex_lock (&priv->notify.lock); @@ -2558,303 +2681,355 @@ out: static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait); -/* reads rpc_requests during pollin */ -static int -socket_event_handler (int fd, int idx, int gen, void *data, - int poll_in, int poll_out, int poll_err) +/* socket_is_connected() is for use only in socket_event_handler() */ +static inline gf_boolean_t +socket_is_connected (rpc_transport_t *this) { - rpc_transport_t *this = NULL; socket_private_t *priv = NULL; - int ret = -1; - glusterfs_ctx_t *ctx = NULL; - gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false; - this = data; + priv = this->private; - GF_VALIDATE_OR_GOTO ("socket", this, out); - GF_VALIDATE_OR_GOTO ("socket", this->private, out); - GF_VALIDATE_OR_GOTO ("socket", this->xl, out); + if (priv->use_ssl) { + return priv->is_server ? priv->ssl_accepted : + priv->ssl_connected; + } else { + return priv->is_server ? priv->accepted : + priv->connected; + } +} + +static void +ssl_rearm_event_fd (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; + int idx = -1; + int gen = -1; + int fd = -1; - THIS = this->xl; priv = this->private; ctx = this->ctx; - pthread_mutex_lock (&priv->in_lock); - pthread_mutex_lock (&priv->out_lock); - { - priv->idx = idx; - priv->gen = gen; - } - pthread_mutex_unlock (&priv->out_lock); - pthread_mutex_unlock (&priv->in_lock); + idx = priv->idx; + gen = priv->gen; + fd = priv->sock; - if (priv->connected != 1) { - if (priv->connect_failed) { - /* connect failed with some other error than - EINPROGRESS or ENOENT, so nothing more to do, fail - reading/writing anything even if poll_in or poll_out - is set */ - gf_log ("transport", GF_LOG_DEBUG, - "connect failed with some other error than " - "EINPROGRESS or ENOENT, so nothing more to " - "do; disconnecting socket"); - ret = socket_disconnect (this, _gf_false); - - /* Force ret to be -1, as we are officially done with - this socket */ + if (priv->ssl_error_required == SSL_ERROR_WANT_READ) + event_select_on (ctx->event_pool, fd, idx, 1, -1); + if (priv->ssl_error_required == SSL_ERROR_WANT_WRITE) + event_select_on (ctx->event_pool, fd, idx, -1, 1); + event_handled (ctx->event_pool, fd, idx, gen); +} + +static int +ssl_handle_server_connection_attempt (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; + int idx = -1; + int gen = -1; + int ret = -1; + int fd = -1; + + priv = this->private; + ctx = this->ctx; + + idx = priv->idx; + gen = priv->gen; + fd = priv->sock; + + if (!priv->ssl_context_created) { + ret = ssl_setup_connection_prefix (this); + if (ret < 0) { + gf_log (this->name, GF_LOG_TRACE, + "> ssl_setup_connection_prefix() failed!"); ret = -1; + goto out; } else { - ret = socket_connect_finish (this); + priv->ssl_context_created = _gf_true; } - } else { - ret = 0; } - - if (!ret && poll_out) { - ret = socket_event_poll_out (this); - } - - if (!ret && poll_in) { - ret = socket_event_poll_in (this, !poll_err); - notify_handled = _gf_true; + ret = ssl_complete_connection (this); + if (ret == 0) { + /* nothing to do */ + event_select_on (ctx->event_pool, fd, idx, 1, 0); + event_handled (ctx->event_pool, fd, idx, gen); + ret = 1; + } else { + if (errno == EAGAIN) { + ssl_rearm_event_fd (this); + ret = 1; + } else { + ret = -1; + gf_log (this->name, GF_LOG_TRACE, + "ssl_complete_connection returned error"); + } } +out: + return ret; +} - if ((ret < 0) || poll_err) { - /* Logging has happened already in earlier cases */ - gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), - "EPOLLERR - disconnecting now"); +static int +ssl_handle_client_connection_attempt (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; + int idx = -1; + int ret = -1; + int fd = -1; - socket_closed = socket_event_poll_err (this, gen, idx); + priv = this->private; + ctx = this->ctx; - if (socket_closed) - rpc_transport_unref (this); + idx = priv->idx; + fd = priv->sock; - } else if (!notify_handled) { - event_handled (ctx->event_pool, fd, idx, gen); + /* SSL client */ + if (priv->connect_failed) { + gf_log (this->name, GF_LOG_TRACE, + ">>> disconnecting SSL socket"); + ret = socket_disconnect (this, _gf_false); + /* Force ret to be -1, as we are officially done with + this socket */ + ret = -1; + } else { + if (!priv->ssl_context_created) { + ret = ssl_setup_connection_prefix (this); + if (ret < 0) { + gf_log (this->name, GF_LOG_TRACE, + "> ssl_setup_connection_prefix() " + "failed!"); + ret = -1; + goto out; + } else { + priv->ssl_context_created = _gf_true; + } + } + ret = ssl_complete_connection (this); + if (ret == 0) { + ret = socket_connect_finish (this); + event_select_on (ctx->event_pool, fd, idx, 1, 0); + gf_log (this->name, GF_LOG_TRACE, + ">>> completed client connect"); + } else { + if (errno == EAGAIN) { + gf_log (this->name, GF_LOG_TRACE, + ">>> retrying client connect 2"); + ssl_rearm_event_fd (this); + ret = 1; + } else { + /* this is a connection failure */ + ret = socket_connect_finish (this); + gf_log (this->name, GF_LOG_TRACE, + "ssl_complete_connection " + "returned error"); + ret = -1; + } + } } - out: return ret; } -static int poll_err_cnt; -static void * -socket_poller (void *ctx) +static int +socket_handle_client_connection_attempt (rpc_transport_t *this) { - rpc_transport_t *this = ctx; - socket_private_t *priv = this->private; - struct pollfd pfd[2] = {{0, }, }; - gf_boolean_t to_write = _gf_false; - int ret = 0; - uint32_t gen = 0; - char *cname = NULL; + socket_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; + int idx = -1; + int gen = -1; + int ret = -1; + int fd = -1; - GF_ASSERT (this); - /* Set THIS early on in the life of this thread, instead of setting it - * conditionally - */ - THIS = this->xl; + priv = this->private; + ctx = this->ctx; - if (priv->ot_state == OT_PLEASE_DIE) { - gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting " - "because socket state is OT_PLEASE_DIE"); - goto err; + idx = priv->idx; + gen = priv->gen; + fd = priv->sock; + + /* non-SSL client */ + if (priv->connect_failed) { + /* connect failed with some other error than + EINPROGRESS or ENOENT, so nothing more to + do, fail reading/writing anything even if + poll_in or poll_out + is set + */ + gf_log ("transport", GF_LOG_DEBUG, + "connect failed with some other error " + "than EINPROGRESS or ENOENT, so " + "nothing more to do; disconnecting " + "socket"); + (void)socket_disconnect (this, _gf_false); + + /* Force ret to be -1, as we are officially + * done with this socket + */ + ret = -1; + } else { + ret = socket_connect_finish (this); + gf_log (this->name, GF_LOG_TRACE, + "socket_connect_finish() returned %d", + ret); + if (ret == 0 || ret == 1) { + /* we don't want to do any reads or + * writes on the connection yet in + * socket_event_handler, so just + * return 1 + */ + ret = 1; + event_handled (ctx->event_pool, fd, idx, gen); + } } + return ret; +} - priv->ot_state = OT_RUNNING; +static int +socket_complete_connection (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; + int idx = -1; + int gen = -1; + int ret = -1; + int fd = -1; + + priv = this->private; + ctx = this->ctx; + + idx = priv->idx; + gen = priv->gen; + fd = priv->sock; if (priv->use_ssl) { - cname = ssl_setup_connection(this, priv->connected); - if (!cname) { - gf_log (this->name, GF_LOG_ERROR, "%s setup failed", - priv->connected ? "server" : "client"); - goto err; + if (priv->is_server) { + ret = ssl_handle_server_connection_attempt (this); + } else { + ret = ssl_handle_client_connection_attempt (this); } - if (priv->connected) { - this->ssl_name = cname; + } else { + if (priv->is_server) { + /* non-SSL server: nothing much to do + * connection has already been accepted in + * socket_server_event_handler() + */ + priv->accepted = _gf_true; + event_handled (ctx->event_pool, fd, idx, gen); + ret = 1; } else { - GF_FREE(cname); + ret = socket_handle_client_connection_attempt (this); } } + return ret; +} - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - goto err; - } - } +/* reads rpc_requests during pollin */ +static int +socket_event_handler (int fd, int idx, int gen, void *data, + int poll_in, int poll_out, int poll_err) +{ + rpc_transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = -1; + glusterfs_ctx_t *ctx = NULL; + gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false; - if (priv->connected == 0) { - ret = socket_connect_finish (this); - if (ret != 0) { - gf_log (this->name, GF_LOG_WARNING, - "asynchronous socket_connect_finish failed"); - } - } - ret = rpc_transport_notify (this->listener, - RPC_TRANSPORT_ACCEPT, this); - if (ret != 0) { - gf_log (this->name, GF_LOG_WARNING, - "asynchronous rpc_transport_notify failed"); + this = data; + + GF_VALIDATE_OR_GOTO ("socket", this, out); + GF_VALIDATE_OR_GOTO ("socket", this->private, out); + GF_VALIDATE_OR_GOTO ("socket", this->xl, out); + + THIS = this->xl; + priv = this->private; + ctx = this->ctx; + + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); + { + priv->idx = idx; + priv->gen = gen; } + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); - gen = priv->ot_gen; - for (;;) { - pthread_mutex_lock(&priv->out_lock); - to_write = !list_empty(&priv->ioq); - pthread_mutex_unlock(&priv->out_lock); - pfd[0].fd = priv->pipe[0]; - pfd[0].events = POLL_MASK_ERROR; - pfd[0].revents = 0; - pfd[1].fd = priv->sock; - pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR; - pfd[1].revents = 0; - if (to_write) { - pfd[1].events |= POLL_MASK_OUTPUT; - } else { - pfd[0].events |= POLL_MASK_INPUT; - } - if (poll(pfd, 2, -1) < 0) { - gf_log(this->name, GF_LOG_ERROR, "poll failed"); - break; - } - if (pfd[0].revents & POLL_MASK_ERROR) { - gf_log(this->name, GF_LOG_ERROR, - "poll error on pipe"); - break; - } + gf_log (this->name, GF_LOG_TRACE, "%s (sock:%d) in:%d, out:%d, err:%d", + (priv->is_server ? "server" : "client"), + priv->sock, poll_in, poll_out, poll_err); - if (priv->ot_state == OT_PLEASE_DIE) { - gf_log (this->name, GF_LOG_DEBUG, - "OT_PLEASE_DIE on %p (exiting socket_poller)", - this); - break; - } + if (!poll_err) { + if (!socket_is_connected (this)) { + gf_log (this->name, GF_LOG_TRACE, + "%s (sock:%d) socket is not connected, " + "completing connection", + (priv->is_server ? "server" : "client"), + priv->sock); - if (pfd[1].revents & POLL_MASK_INPUT) { - ret = socket_event_poll_in(this, 0); - if (ret >= 0) { - /* Suppress errors while making progress. */ - pfd[1].revents &= ~POLL_MASK_ERROR; - } else if (errno == ENOTCONN) { - ret = 0; - } - if (priv->ot_state == OT_PLEASE_DIE) { - gf_log (this->name, GF_LOG_TRACE, - "OT_IDLE on %p (input request)", - this); - break; - } - } else if (pfd[1].revents & POLL_MASK_OUTPUT) { - ret = socket_event_poll_out(this); - if (ret >= 0) { - /* Suppress errors while making progress. */ - pfd[1].revents &= ~POLL_MASK_ERROR; - } else if (errno == ENOTCONN) { - ret = 0; - } - if (priv->ot_state == OT_PLEASE_DIE) { + ret = socket_complete_connection (this); + + gf_log (this->name, GF_LOG_TRACE, "(sock:%d) " + "socket_complete_connection() returned %d", + priv->sock, ret); + + if (ret > 0) { gf_log (this->name, GF_LOG_TRACE, - "OT_IDLE on %p (output request)", - this); - break; + "(sock:%d) returning to wait on socket", + priv->sock); + return 0; } } else { - /* - * This usually means that we left poll() because - * somebody pushed a byte onto our pipe. That wakeup - * is why the pipe is there, but once awake we can do - * all the checking we need on the next iteration. - */ - ret = 0; - } - if (pfd[1].revents & POLL_MASK_ERROR) { - gf_log(this->name, GF_LOG_ERROR, - "poll error on socket"); - break; - } - if (ret < 0) { - GF_LOG_OCCASIONALLY (poll_err_cnt, this->name, - GF_LOG_ERROR, - "socket_poller %s failed (%s)", - this->peerinfo.identifier, - strerror (errno)); - break; - } - if (priv->ot_gen != gen) { + char *sock_type = (priv->is_server ? "Server" : + "Client"); + gf_log (this->name, GF_LOG_TRACE, - "generation mismatch, my %u != %u", - gen, priv->ot_gen); - return NULL; + "%s socket (%d) is already connected", + sock_type, priv->sock); + ret = 0; } } -err: - /* All (and only) I/O errors should come here. */ - pthread_mutex_lock(&priv->in_lock); - pthread_mutex_lock(&priv->out_lock); - { - gf_log (this->name, GF_LOG_TRACE, "disconnecting socket"); - __socket_teardown_connection (this); - sys_close (priv->sock); - priv->sock = -1; - - sys_close (priv->pipe[0]); - sys_close (priv->pipe[1]); - priv->pipe[0] = -1; - priv->pipe[1] = -1; - - priv->ot_state = OT_IDLE; + if (!ret && poll_out) { + ret = socket_event_poll_out (this); + gf_log (this->name, GF_LOG_TRACE, "(sock:%d) " + "socket_event_poll_out returned %d", priv->sock, ret); } - pthread_mutex_unlock(&priv->out_lock); - pthread_mutex_unlock(&priv->in_lock); - - rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); - GF_REF_PUT (priv); - - rpc_transport_unref (this); + if (!ret && poll_in) { + ret = socket_event_poll_in (this, !poll_err); + gf_log (this->name, GF_LOG_TRACE, "(sock:%d) " + "socket_event_poll_in returned %d", priv->sock, ret); + notify_handled = _gf_true; + } - return NULL; -} + if ((ret < 0) || poll_err) { + struct sockaddr *sa = SA(&this->peerinfo.sockaddr); + if (priv->is_server && + SA(&this->myinfo.sockaddr)->sa_family == AF_UNIX) { + sa = SA(&this->myinfo.sockaddr); + } -static int -socket_spawn (rpc_transport_t *this) -{ - socket_private_t *priv = this->private; - int ret = -1; - switch (priv->ot_state) { - case OT_IDLE: - case OT_PLEASE_DIE: - break; - default: - gf_log (this->name, GF_LOG_WARNING, - "refusing to start redundant poller"); - return ret; - } + socket_dump_info (sa, priv->is_server, priv->use_ssl, + priv->sock, this->name, + "disconnecting from"); - priv->ot_gen += 7; - priv->ot_state = OT_SPAWNING; - gf_log (this->name, GF_LOG_TRACE, - "spawning %p with gen %u", this, priv->ot_gen); + /* Logging has happened already in earlier cases */ + gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), + "EPOLLERR - disconnecting (sock:%d) (%s)", + priv->sock, (priv->use_ssl ? "SSL" : "non-SSL")); - GF_REF_GET (priv); + socket_closed = socket_event_poll_err (this, gen, idx); - /* Create thread after enable detach flag */ + if (socket_closed) + rpc_transport_unref (this); - ret = gf_thread_create_detached (&priv->thread, socket_poller, this, - "spoller"); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "could not create poll thread"); - ret = -1; + } else if (!notify_handled) { + event_handled (ctx->event_pool, fd, idx, gen); } +out: return ret; } @@ -2871,7 +3046,6 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, socklen_t addrlen = sizeof (new_sockaddr); socket_private_t *new_priv = NULL; glusterfs_ctx_t *ctx = NULL; - char *cname = NULL; this = data; GF_VALIDATE_OR_GOTO ("socket", this, out); @@ -2965,6 +3139,10 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, } get_transport_identifiers (new_trans); + gf_log (this->name, GF_LOG_TRACE, "XXX server:%s, client:%s", + new_trans->myinfo.identifier, + new_trans->peerinfo.identifier); + ret = socket_init(new_trans); if (ret != 0) { sys_close (new_sock); @@ -3000,23 +3178,16 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, } new_priv->sock = new_sock; - new_priv->own_thread = priv->own_thread; + new_priv->ssl_enabled = priv->ssl_enabled; new_priv->ssl_ctx = priv->ssl_ctx; - if (new_priv->use_ssl && !new_priv->own_thread) { - cname = ssl_setup_connection(new_trans, 1); - if (!cname) { - gf_log(this->name, GF_LOG_ERROR, - "server setup failed"); - sys_close (new_sock); - GF_FREE (new_trans->name); - GF_FREE (new_trans); - goto out; - } - this->ssl_name = cname; - } + new_priv->connected = 1; + new_priv->is_server = _gf_true; - if (!priv->bio && !priv->own_thread) { + /* set O_NONBLOCK for plain text as well as ssl connections */ + if (!priv->bio) { + gf_log (this->name, GF_LOG_TRACE, + "### use non-blocking IO"); ret = __socket_nonblock (new_sock); if (ret == -1) { @@ -3030,29 +3201,13 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, goto out; } } - /* - * In the own_thread case, this is used to - * indicate that we're initializing a server - * connection. + * This is the first ref on the newly accepted + * transport. */ - new_priv->connected = 1; - new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); - if (new_priv->own_thread) { - if (pipe(new_priv->pipe) < 0) { - gf_log(this->name, GF_LOG_ERROR, - "could not create pipe"); - } - ret = socket_spawn(new_trans); - if (ret) { - gf_log(this->name, GF_LOG_ERROR, - "could not spawn thread"); - sys_close (new_priv->pipe[0]); - sys_close (new_priv->pipe[1]); - } - } else { + { /* Take a ref on the new_trans to avoid * getting deleted when event_register() * causes socket_event_handler() to race @@ -3118,9 +3273,6 @@ socket_server_event_handler (int fd, int idx, int gen, void *data, } } out: - if (cname && (cname != this->ssl_name)) { - GF_FREE(cname); - } return ret; } @@ -3130,36 +3282,12 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait) { socket_private_t *priv = NULL; int ret = -1; - char a_byte = 'r'; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; - if (wait && priv->own_thread) { - GF_REF_PUT (priv); - - pthread_mutex_lock (&priv->cond_lock); - { - /* Change the state to OT_PLEASE_DIE so that - * socket_poller can exit. */ - priv->ot_state = OT_PLEASE_DIE; - /* Write something into the pipe so that poller - * thread can wake up.*/ - if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { - gf_log (this->name, GF_LOG_WARNING, - "write error on pipe"); - } - - /* Wait for socket_poller to exit */ - if (!priv->own_thread_done) - pthread_cond_wait (&priv->cond, - &priv->cond_lock); - } - pthread_mutex_unlock (&priv->cond_lock); - } - pthread_mutex_lock (&priv->in_lock); pthread_mutex_lock (&priv->out_lock); { @@ -3250,8 +3378,8 @@ socket_connect (rpc_transport_t *this, int port) gf_boolean_t refd = _gf_false; socket_connect_error_state_t *arg = NULL; pthread_t th_id = {0, }; - char *cname = NULL; gf_boolean_t ign_enoent = _gf_false; + gf_boolean_t connect_attempted = _gf_false; GF_VALIDATE_OR_GOTO ("socket", this, err); GF_VALIDATE_OR_GOTO ("socket", this->private, err); @@ -3268,7 +3396,6 @@ socket_connect (rpc_transport_t *this, int port) pthread_mutex_lock (&priv->in_lock); pthread_mutex_lock (&priv->out_lock); { - priv->own_thread_done = _gf_false; if (priv->sock != -1) { gf_log_callingfn (this->name, GF_LOG_TRACE, "connect () called on transport " @@ -3279,8 +3406,7 @@ socket_connect (rpc_transport_t *this, int port) } gf_log (this->name, GF_LOG_TRACE, - "connecting %p, state=%u gen=%u sock=%d", this, - priv->ot_state, priv->ot_gen, priv->sock); + "connecting %p, sock=%d", this, priv->sock); ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); @@ -3390,8 +3516,9 @@ socket_connect (rpc_transport_t *this, int port) } /* If client wants ENOENT to be ignored */ - ign_enoent = dict_get_str_boolean (this->options, - "transport.socket.ignore-enoent", _gf_false); + ign_enoent = dict_get_str_boolean (this->options, + "transport.socket.ignore-enoent", + _gf_false); ret = client_bind (this, SA (&this->myinfo.sockaddr), &this->myinfo.sockaddr_len, priv->sock); @@ -3401,17 +3528,27 @@ socket_connect (rpc_transport_t *this, int port) goto handler; } - if (!priv->use_ssl && !priv->bio && !priv->own_thread) { + /* make socket non-blocking for all types of sockets */ + if (!priv->bio) { ret = __socket_nonblock (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); goto handler; + } else { + gf_log (this->name, GF_LOG_TRACE, + ">>> connect() with non-blocking IO for ALL"); } } - this->connect_failed = _gf_false; + priv->connect_failed = 0; + priv->connected = 0; + + socket_dump_info (SA(&this->peerinfo.sockaddr), priv->is_server, + priv->use_ssl, priv->sock, this->name, + "connecting to"); + if (ign_enoent) { ret = connect_loop (priv->sock, SA (&this->peerinfo.sockaddr), @@ -3422,6 +3559,8 @@ socket_connect (rpc_transport_t *this, int port) this->peerinfo.sockaddr_len); } + connect_attempted = _gf_true; + if (ret == -1 && errno == ENOENT && ign_enoent) { gf_log (this->name, GF_LOG_WARNING, "Ignore failed connection attempt on %s, (%s) ", @@ -3467,87 +3606,33 @@ socket_connect (rpc_transport_t *this, int port) ret = 0; } - if (priv->use_ssl && !priv->own_thread) { - cname = ssl_setup_connection(this, 0); - if (!cname) { - errno = ENOTCONN; - ret = -1; - gf_log(this->name, GF_LOG_ERROR, - "client setup failed"); - goto handler; - } - if (priv->connected) { - this->ssl_name = cname; - } else { - GF_FREE(cname); - } - } - - if (!priv->bio && !priv->own_thread) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - goto handler; - } - } - handler: - if (ret < 0) { + if (ret < 0 && !connect_attempted) { /* Ignore error from connect. epoll events should be handled in the socket handler. shutdown(2) will result in EPOLLERR, so cleanup is done in socket_event_handler or socket_poller */ shutdown (priv->sock, SHUT_RDWR); + gf_log (this->name, GF_LOG_TRACE, + "@@@ client shutdown(%d, SHUT_RDWR)", + priv->sock); } - /* - * In the own_thread case, this is used to indicate that we're - * initializing a client connection. - */ priv->connected = 0; priv->is_server = _gf_false; rpc_transport_ref (this); refd = _gf_true; - if (priv->own_thread) { - if (priv->connect_failed) { - gf_msg_debug (this->name, 0, - "socket connect is failed so close it"); - sys_close (priv->sock); - priv->sock = -1; - ret = -1; - goto unlock; - } - - if (pipe(priv->pipe) < 0) { - gf_log(this->name, GF_LOG_ERROR, - "could not create pipe"); - } - - this->listener = this; - ret = socket_spawn(this); - if (ret) { - gf_log(this->name, GF_LOG_ERROR, - "could not spawn thread"); - sys_close (priv->pipe[0]); - sys_close (priv->pipe[1]); - sys_close (priv->sock); - priv->sock = -1; - } - } else { - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_event_handler, - this, 1, 1); - if (priv->idx == -1) { - gf_log ("", GF_LOG_WARNING, - "failed to register the event"); - sys_close (priv->sock); - priv->sock = -1; - ret = -1; - } + this->listener = this; + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_event_handler, + this, 1, 1); + if (priv->idx == -1) { + gf_log ("", GF_LOG_WARNING, + "failed to register the event"); + sys_close (priv->sock); + priv->sock = -1; + ret = -1; } unlock: @@ -3701,6 +3786,10 @@ socket_listen (rpc_transport_t *this) goto unlock; } + socket_dump_info (SA(&this->myinfo.sockaddr), priv->is_server, + priv->use_ssl, priv->sock, this->name, + "listening on"); + ret = listen (priv->sock, priv->backlog); if (ret == -1) { @@ -3742,11 +3831,11 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) { socket_private_t *priv = NULL; int ret = -1; - char need_poll_out = 0; - char need_append = 1; struct ioq *entry = NULL; glusterfs_ctx_t *ctx = NULL; - char a_byte = 'j'; + char need_poll_out = 0; + char need_append = 1; + GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -3784,19 +3873,9 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) if (need_append) { list_add_tail (&entry->list, &priv->ioq); - if (priv->own_thread) { - /* - * Make sure the polling thread wakes up, by - * writing a byte to represent this entry. - */ - if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { - gf_log(this->name, GF_LOG_WARNING, - "write error on pipe"); - } - } ret = 0; } - if (!priv->own_thread && need_poll_out) { + if (need_poll_out) { /* first entry to wait. continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, priv->sock, @@ -3816,11 +3895,11 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) { socket_private_t *priv = NULL; int ret = -1; - char need_poll_out = 0; - char need_append = 1; struct ioq *entry = NULL; glusterfs_ctx_t *ctx = NULL; - char a_byte = 'd'; + char need_poll_out = 0; + char need_append = 1; + GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -3842,6 +3921,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) priv->submit_log = 0; entry = __socket_ioq_new (this, &reply->msg); + if (!entry) goto unlock; @@ -3858,19 +3938,9 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) if (need_append) { list_add_tail (&entry->list, &priv->ioq); - if (priv->own_thread) { - /* - * Make sure the polling thread wakes up, by - * writing a byte to represent this entry. - */ - if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { - gf_log(this->name, GF_LOG_WARNING, - "write error on pipe"); - } - } - ret = 0; } - if (!priv->own_thread && need_poll_out) { + + if (need_poll_out) { /* first entry to wait. continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, priv->sock, @@ -4231,203 +4301,24 @@ fini_openssl_mt (void) ERR_free_strings(); } -static void -socket_poller_mayday (socket_private_t *priv) -{ - if (priv == NULL) - return; - - pthread_mutex_lock (&priv->cond_lock); - { - /* Signal waiting threads before exiting from socket_poller */ - if (!priv->own_thread_done) { - gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED"); - pthread_cond_signal (&priv->cond); - priv->own_thread_done = _gf_true; - } - } - pthread_mutex_unlock (&priv->cond_lock); -} - static int -socket_init (rpc_transport_t *this) +ssl_setup_connection_params(rpc_transport_t *this) { socket_private_t *priv = NULL; - gf_boolean_t tmp_bool = 0; - uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; char *optstr = NULL; - uint32_t timeout = 0; - int keepaliveidle = GF_KEEPALIVE_TIME; - int keepaliveintvl = GF_KEEPALIVE_INTERVAL; - int keepalivecnt = GF_KEEPALIVE_COUNT; - uint32_t backlog = 0; - int session_id = 0; + static int session_id = 1; int32_t cert_depth = DEFAULT_VERIFY_DEPTH; char *cipher_list = DEFAULT_CIPHER_LIST; char *dh_param = DEFAULT_DH_PARAM; char *ec_curve = DEFAULT_EC_CURVE; char *crl_path = NULL; - if (this->private) { - gf_log_callingfn (this->name, GF_LOG_ERROR, - "double init attempted"); - return -1; - } - - priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); - if (!priv) { - return -1; - } - memset(priv, 0, sizeof(*priv)); - - pthread_mutex_init (&priv->in_lock, NULL); - pthread_mutex_init (&priv->out_lock, NULL); - pthread_mutex_init (&priv->cond_lock, NULL); - pthread_cond_init (&priv->cond, NULL); - - GF_REF_INIT (priv, socket_poller_mayday); - - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; - priv->nodelay = 1; - priv->bio = 0; - priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; - INIT_LIST_HEAD (&priv->ioq); - pthread_mutex_init (&priv->notify.lock, NULL); - pthread_cond_init (&priv->notify.cond, NULL); - - /* All the below section needs 'this->options' to be present */ - if (!this->options) - goto out; - - if (dict_get (this->options, "non-blocking-io")) { - optstr = data_to_str (dict_get (this->options, - "non-blocking-io")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'non-blocking-io' takes only boolean options," - " not taking any action"); - tmp_bool = 1; - } - - if (!tmp_bool) { - priv->bio = 1; - gf_log (this->name, GF_LOG_WARNING, - "disabling non-blocking IO"); - } - } - - optstr = NULL; - - /* By default, we enable NODELAY */ - if (dict_get (this->options, "transport.socket.nodelay")) { - optstr = data_to_str (dict_get (this->options, - "transport.socket.nodelay")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.nodelay' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - if (!tmp_bool) { - priv->nodelay = 0; - gf_log (this->name, GF_LOG_DEBUG, - "disabling nodelay"); - } - } - - optstr = NULL; - if (dict_get_str (this->options, "tcp-window-size", - &optstr) == 0) { - if (gf_string2uint64 (optstr, &windowsize) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "invalid number format: %s", optstr); - return -1; - } - } - - priv->windowsize = (int)windowsize; - - optstr = NULL; - /* Enable Keep-alive by default. */ - priv->keepalive = 1; - priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; - priv->keepaliveidle = GF_KEEPALIVE_TIME; - priv->keepalivecnt = GF_KEEPALIVE_COUNT; - if (dict_get_str (this->options, "transport.socket.keepalive", - &optstr) == 0) { - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.keepalive' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - - if (!tmp_bool) - priv->keepalive = 0; - } - - if (dict_get_int32 (this->options, "transport.tcp-user-timeout", - &(priv->timeout)) != 0) - priv->timeout = timeout; - gf_log (this->name, GF_LOG_DEBUG, "Configued " - "transport.tcp-user-timeout=%d", priv->timeout); - - if (dict_get_int32 (this->options, - "transport.socket.keepalive-time", - &(priv->keepaliveidle)) != 0) { - priv->keepaliveidle = keepaliveidle; - } - - if (dict_get_int32 (this->options, - "transport.socket.keepalive-interval", - &(priv->keepaliveintvl)) != 0) { - priv->keepaliveintvl = keepaliveintvl; - } - - if (dict_get_int32 (this->options, "transport.socket.keepalive-count", - &(priv->keepalivecnt)) != 0) - priv->keepalivecnt = keepalivecnt; - gf_log (this->name, GF_LOG_DEBUG, "Reconfigued " - "transport.keepalivecnt=%d", keepalivecnt); - - if (dict_get_uint32 (this->options, - "transport.listen-backlog", - &backlog) != 0) { - - backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG; - } - priv->backlog = backlog; - - optstr = NULL; - - /* Check if socket read failures are to be logged */ - priv->read_fail_log = 1; - if (dict_get (this->options, "transport.socket.read-fail-log")) { - optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log")); - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_WARNING, - "'transport.socket.read-fail-log' takes only " - "boolean options; logging socket read fails"); - } else if (tmp_bool == _gf_false) { - priv->read_fail_log = 0; - } - } - - priv->windowsize = (int)windowsize; + priv = this->private; - priv->ssl_enabled = _gf_false; - if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) { - if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "invalid value given for ssl-enabled boolean"); - } + if (priv->ssl_ctx != NULL) { + gf_log (this->name, GF_LOG_TRACE, "found old SSL context!"); + return 0; } - priv->mgmt_ssl = this->ctx->secure_mgmt; - priv->srvr_ssl = this->ctx->secure_srvr; priv->ssl_own_cert = DEFAULT_CERT_PATH; if (dict_get_str(this->options, SSL_OWN_CERT_OPT, &optstr) == 0) { @@ -4480,23 +4371,6 @@ socket_init (rpc_transport_t *this) gf_log(this->name, priv->mgmt_ssl ? GF_LOG_INFO: GF_LOG_DEBUG, "SSL support for glusterd is %s", priv->mgmt_ssl ? "ENABLED" : "NOT enabled"); - /* - * This might get overridden temporarily in socket_connect (q.v.) - * if we're using the glusterd portmapper. - */ - priv->use_ssl = priv->ssl_enabled; - - priv->own_thread = priv->use_ssl; - if (dict_get_str(this->options, OWN_THREAD_OPT, &optstr) == 0) { - gf_log (this->name, GF_LOG_INFO, "OWN_THREAD_OPT found"); - if (gf_string2boolean (optstr, &priv->own_thread) != 0) { - gf_log (this->name, GF_LOG_WARNING, - "invalid value given for own-thread boolean"); - } - } - gf_log(this->name, priv->own_thread ? GF_LOG_INFO : GF_LOG_DEBUG, - "using %s polling thread", - priv->own_thread ? "private" : "system"); if (!priv->mgmt_ssl) { if (!dict_get_int32 (this->options, SSL_CERT_DEPTH_OPT, &cert_depth)) { @@ -4537,7 +4411,7 @@ socket_init (rpc_transport_t *this) #error Old and insecure OpenSSL, use -DUSE_INSECURE_OPENSSL to use it anyway #endif /* SSLv23_method uses highest available protocol */ - priv->ssl_meth = (SSL_METHOD *)SSLv23_method(); + priv->ssl_meth = SSLv23_method(); #endif priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth); @@ -4655,14 +4529,15 @@ socket_init (rpc_transport_t *this) x509store = SSL_CTX_get_cert_store(priv->ssl_ctx); X509_STORE_set_flags(x509store, - X509_V_FLAG_CRL_CHECK|X509_V_FLAG_CRL_CHECK_ALL); + X509_V_FLAG_CRL_CHECK | + X509_V_FLAG_CRL_CHECK_ALL); #else gf_log(this->name, GF_LOG_ERROR, "OpenSSL version does not support CRL"); #endif } - priv->ssl_session_id = ++session_id; + priv->ssl_session_id = session_id++; SSL_CTX_set_session_id_context(priv->ssl_ctx, (void *)&priv->ssl_session_id, sizeof(priv->ssl_session_id)); @@ -4677,27 +4552,194 @@ socket_init (rpc_transport_t *this) */ SSL_CTX_set_purpose(priv->ssl_ctx, X509_PURPOSE_ANY); } + return 0; + +err: + return -1; +} + +static int +socket_init (rpc_transport_t *this) +{ + socket_private_t *priv = NULL; + gf_boolean_t tmp_bool = 0; + uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; + char *optstr = NULL; + uint32_t timeout = 0; + int keepaliveidle = GF_KEEPALIVE_TIME; + int keepaliveintvl = GF_KEEPALIVE_INTERVAL; + int keepalivecnt = GF_KEEPALIVE_COUNT; + uint32_t backlog = 0; + + + if (this->private) { + gf_log_callingfn (this->name, GF_LOG_ERROR, + "double init attempted"); + return -1; + } - if (priv->own_thread) { - priv->ot_state = OT_IDLE; + priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); + if (!priv) { + return -1; } + memset(priv, 0, sizeof(*priv)); -out: this->private = priv; - return 0; + pthread_mutex_init (&priv->in_lock, NULL); + pthread_mutex_init (&priv->out_lock, NULL); + pthread_mutex_init (&priv->cond_lock, NULL); + pthread_cond_init (&priv->cond, NULL); -err: - if (priv->ssl_own_cert) { - GF_FREE(priv->ssl_own_cert); + /*GF_REF_INIT (priv, socket_poller_mayday);*/ + + priv->sock = -1; + priv->idx = -1; + priv->connected = -1; + priv->nodelay = 1; + priv->bio = 0; + priv->ssl_accepted = _gf_false; + priv->ssl_connected = _gf_false; + priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; + INIT_LIST_HEAD (&priv->ioq); + pthread_mutex_init (&priv->notify.lock, NULL); + pthread_cond_init (&priv->notify.cond, NULL); + + /* All the below section needs 'this->options' to be present */ + if (!this->options) + goto out; + + if (dict_get (this->options, "non-blocking-io")) { + optstr = data_to_str (dict_get (this->options, + "non-blocking-io")); + + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'non-blocking-io' takes only boolean options," + " not taking any action"); + tmp_bool = 1; + } + + if (!tmp_bool) { + priv->bio = 1; + gf_log (this->name, GF_LOG_WARNING, + "disabling non-blocking IO"); + } } - if (priv->ssl_private_key) { - GF_FREE(priv->ssl_private_key); + + optstr = NULL; + + /* By default, we enable NODELAY */ + if (dict_get (this->options, "transport.socket.nodelay")) { + optstr = data_to_str (dict_get (this->options, + "transport.socket.nodelay")); + + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.nodelay' takes only " + "boolean options, not taking any action"); + tmp_bool = 1; + } + if (!tmp_bool) { + priv->nodelay = 0; + gf_log (this->name, GF_LOG_DEBUG, + "disabling nodelay"); + } } - if (priv->ssl_ca_list) { - GF_FREE(priv->ssl_ca_list); + + optstr = NULL; + if (dict_get_str (this->options, "tcp-window-size", + &optstr) == 0) { + if (gf_string2uint64 (optstr, &windowsize) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid number format: %s", optstr); + return -1; + } } - GF_FREE(priv); - return -1; + + priv->windowsize = (int)windowsize; + + optstr = NULL; + /* Enable Keep-alive by default. */ + priv->keepalive = 1; + priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; + priv->keepaliveidle = GF_KEEPALIVE_TIME; + priv->keepalivecnt = GF_KEEPALIVE_COUNT; + if (dict_get_str (this->options, "transport.socket.keepalive", + &optstr) == 0) { + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.keepalive' takes only " + "boolean options, not taking any action"); + tmp_bool = 1; + } + + if (!tmp_bool) + priv->keepalive = 0; + } + + if (dict_get_int32 (this->options, "transport.tcp-user-timeout", + &(priv->timeout)) != 0) + priv->timeout = timeout; + gf_log (this->name, GF_LOG_DEBUG, "Configued " + "transport.tcp-user-timeout=%d", priv->timeout); + + if (dict_get_int32 (this->options, + "transport.socket.keepalive-time", + &(priv->keepaliveidle)) != 0) { + priv->keepaliveidle = keepaliveidle; + } + + if (dict_get_int32 (this->options, + "transport.socket.keepalive-interval", + &(priv->keepaliveintvl)) != 0) { + priv->keepaliveintvl = keepaliveintvl; + } + + if (dict_get_int32 (this->options, "transport.socket.keepalive-count", + &(priv->keepalivecnt)) != 0) + priv->keepalivecnt = keepalivecnt; + gf_log (this->name, GF_LOG_DEBUG, "Reconfigued " + "transport.keepalivecnt=%d", keepalivecnt); + + if (dict_get_uint32 (this->options, + "transport.listen-backlog", + &backlog) != 0) { + backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG; + } + priv->backlog = backlog; + + optstr = NULL; + + /* Check if socket read failures are to be logged */ + priv->read_fail_log = 1; + if (dict_get (this->options, "transport.socket.read-fail-log")) { + optstr = data_to_str (dict_get (this->options, + "transport.socket.read-fail-log")); + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_WARNING, + "'transport.socket.read-fail-log' takes only " + "boolean options; logging socket read fails"); + } else if (tmp_bool == _gf_false) { + priv->read_fail_log = 0; + } + } + + priv->windowsize = (int)windowsize; + + priv->ssl_enabled = _gf_false; + if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) { + if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid value given for ssl-enabled boolean"); + } + } + priv->mgmt_ssl = this->ctx->secure_mgmt; + priv->srvr_ssl = this->ctx->secure_srvr; + + ssl_setup_connection_params(this); +out: + this->private = priv; + return 0; } diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index ccc2a84cb35..fdfc20774a8 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -248,12 +248,36 @@ typedef struct { char *ssl_ca_list; pthread_t thread; int pipe[2]; - gf_boolean_t own_thread; - gf_boolean_t own_thread_done; - ot_state_t ot_state; - uint32_t ot_gen; gf_boolean_t is_server; int log_ctr; + gf_boolean_t ssl_accepted; /* To indicate SSL_accept() */ + gf_boolean_t ssl_connected;/* or SSL_connect() has been + * been completed on this socket. + * These are valid only when + * use_ssl is true. + */ + /* SSL_CTX is created for each transport. Since we are now using non- + * blocking mechanism for SSL_accept() and SSL_connect(), the SSL + * context is created on the first EPOLLIN event which may lead to + * SSL_ERROR_WANT_READ/SSL_ERROR_WANT_WRITE and may not complete the + * SSL connection at the first attempt. + * ssl_context_created is a flag to note that we've created the SSL + * context for the connection so that we don't blindly create any more + * while !ssl_accepted or !ssl_connected. + */ + gf_boolean_t ssl_context_created; + gf_boolean_t accepted; /* explicit flag to be set in + * socket_event_handler() for + * newly accepted socket + */ + + /* ssl_error_required is used only during the SSL connection setup + * phase. + * It holds the error code returned by SSL_get_error() and is used to + * arm the epoll event set for the required event for the specific fd. + */ + int ssl_error_required; + GF_REF_DECL; /* refcount to keep track of socket_poller threads */ struct { |