diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 4 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/Makefile.am | 2 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket-mem-types.h | 21 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 236 |
4 files changed, 169 insertions, 94 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 44324a80431..8a460cfa617 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -429,10 +429,6 @@ rpc_clnt_reconnect (void *conn_ptr) } pthread_mutex_unlock (&conn->lock); - if ((ret == -1) && (errno != EINPROGRESS) && (clnt->notifyfn)) { - clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); - } - return; } diff --git a/rpc/rpc-transport/socket/src/Makefile.am b/rpc/rpc-transport/socket/src/Makefile.am index 71e6ed6ff4a..5e909aceac8 100644 --- a/rpc/rpc-transport/socket/src/Makefile.am +++ b/rpc/rpc-transport/socket/src/Makefile.am @@ -1,4 +1,4 @@ -noinst_HEADERS = socket.h name.h +noinst_HEADERS = socket.h name.h socket-mem-types.h rpctransport_LTLIBRARIES = socket.la rpctransportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport diff --git a/rpc/rpc-transport/socket/src/socket-mem-types.h b/rpc/rpc-transport/socket/src/socket-mem-types.h new file mode 100644 index 00000000000..e5553b172a2 --- /dev/null +++ b/rpc/rpc-transport/socket/src/socket-mem-types.h @@ -0,0 +1,21 @@ +/* + Copyright (c) 2008-2014 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __SOCKET_MEM_TYPES_H__ +#define __SOCKET_MEM_TYPES_H__ + +#include "mem-types.h" + +typedef enum gf_sock_mem_types_ { + gf_sock_connect_error_state_t = gf_common_mt_end + 1, + gf_sock_mt_end +} gf_sock_mem_types_t; + +#endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6f566e49345..6d4a862aa8d 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -23,7 +23,7 @@ #include "byte-order.h" #include "common-utils.h" #include "compat-errno.h" - +#include "socket-mem-types.h" /* ugly #includes below */ #include "protocol-common.h" @@ -152,6 +152,13 @@ typedef int SSL_trinary_func (SSL *, void *, int); __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ } +struct socket_connect_error_state_ { + xlator_t *this; + rpc_transport_t *trans; + gf_boolean_t refd; +}; +typedef struct socket_connect_error_state_ socket_connect_error_state_t; + static int socket_init (rpc_transport_t *this); static void @@ -2652,19 +2659,41 @@ out: return ret; } +void* +socket_connect_error_cbk (void *opaque) +{ + socket_connect_error_state_t *arg; + + GF_ASSERT (opaque); + + arg = opaque; + THIS = arg->this; + + rpc_transport_notify (arg->trans, RPC_TRANSPORT_DISCONNECT, arg->trans); + + if (arg->refd) + rpc_transport_unref (arg->trans); + + GF_FREE (opaque); + return NULL; +} static int socket_connect (rpc_transport_t *this, int port) { - int ret = -1; - int sock = -1; - socket_private_t *priv = NULL; - socklen_t sockaddr_len = 0; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - char *local_addr = NULL; - union gf_sock_union sock_union; - struct sockaddr_in *addr = NULL; + int ret = -1; + int th_ret = -1; + int sock = -1; + socket_private_t *priv = NULL; + socklen_t sockaddr_len = 0; + glusterfs_ctx_t *ctx = NULL; + sa_family_t sa_family = {0, }; + char *local_addr = NULL; + union gf_sock_union sock_union; + struct sockaddr_in *addr = NULL; + gf_boolean_t refd = _gf_false; + socket_connect_error_state_t *arg = NULL; + pthread_t th_id = {0, }; GF_VALIDATE_OR_GOTO ("socket", this, err); GF_VALIDATE_OR_GOTO ("socket", this->private, err); @@ -2680,52 +2709,43 @@ socket_connect (rpc_transport_t *this, int port) pthread_mutex_lock (&priv->lock); { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log_callingfn (this->name, GF_LOG_TRACE, - "connect () called on transport already connected"); - errno = EINPROGRESS; - ret = -1; - goto err; - } + if (priv->sock != -1) { + gf_log_callingfn (this->name, GF_LOG_TRACE, + "connect () called on transport " + "already connected"); + errno = EINPROGRESS; + ret = -1; + goto unlock; + } - gf_log (this->name, GF_LOG_TRACE, - "connecting %p, state=%u gen=%u sock=%d", this, - priv->ot_state, priv->ot_gen, priv->sock); + gf_log (this->name, GF_LOG_TRACE, + "connecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); - ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, - &sockaddr_len, &sa_family); - if (ret == -1) { - /* logged inside client_get_remote_sockaddr */ - goto err; - } + ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, + &sockaddr_len, &sa_family); + if (ret == -1) { + /* logged inside client_get_remote_sockaddr */ + goto unlock; + } - if (port > 0) { - sock_union.sin.sin_port = htons (port); - } - if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) { - if (priv->use_ssl) { - gf_log(this->name,GF_LOG_DEBUG, - "disabling SSL for portmapper connection"); - priv->use_ssl = _gf_false; + if (port > 0) { + sock_union.sin.sin_port = htons (port); } - } - else { - if (priv->ssl_enabled && !priv->use_ssl) { - gf_log(this->name,GF_LOG_DEBUG, - "re-enabling SSL for I/O connection"); - priv->use_ssl = _gf_true; + if (ntohs(sock_union.sin.sin_port) == + GF_DEFAULT_SOCKET_LISTEN_PORT) { + if (priv->use_ssl) { + gf_log(this->name,GF_LOG_DEBUG, + "disabling SSL for portmapper connection"); + priv->use_ssl = _gf_false; + } } - } - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->name, GF_LOG_TRACE, - "connect() -- already connected"); - goto unlock; + else { + if (priv->ssl_enabled && !priv->use_ssl) { + gf_log(this->name,GF_LOG_DEBUG, + "re-enabling SSL for I/O connection"); + priv->use_ssl = _gf_true; + } } memcpy (&this->peerinfo.sockaddr, &sock_union.storage, @@ -2737,6 +2757,7 @@ socket_connect (rpc_transport_t *this, int port) gf_log (this->name, GF_LOG_ERROR, "socket creation failed (%s)", strerror (errno)); + ret = -1; goto unlock; } @@ -2795,7 +2816,8 @@ socket_connect (rpc_transport_t *this, int port) &local_addr); if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) { addr = (struct sockaddr_in *)(&this->myinfo.sockaddr); - ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr)); + ret = inet_pton (AF_INET, local_addr, + &(addr->sin_addr.s_addr)); } ret = client_bind (this, SA (&this->myinfo.sockaddr), @@ -2803,9 +2825,7 @@ socket_connect (rpc_transport_t *this, int port) if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "client bind failed: %s", strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; + goto handler; } if (!priv->use_ssl && !priv->bio && !priv->own_thread) { @@ -2814,9 +2834,7 @@ socket_connect (rpc_transport_t *this, int port) gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; + goto handler; } } @@ -2832,21 +2850,20 @@ socket_connect (rpc_transport_t *this, int port) GF_LOG_DEBUG : GF_LOG_ERROR), "connection attempt on %s failed, (%s)", this->peerinfo.identifier, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; + goto handler; + } + else { + ret = 0; } - if (priv->use_ssl && !priv->own_thread) { - ret = ssl_setup_connection(this,0); - if (ret < 0) { - gf_log(this->name,GF_LOG_ERROR, - "client setup failed"); - close(priv->sock); - priv->sock = -1; - goto unlock; - } - } + if (priv->use_ssl && !priv->own_thread) { + ret = ssl_setup_connection(this,0); + if (ret < 0) { + gf_log(this->name,GF_LOG_ERROR, + "client setup failed"); + goto handler; + } + } if (!priv->bio && !priv->own_thread) { ret = __socket_nonblock (priv->sock); @@ -2855,10 +2872,24 @@ socket_connect (rpc_transport_t *this, int port) gf_log (this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)", priv->sock, strerror (errno)); - close (priv->sock); + goto handler; + } + } + +handler: + if (ret < 0) { + if (priv->own_thread) { + close(priv->sock); priv->sock = -1; goto unlock; } + else { + /* Ignore error from connect. epoll events + should be handled in the socket handler. + shutdown(2) will result in EPOLLERR, so + cleanup is done in socket_event_handler */ + shutdown (priv->sock, SHUT_RDWR); + } } /* @@ -2868,31 +2899,58 @@ socket_connect (rpc_transport_t *this, int port) priv->connected = 0; priv->is_server = _gf_false; rpc_transport_ref (this); + refd = _gf_true; - if (priv->own_thread) { - if (pipe(priv->pipe) < 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create pipe"); - } + if (priv->own_thread) { + if (pipe(priv->pipe) < 0) { + gf_log(this->name,GF_LOG_ERROR, + "could not create pipe"); + } this->listener = this; socket_spawn(this); - } - else { - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_event_handler, - this, 1, 1); - if (priv->idx == -1) { - gf_log ("", GF_LOG_WARNING, - "failed to register the event"); - ret = -1; - } - } - } + } + else { + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_event_handler, + this, 1, 1); + if (priv->idx == -1) { + gf_log ("", GF_LOG_WARNING, + "failed to register the event"); + close(priv->sock); + priv->sock = -1; + ret = -1; + } + } + unlock: + sock = priv->sock; + } pthread_mutex_unlock (&priv->lock); err: + /* if sock != -1, then cleanup is done from the event handler */ + if (ret == -1 && sock == -1) { + /* Cleaup requires to send notification to upper layer which + intern holds the big_lock. There can be dead-lock situation + if big_lock is already held by the current thread. + So transfer the ownership to seperate thread for cleanup. + */ + arg = GF_CALLOC (1, sizeof (*arg), + gf_sock_connect_error_state_t); + arg->this = THIS; + arg->trans = this; + arg->refd = refd; + th_ret = pthread_create (&th_id, NULL, socket_connect_error_cbk, + arg); + if (th_ret) { + gf_log (this->name, GF_LOG_ERROR, "pthread_create" + "failed: %s", strerror(errno)); + GF_FREE (arg); + GF_ASSERT (0); + } + } + return ret; } |