summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c1002
1 files changed, 524 insertions, 478 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 776e647d4f6..ed8b473be23 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -10,22 +10,16 @@
#include "socket.h"
#include "name.h"
-#include "dict.h"
-#include "rpc-transport.h"
-#include "logging.h"
-#include "xlator.h"
-#include "syscall.h"
-#include "byte-order.h"
-#include "common-utils.h"
-#include "compat-errno.h"
+#include <glusterfs/dict.h>
+#include <glusterfs/syscall.h>
+#include <glusterfs/byte-order.h>
+#include <glusterfs/compat-errno.h>
#include "socket-mem-types.h"
-#include "timer.h"
/* ugly #includes below */
#include "protocol-common.h"
#include "glusterfs3-xdr.h"
#include "glusterfs4-xdr.h"
-#include "xdr-nfs3.h"
#include "rpcsvc.h"
/* for TCP_USER_TIMEOUT */
@@ -35,7 +29,6 @@
#include <netinet/tcp.h>
#endif
-#include <fcntl.h>
#include <errno.h>
#include <rpc/xdr.h>
#include <sys/ioctl.h>
@@ -139,7 +132,7 @@ ssl_setup_connection_params(rpc_transport_t *this);
\
gf_log(this->name, GF_LOG_TRACE, \
"partial read on non-blocking socket"); \
- \
+ ret = 0; \
break; \
} \
}
@@ -173,7 +166,7 @@ ssl_setup_connection_params(rpc_transport_t *this);
\
ret = __socket_readv(this, in->pending_vector, 1, &in->pending_vector, \
&in->pending_count, &bytes_read); \
- if (ret == -1) \
+ if (ret < 0) \
break; \
__socket_proto_update_priv_after_read(priv, ret, bytes_read); \
}
@@ -198,7 +191,7 @@ socket_dump_info(struct sockaddr *sa, int is_server, int is_ssl, int sock,
0,
};
char *addr = NULL;
- char *peer_type = NULL;
+ const char *peer_type = NULL;
int af = sa->sa_family;
int so_error = -1;
socklen_t slen = sizeof(so_error);
@@ -253,7 +246,6 @@ ssl_do(rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
int r = (-1);
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO(this->name, this->private, out);
priv = this->private;
if (buf) {
@@ -269,11 +261,9 @@ ssl_do(rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
}
r = func(priv->ssl_ssl, buf, len);
} else {
- /*
- * We actually need these functions to get to
- * priv->connected == 1.
- */
- r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ /* This should be treated as error */
+ gf_log(this->name, GF_LOG_ERROR, "buffer is empty %s", __func__);
+ goto out;
}
switch (SSL_get_error(priv->ssl_ssl, r)) {
case SSL_ERROR_NONE:
@@ -308,14 +298,69 @@ out:
#define ssl_write_one(t, b, l) \
ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_write)
-int
-ssl_setup_connection_prefix(rpc_transport_t *this)
+/* set crl verify flags only for server */
+/* see man X509_VERIFY_PARAM_SET_FLAGS(3)
+ * X509_V_FLAG_CRL_CHECK enables CRL checking for the certificate chain
+ * leaf certificate. An error occurs if a suitable CRL cannot be found.
+ * Since we're never going to revoke a gluster node cert, we better disable
+ * CRL check for server certs to avoid getting error and failed connection
+ * attempts.
+ */
+static void
+ssl_clear_crl_verify_flags(SSL_CTX *ssl_ctx)
+{
+#ifdef X509_V_FLAG_CRL_CHECK_ALL
+#ifdef HAVE_SSL_CTX_GET0_PARAM
+ X509_VERIFY_PARAM *vpm;
+
+ vpm = SSL_CTX_get0_param(ssl_ctx);
+ if (vpm) {
+ X509_VERIFY_PARAM_clear_flags(
+ vpm, (X509_V_FLAG_CRL_CHECK | X509_V_FLAG_CRL_CHECK_ALL));
+ }
+#else
+ /* CRL verify flag need not be cleared for rhel6 kind of clients */
+#endif
+#else
+ gf_log(this->name, GF_LOG_ERROR, "OpenSSL version does not support CRL");
+#endif
+ return;
+}
+
+/* set crl verify flags only for server */
+static void
+ssl_set_crl_verify_flags(SSL_CTX *ssl_ctx)
+{
+#ifdef X509_V_FLAG_CRL_CHECK_ALL
+#ifdef HAVE_SSL_CTX_GET0_PARAM
+ X509_VERIFY_PARAM *vpm;
+
+ vpm = SSL_CTX_get0_param(ssl_ctx);
+ if (vpm) {
+ unsigned long flags;
+
+ flags = X509_VERIFY_PARAM_get_flags(vpm);
+ flags |= (X509_V_FLAG_CRL_CHECK | X509_V_FLAG_CRL_CHECK_ALL);
+ X509_VERIFY_PARAM_set_flags(vpm, flags);
+ }
+#else
+ X509_STORE *x509store;
+
+ x509store = SSL_CTX_get_cert_store(ssl_ctx);
+ X509_STORE_set_flags(x509store,
+ X509_V_FLAG_CRL_CHECK | X509_V_FLAG_CRL_CHECK_ALL);
+#endif
+#else
+ gf_log(this->name, GF_LOG_ERROR, "OpenSSL version does not support CRL");
+#endif
+}
+
+static int
+ssl_setup_connection_prefix(rpc_transport_t *this, gf_boolean_t server)
{
int ret = -1;
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO(this->name, this->private, done);
-
priv = this->private;
if (ssl_setup_connection_params(this) < 0) {
@@ -332,6 +377,9 @@ ssl_setup_connection_prefix(rpc_transport_t *this)
priv->ssl_accepted = _gf_false;
priv->ssl_context_created = _gf_false;
+ if (!server && priv->crl_path)
+ ssl_clear_crl_verify_flags(priv->ssl_ctx);
+
priv->ssl_ssl = SSL_new(priv->ssl_ctx);
if (!priv->ssl_ssl) {
gf_log(this->name, GF_LOG_ERROR, "SSL_new failed");
@@ -364,7 +412,6 @@ ssl_setup_connection_postfix(rpc_transport_t *this)
char peer_CN[256] = "";
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO(this->name, this->private, done);
priv = this->private;
/* Make sure _SSL verification_ succeeded, yielding an identity. */
@@ -386,6 +433,7 @@ ssl_setup_connection_postfix(rpc_transport_t *this)
gf_log(this->name, GF_LOG_DEBUG,
"SSL verification succeeded (client: %s) (server: %s)",
this->peerinfo.identifier, this->myinfo.identifier);
+ X509_free(peer);
return gf_strdup(peer_CN);
/* Error paths. */
@@ -397,11 +445,10 @@ ssl_error:
SSL_free(priv->ssl_ssl);
priv->ssl_ssl = NULL;
-done:
return NULL;
}
-int
+static int
ssl_complete_connection(rpc_transport_t *this)
{
int ret = -1; /* 1 : implies go back to epoll_wait()
@@ -640,7 +687,6 @@ __socket_rwv(rpc_transport_t *this, struct iovec *vector, int count,
int opcount = 0;
int moved = 0;
- GF_VALIDATE_OR_GOTO("socket", this, out);
GF_VALIDATE_OR_GOTO("socket", this->private, out);
priv = this->private;
@@ -679,11 +725,11 @@ __socket_rwv(rpc_transport_t *this, struct iovec *vector, int count,
ret = sys_writev(sock, opvector, IOV_MIN(opcount));
}
- if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
+ if ((ret == 0) || ((ret < 0) && (errno == EAGAIN))) {
/* done for now */
break;
- }
- this->total_bytes_write += ret;
+ } else if (ret > 0)
+ this->total_bytes_write += ret;
} else {
ret = __socket_cached_read(this, opvector, opcount);
if (ret == 0) {
@@ -694,11 +740,11 @@ __socket_rwv(rpc_transport_t *this, struct iovec *vector, int count,
errno = ENODATA;
ret = -1;
}
- if (ret == -1 && errno == EAGAIN) {
+ if ((ret < 0) && (errno == EAGAIN)) {
/* done for now */
break;
- }
- this->total_bytes_read += ret;
+ } else if (ret > 0)
+ this->total_bytes_read += ret;
}
if (ret == 0) {
@@ -710,7 +756,7 @@ __socket_rwv(rpc_transport_t *this, struct iovec *vector, int count,
errno = ENOTCONN;
break;
}
- if (ret == -1) {
+ if (ret < 0) {
if (errno == EINTR)
continue;
@@ -773,24 +819,16 @@ static int
__socket_readv(rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count, size_t *bytes)
{
- int ret = -1;
-
- ret = __socket_rwv(this, vector, count, pending_vector, pending_count,
- bytes, 0);
-
- return ret;
+ return __socket_rwv(this, vector, count, pending_vector, pending_count,
+ bytes, 0);
}
static int
__socket_writev(rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count)
{
- int ret = -1;
-
- ret = __socket_rwv(this, vector, count, pending_vector, pending_count, NULL,
- 1);
-
- return ret;
+ return __socket_rwv(this, vector, count, pending_vector, pending_count,
+ NULL, 1);
}
static int
@@ -807,8 +845,8 @@ __socket_shutdown(rpc_transport_t *this)
gf_log(this->name, GF_LOG_DEBUG, "shutdown() returned %d. %s", ret,
strerror(errno));
} else {
- gf_log(this->name, GF_LOG_INFO, "intentional socket shutdown(%d)",
- priv->sock);
+ GF_LOG_OCCASIONALLY(priv->shutdown_log_ctr, this->name, GF_LOG_INFO,
+ "intentional socket shutdown(%d)", priv->sock);
}
return ret;
@@ -817,20 +855,14 @@ __socket_shutdown(rpc_transport_t *this)
static int
__socket_teardown_connection(rpc_transport_t *this)
{
- int ret = -1;
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
if (priv->use_ssl)
ssl_teardown_connection(priv);
- ret = __socket_shutdown(this);
-out:
- return ret;
+ return __socket_shutdown(this);
}
static int
@@ -839,15 +871,12 @@ __socket_disconnect(rpc_transport_t *this)
int ret = -1;
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
gf_log(this->name, GF_LOG_TRACE, "disconnecting %p, sock=%d", this,
priv->sock);
- if (priv->sock != -1) {
+ if (priv->sock >= 0) {
gf_log_callingfn(this->name, GF_LOG_TRACE,
"tearing down socket connection");
ret = __socket_teardown_connection(this);
@@ -858,7 +887,6 @@ __socket_disconnect(rpc_transport_t *this)
}
}
-out:
return ret;
}
@@ -872,9 +900,8 @@ __socket_server_bind(rpc_transport_t *this)
int ret = -1;
int opt = 1;
int reuse_check_sock = -1;
-
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
+ uint16_t sin_port = 0;
+ int retries = 0;
priv = this->private;
ctx = this->ctx;
@@ -882,7 +909,7 @@ __socket_server_bind(rpc_transport_t *this)
ret = setsockopt(priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_ERROR,
"setsockopt() for SO_REUSEADDR failed (%s)", strerror(errno));
}
@@ -895,7 +922,7 @@ __socket_server_bind(rpc_transport_t *this)
if (reuse_check_sock >= 0) {
ret = connect(reuse_check_sock, SA(&unix_addr),
this->myinfo.sockaddr_len);
- if ((ret == -1) && (ECONNREFUSED == errno)) {
+ if ((ret != 0) && (ECONNREFUSED == errno)) {
sys_unlink(((struct sockaddr_un *)&unix_addr)->sun_path);
}
gf_log(this->name, GF_LOG_INFO,
@@ -904,19 +931,47 @@ __socket_server_bind(rpc_transport_t *this)
}
}
- ret = bind(priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
- this->myinfo.sockaddr_len);
+ if (AF_UNIX != SA(&this->myinfo.sockaddr)->sa_family) {
+ sin_port = (int)ntohs(
+ ((struct sockaddr_in *)&this->myinfo.sockaddr)->sin_port);
+ if (!sin_port) {
+ sin_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+ ((struct sockaddr_in *)&this->myinfo.sockaddr)->sin_port = htons(
+ sin_port);
+ }
+ retries = 10;
+ while (retries) {
+ ret = bind(priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len);
+ if (ret != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "binding to %s failed: %s",
+ this->myinfo.identifier, strerror(errno));
+ if (errno == EADDRINUSE) {
+ gf_log(this->name, GF_LOG_ERROR, "Port is already in use");
+ sleep(1);
+ retries--;
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ } else {
+ ret = bind(priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len);
- if (ret == -1) {
- gf_log(this->name, GF_LOG_ERROR, "binding to %s failed: %s",
- this->myinfo.identifier, strerror(errno));
- if (errno == EADDRINUSE) {
- gf_log(this->name, GF_LOG_ERROR, "Port is already in use");
+ if (ret != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "binding to %s failed: %s",
+ this->myinfo.identifier, strerror(errno));
+ if (errno == EADDRINUSE) {
+ gf_log(this->name, GF_LOG_ERROR, "Port is already in use");
+ }
}
}
if (AF_UNIX != SA(&this->myinfo.sockaddr)->sa_family) {
if (getsockname(priv->sock, SA(&this->myinfo.sockaddr),
- &this->myinfo.sockaddr_len) == -1) {
+ &this->myinfo.sockaddr_len) != 0) {
gf_log(this->name, GF_LOG_WARNING,
"getsockname on (%d) failed (%s)", priv->sock,
strerror(errno));
@@ -944,7 +999,7 @@ __socket_nonblock(int fd)
flags = fcntl(fd, F_GETFL);
- if (flags != -1)
+ if (flags >= 0)
ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
return ret;
@@ -974,7 +1029,7 @@ __socket_keepalive(int fd, int family, int keepaliveintvl, int keepaliveidle,
#endif
ret = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on));
- if (ret == -1) {
+ if (ret != 0) {
gf_log("socket", GF_LOG_WARNING,
"failed to set keep alive option on socket %d", fd);
goto err;
@@ -991,7 +1046,7 @@ __socket_keepalive(int fd, int family, int keepaliveintvl, int keepaliveidle,
ret = setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepaliveintvl,
sizeof(keepaliveintvl));
#endif
- if (ret == -1) {
+ if (ret != 0) {
gf_log("socket", GF_LOG_WARNING,
"failed to set keep alive interval on socket %d", fd);
goto err;
@@ -1002,7 +1057,7 @@ __socket_keepalive(int fd, int family, int keepaliveintvl, int keepaliveidle,
ret = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepaliveidle,
sizeof(keepaliveidle));
- if (ret == -1) {
+ if (ret != 0) {
gf_log("socket", GF_LOG_WARNING,
"failed to set keep idle %d on socket %d, %s", keepaliveidle, fd,
strerror(errno));
@@ -1010,7 +1065,7 @@ __socket_keepalive(int fd, int family, int keepaliveintvl, int keepaliveidle,
}
ret = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepaliveintvl,
sizeof(keepaliveintvl));
- if (ret == -1) {
+ if (ret != 0) {
gf_log("socket", GF_LOG_WARNING,
"failed to set keep interval %d on socket %d, %s",
keepaliveintvl, fd, strerror(errno));
@@ -1022,7 +1077,7 @@ __socket_keepalive(int fd, int family, int keepaliveintvl, int keepaliveidle,
goto done;
ret = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_ms,
sizeof(timeout_ms));
- if (ret == -1) {
+ if (ret != 0) {
gf_log("socket", GF_LOG_WARNING,
"failed to set "
"TCP_USER_TIMEOUT %d on socket %d, %s",
@@ -1033,7 +1088,7 @@ __socket_keepalive(int fd, int family, int keepaliveintvl, int keepaliveidle,
#if defined(TCP_KEEPCNT)
ret = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepalivecnt,
sizeof(keepalivecnt));
- if (ret == -1) {
+ if (ret != 0) {
gf_log("socket", GF_LOG_WARNING,
"failed to set "
"TCP_KEEPCNT %d on socket %d, %s",
@@ -1075,9 +1130,6 @@ __socket_reset(rpc_transport_t *this)
{
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
/* TODO: use mem-pool on incoming data */
@@ -1096,8 +1148,16 @@ __socket_reset(rpc_transport_t *this)
memset(&priv->incoming, 0, sizeof(priv->incoming));
- event_unregister_close(this->ctx->event_pool, priv->sock, priv->idx);
-
+ gf_event_unregister_close(this->ctx->event_pool, priv->sock, priv->idx);
+ if (priv->use_ssl && priv->ssl_ssl) {
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+ }
+ if (priv->ssl_ctx) {
+ SSL_CTX_free(priv->ssl_ctx);
+ priv->ssl_ctx = NULL;
+ }
priv->sock = -1;
priv->idx = -1;
priv->connected = -1;
@@ -1117,8 +1177,6 @@ __socket_reset(rpc_transport_t *this)
GF_FREE(priv->ssl_ca_list);
priv->ssl_ca_list = NULL;
}
-out:
- return;
}
static void
@@ -1148,8 +1206,6 @@ __socket_ioq_new(rpc_transport_t *this, rpc_transport_msg_t *msg)
int count = 0;
uint32_t size = 0;
- GF_VALIDATE_OR_GOTO("socket", this, out);
-
/* TODO: use mem-pool */
entry = GF_CALLOC(1, sizeof(*entry), gf_common_mt_ioq);
if (!entry)
@@ -1204,7 +1260,6 @@ __socket_ioq_new(rpc_transport_t *this, rpc_transport_msg_t *msg)
INIT_LIST_HEAD(&entry->list);
-out:
return entry;
}
@@ -1225,27 +1280,18 @@ out:
}
static void
-__socket_ioq_flush(rpc_transport_t *this)
+__socket_ioq_flush(socket_private_t *priv)
{
- socket_private_t *priv = NULL;
struct ioq *entry = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
- priv = this->private;
-
while (!list_empty(&priv->ioq)) {
entry = priv->ioq_next;
__socket_ioq_entry_free(entry);
}
-
-out:
- return;
}
static int
-__socket_ioq_churn_entry(rpc_transport_t *this, struct ioq *entry, int direct)
+__socket_ioq_churn_entry(rpc_transport_t *this, struct ioq *entry)
{
int ret = -1;
@@ -1268,16 +1314,13 @@ __socket_ioq_churn(rpc_transport_t *this)
int ret = 0;
struct ioq *entry = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
while (!list_empty(&priv->ioq)) {
/* pick next entry */
entry = priv->ioq_next;
- ret = __socket_ioq_churn_entry(this, entry, 0);
+ ret = __socket_ioq_churn_entry(this, entry);
if (ret != 0)
break;
@@ -1285,11 +1328,10 @@ __socket_ioq_churn(rpc_transport_t *this)
if (list_empty(&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
- priv->idx = event_select_on(this->ctx->event_pool, priv->sock,
- priv->idx, -1, 0);
+ priv->idx = gf_event_select_on(this->ctx->event_pool, priv->sock,
+ priv->idx, -1, 0);
}
-out:
return ret;
}
@@ -1299,22 +1341,17 @@ socket_event_poll_err(rpc_transport_t *this, int gen, int idx)
socket_private_t *priv = NULL;
gf_boolean_t socket_closed = _gf_false;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
- if ((priv->gen == gen) && (priv->idx == idx) && (priv->sock != -1)) {
- __socket_ioq_flush(this);
+ if ((priv->gen == gen) && (priv->idx == idx) && (priv->sock >= 0)) {
+ __socket_ioq_flush(priv);
__socket_reset(this);
socket_closed = _gf_true;
}
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
if (socket_closed) {
pthread_mutex_lock(&priv->notify.lock);
@@ -1327,7 +1364,6 @@ socket_event_poll_err(rpc_transport_t *this, int gen, int idx)
rpc_transport_notify(this, RPC_TRANSPORT_DISCONNECT, this);
}
-out:
return socket_closed;
}
@@ -1337,9 +1373,6 @@ socket_event_poll_out(rpc_transport_t *this)
socket_private_t *priv = NULL;
int ret = -1;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
pthread_mutex_lock(&priv->out_lock);
@@ -1347,7 +1380,7 @@ socket_event_poll_out(rpc_transport_t *this)
if (priv->connected == 1) {
ret = __socket_ioq_churn(this);
- if (ret == -1) {
+ if (ret < 0) {
gf_log(this->name, GF_LOG_TRACE,
"__socket_ioq_churn returned -1; "
"disconnecting socket");
@@ -1358,9 +1391,11 @@ socket_event_poll_out(rpc_transport_t *this)
pthread_mutex_unlock(&priv->out_lock);
if (ret == 0)
- ret = rpc_transport_notify(this, RPC_TRANSPORT_MSG_SENT, NULL);
+ rpc_transport_notify(this, RPC_TRANSPORT_MSG_SENT, NULL);
+
+ if (ret > 0)
+ ret = 0;
-out:
return ret;
}
@@ -1403,7 +1438,7 @@ __socket_read_simple_msg(rpc_transport_t *this)
&bytes_read);
}
- if (ret == -1) {
+ if (ret < 0) {
gf_log(this->name, GF_LOG_WARNING,
"reading from socket failed. Error (%s), "
"peer (%s)",
@@ -1417,6 +1452,7 @@ __socket_read_simple_msg(rpc_transport_t *this)
if (ret > 0) {
gf_log(this->name, GF_LOG_TRACE,
"partial read on non-blocking socket.");
+ ret = 0;
break;
}
@@ -1429,12 +1465,6 @@ out:
return ret;
}
-static int
-__socket_read_simple_request(rpc_transport_t *this)
-{
- return __socket_read_simple_msg(this);
-}
-
#define rpc_cred_addr(buf) (buf + RPC_MSGTYPE_SIZE + RPC_CALL_BODY_SIZE - 4)
#define rpc_verf_addr(fragcurrent) (fragcurrent - 4)
@@ -1461,9 +1491,6 @@ __socket_read_vectored_request(rpc_transport_t *this,
struct gf_sock_incoming_frag *frag = NULL;
sp_rpcfrag_request_state_t *request = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
/* used to reduce the indirection */
@@ -1600,8 +1627,8 @@ __socket_read_vectored_request(rpc_transport_t *this,
remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
- if ((ret == -1) || ((ret == 0) && (remaining_size == 0) &&
- RPC_LASTFRAG(in->fraghdr))) {
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ RPC_LASTFRAG(in->fraghdr))) {
request->vector_state = SP_STATE_VECTORED_REQUEST_INIT;
in->payload_vector.iov_len = ((unsigned long)frag->fragcurrent -
(unsigned long)
@@ -1610,7 +1637,6 @@ __socket_read_vectored_request(rpc_transport_t *this,
break;
}
-out:
return ret;
}
@@ -1627,9 +1653,6 @@ __socket_read_request(rpc_transport_t *this)
struct gf_sock_incoming_frag *frag = NULL;
sp_rpcfrag_request_state_t *request = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
/* used to reduce the indirection */
@@ -1673,20 +1696,19 @@ __socket_read_request(rpc_transport_t *this)
if (vector_sizer) {
ret = __socket_read_vectored_request(this, vector_sizer);
} else {
- ret = __socket_read_simple_request(this);
+ ret = __socket_read_simple_msg(this);
}
remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
- if ((ret == -1) || ((ret == 0) && (remaining_size == 0) &&
- (RPC_LASTFRAG(in->fraghdr)))) {
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ (RPC_LASTFRAG(in->fraghdr)))) {
request->header_state = SP_STATE_REQUEST_HEADER_INIT;
}
break;
}
-out:
return ret;
}
@@ -1704,9 +1726,7 @@ __socket_read_accepted_successful_reply(rpc_transport_t *this)
XDR xdr;
struct gf_sock_incoming *in = NULL;
struct gf_sock_incoming_frag *frag = NULL;
-
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
+ uint32_t remaining_size = 0;
priv = this->private;
@@ -1807,7 +1827,9 @@ __socket_read_accepted_successful_reply(rpc_transport_t *this)
case SP_STATE_READ_PROC_HEADER:
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg(this);
- if ((ret == -1) || ((ret == 0) && RPC_LASTFRAG(in->fraghdr))) {
+ remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ RPC_LASTFRAG(in->fraghdr))) {
frag->call_body.reply.accepted_success_state =
SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
}
@@ -1833,9 +1855,7 @@ __socket_read_accepted_successful_reply_v2(rpc_transport_t *this)
XDR xdr;
struct gf_sock_incoming *in = NULL;
struct gf_sock_incoming_frag *frag = NULL;
-
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
+ uint32_t remaining_size = 0;
priv = this->private;
@@ -1937,7 +1957,9 @@ __socket_read_accepted_successful_reply_v2(rpc_transport_t *this)
case SP_STATE_READ_PROC_HEADER:
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg(this);
- if ((ret == -1) || ((ret == 0) && RPC_LASTFRAG(in->fraghdr))) {
+ remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ RPC_LASTFRAG(in->fraghdr))) {
frag->call_body.reply.accepted_success_state =
SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
}
@@ -1963,9 +1985,6 @@ __socket_read_accepted_reply(rpc_transport_t *this)
struct gf_sock_incoming *in = NULL;
struct gf_sock_incoming_frag *frag = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
/* used to reduce the indirection */
in = &priv->incoming;
@@ -2036,8 +2055,8 @@ __socket_read_accepted_reply(rpc_transport_t *this)
remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
- if ((ret == -1) || ((ret == 0) && (remaining_size == 0) &&
- (RPC_LASTFRAG(in->fraghdr)))) {
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ (RPC_LASTFRAG(in->fraghdr)))) {
frag->call_body.reply
.accepted_state = SP_STATE_ACCEPTED_REPLY_INIT;
}
@@ -2045,7 +2064,6 @@ __socket_read_accepted_reply(rpc_transport_t *this)
break;
}
-out:
return ret;
}
@@ -2067,9 +2085,6 @@ __socket_read_vectored_reply(rpc_transport_t *this)
struct gf_sock_incoming *in = NULL;
struct gf_sock_incoming_frag *frag = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
in = &priv->incoming;
frag = &in->frag;
@@ -2102,8 +2117,8 @@ __socket_read_vectored_reply(rpc_transport_t *this)
remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
- if ((ret == -1) || ((ret == 0) && (remaining_size == 0) &&
- (RPC_LASTFRAG(in->fraghdr)))) {
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ (RPC_LASTFRAG(in->fraghdr)))) {
frag->call_body.reply
.status_state = SP_STATE_VECTORED_REPLY_STATUS_INIT;
in->payload_vector.iov_len = (unsigned long)frag->fragcurrent -
@@ -2113,7 +2128,6 @@ __socket_read_vectored_reply(rpc_transport_t *this)
break;
}
-out:
return ret;
}
@@ -2136,9 +2150,6 @@ __socket_read_reply(rpc_transport_t *this)
struct gf_sock_incoming *in = NULL;
struct gf_sock_incoming_frag *frag = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
in = &priv->incoming;
frag = &in->frag;
@@ -2164,17 +2175,13 @@ __socket_read_reply(rpc_transport_t *this)
* and priv->lock, since we are doing an upcall here.
*/
frag->state = SP_STATE_NOTIFYING_XID;
- pthread_mutex_unlock(&priv->in_lock);
- {
- ret = rpc_transport_notify(this, RPC_TRANSPORT_MAP_XID_REQUEST,
- in->request_info);
- }
- pthread_mutex_lock(&priv->in_lock);
+ ret = rpc_transport_notify(this, RPC_TRANSPORT_MAP_XID_REQUEST,
+ in->request_info);
/* Transition back to externally visible state. */
frag->state = SP_STATE_READ_MSGTYPE;
- if (ret == -1) {
+ if (ret < 0) {
gf_log(this->name, GF_LOG_WARNING,
"notify for event MAP_XID failed for %s",
this->peerinfo.identifier);
@@ -2208,9 +2215,6 @@ __socket_read_frag(rpc_transport_t *this)
struct gf_sock_incoming *in = NULL;
struct gf_sock_incoming_frag *frag = NULL;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
/* used to reduce the indirection */
in = &priv->incoming;
@@ -2252,8 +2256,8 @@ __socket_read_frag(rpc_transport_t *this)
remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read;
- if ((ret == -1) || ((ret == 0) && (remaining_size == 0) &&
- (RPC_LASTFRAG(in->fraghdr)))) {
+ if ((ret < 0) || ((ret == 0) && (remaining_size == 0) &&
+ (RPC_LASTFRAG(in->fraghdr)))) {
/* frag->state = SP_STATE_NADA; */
frag->state = SP_STATE_RPCFRAG_INIT;
}
@@ -2268,7 +2272,6 @@ __socket_read_frag(rpc_transport_t *this)
break;
}
-out:
return ret;
}
@@ -2337,13 +2340,14 @@ __socket_proto_state_machine(rpc_transport_t *this,
ret = __socket_readv(this, in->pending_vector, 1,
&in->pending_vector, &in->pending_count,
NULL);
- if (ret == -1)
+ if (ret < 0)
goto out;
if (ret > 0) {
gf_log(this->name, GF_LOG_TRACE,
"partial "
"fragment header read");
+ ret = 0;
goto out;
}
@@ -2358,7 +2362,7 @@ __socket_proto_state_machine(rpc_transport_t *this,
in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
if (in->total_bytes_read >= GF_UNIT_GB) {
- ret = -ENOMEM;
+ ret = -1;
goto out;
}
@@ -2366,7 +2370,7 @@ __socket_proto_state_machine(rpc_transport_t *this,
this->ctx->iobuf_pool,
(in->total_bytes_read + sizeof(in->fraghdr)));
if (!iobuf) {
- ret = -ENOMEM;
+ ret = -1;
goto out;
}
@@ -2393,7 +2397,7 @@ __socket_proto_state_machine(rpc_transport_t *this,
case SP_STATE_READING_FRAG:
ret = __socket_read_frag(this);
- if ((ret == -1) ||
+ if ((ret < 0) ||
(frag->bytes_read != RPC_FRAGSIZE(in->fraghdr))) {
goto out;
}
@@ -2474,10 +2478,6 @@ __socket_proto_state_machine(rpc_transport_t *this,
}
out:
- if ((ret == -1) && (errno == EAGAIN)) {
- ret = 0;
- }
-
return ret;
}
@@ -2485,22 +2485,34 @@ static int
socket_proto_state_machine(rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
- socket_private_t *priv = NULL;
- int ret = 0;
+ return __socket_proto_state_machine(this, pollin);
+}
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
+static void
+socket_event_poll_in_async(xlator_t *xl, gf_async_t *async)
+{
+ rpc_transport_pollin_t *pollin;
+ rpc_transport_t *this;
+ socket_private_t *priv;
+ pollin = caa_container_of(async, rpc_transport_pollin_t, async);
+ this = pollin->trans;
priv = this->private;
- pthread_mutex_lock(&priv->in_lock);
+ rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin);
+
+ rpc_transport_unref(this);
+
+ rpc_transport_pollin_destroy(pollin);
+
+ pthread_mutex_lock(&priv->notify.lock);
{
- ret = __socket_proto_state_machine(this, pollin);
- }
- pthread_mutex_unlock(&priv->in_lock);
+ --priv->notify.in_progress;
-out:
- return ret;
+ if (!priv->notify.in_progress)
+ pthread_cond_signal(&priv->notify.cond);
+ }
+ pthread_mutex_unlock(&priv->notify.lock);
}
static int
@@ -2523,22 +2535,12 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)
pthread_mutex_unlock(&priv->notify.lock);
}
- if (notify_handled && (ret != -1))
- event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen);
+ if (notify_handled && (ret >= 0))
+ gf_event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen);
if (pollin) {
- ret = rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin);
-
- rpc_transport_pollin_destroy(pollin);
-
- pthread_mutex_lock(&priv->notify.lock);
- {
- --priv->notify.in_progress;
-
- if (!priv->notify.in_progress)
- pthread_cond_signal(&priv->notify.cond);
- }
- pthread_mutex_unlock(&priv->notify.lock);
+ rpc_transport_ref(this);
+ gf_async(&pollin->async, THIS, socket_event_poll_in_async);
}
return ret;
@@ -2552,12 +2554,8 @@ socket_connect_finish(rpc_transport_t *this)
rpc_transport_event_t event = 0;
char notify_rpc = 0;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
if (priv->connected != 0)
@@ -2567,10 +2565,10 @@ socket_connect_finish(rpc_transport_t *this)
ret = __socket_connect_finish(priv->sock);
- if (ret == -1 && errno == EINPROGRESS)
+ if ((ret < 0) && (errno == EINPROGRESS))
ret = 1;
- if (ret == -1 && errno != EINPROGRESS) {
+ if ((ret < 0) && (errno != EINPROGRESS)) {
if (!priv->connect_finish_log) {
gf_log(this->name, GF_LOG_ERROR,
"connection to %s failed (%s); "
@@ -2589,7 +2587,7 @@ socket_connect_finish(rpc_transport_t *this)
ret = getsockname(priv->sock, SA(&this->myinfo.sockaddr),
&this->myinfo.sockaddr_len);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_WARNING,
"getsockname on (%d) failed (%s) - "
"disconnecting socket",
@@ -2606,12 +2604,11 @@ socket_connect_finish(rpc_transport_t *this)
}
unlock:
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
if (notify_rpc) {
rpc_transport_notify(this, event, this);
}
-out:
+
return ret;
}
@@ -2620,12 +2617,8 @@ socket_disconnect(rpc_transport_t *this, gf_boolean_t wait);
/* socket_is_connected() is for use only in socket_event_handler() */
static inline gf_boolean_t
-socket_is_connected(rpc_transport_t *this)
+socket_is_connected(socket_private_t *priv)
{
- socket_private_t *priv = NULL;
-
- priv = this->private;
-
if (priv->use_ssl) {
return priv->is_server ? priv->ssl_accepted : priv->ssl_connected;
} else {
@@ -2650,10 +2643,10 @@ ssl_rearm_event_fd(rpc_transport_t *this)
fd = priv->sock;
if (priv->ssl_error_required == SSL_ERROR_WANT_READ)
- event_select_on(ctx->event_pool, fd, idx, 1, -1);
+ gf_event_select_on(ctx->event_pool, fd, idx, 1, -1);
if (priv->ssl_error_required == SSL_ERROR_WANT_WRITE)
- event_select_on(ctx->event_pool, fd, idx, -1, 1);
- event_handled(ctx->event_pool, fd, idx, gen);
+ gf_event_select_on(ctx->event_pool, fd, idx, -1, 1);
+ gf_event_handled(ctx->event_pool, fd, idx, gen);
}
static int
@@ -2674,7 +2667,7 @@ ssl_handle_server_connection_attempt(rpc_transport_t *this)
fd = priv->sock;
if (!priv->ssl_context_created) {
- ret = ssl_setup_connection_prefix(this);
+ ret = ssl_setup_connection_prefix(this, _gf_true);
if (ret < 0) {
gf_log(this->name, GF_LOG_TRACE,
"> ssl_setup_connection_prefix() failed!");
@@ -2687,8 +2680,8 @@ ssl_handle_server_connection_attempt(rpc_transport_t *this)
ret = ssl_complete_connection(this);
if (ret == 0) {
/* nothing to do */
- event_select_on(ctx->event_pool, fd, idx, 1, 0);
- event_handled(ctx->event_pool, fd, idx, gen);
+ gf_event_select_on(ctx->event_pool, fd, idx, 1, 0);
+ gf_event_handled(ctx->event_pool, fd, idx, gen);
ret = 1;
} else {
if (errno == EAGAIN) {
@@ -2722,13 +2715,13 @@ ssl_handle_client_connection_attempt(rpc_transport_t *this)
/* SSL client */
if (priv->connect_failed) {
gf_log(this->name, GF_LOG_TRACE, ">>> disconnecting SSL socket");
- ret = socket_disconnect(this, _gf_false);
+ (void)socket_disconnect(this, _gf_false);
/* Force ret to be -1, as we are officially done with
this socket */
ret = -1;
} else {
if (!priv->ssl_context_created) {
- ret = ssl_setup_connection_prefix(this);
+ ret = ssl_setup_connection_prefix(this, _gf_false);
if (ret < 0) {
gf_log(this->name, GF_LOG_TRACE,
"> ssl_setup_connection_prefix() "
@@ -2742,7 +2735,7 @@ ssl_handle_client_connection_attempt(rpc_transport_t *this)
ret = ssl_complete_connection(this);
if (ret == 0) {
ret = socket_connect_finish(this);
- event_select_on(ctx->event_pool, fd, idx, 1, 0);
+ gf_event_select_on(ctx->event_pool, fd, idx, 1, 0);
gf_log(this->name, GF_LOG_TRACE, ">>> completed client connect");
} else {
if (errno == EAGAIN) {
@@ -2752,7 +2745,7 @@ ssl_handle_client_connection_attempt(rpc_transport_t *this)
ret = 1;
} else {
/* this is a connection failure */
- ret = socket_connect_finish(this);
+ (void)socket_connect_finish(this);
gf_log(this->name, GF_LOG_TRACE,
"ssl_complete_connection "
"returned error");
@@ -2811,7 +2804,7 @@ socket_handle_client_connection_attempt(rpc_transport_t *this)
* return 1
*/
ret = 1;
- event_handled(ctx->event_pool, fd, idx, gen);
+ gf_event_handled(ctx->event_pool, fd, idx, gen);
}
}
return ret;
@@ -2847,7 +2840,7 @@ socket_complete_connection(rpc_transport_t *this)
* socket_server_event_handler()
*/
priv->accepted = _gf_true;
- event_handled(ctx->event_pool, fd, idx, gen);
+ gf_event_handled(ctx->event_pool, fd, idx, gen);
ret = 1;
} else {
ret = socket_handle_client_connection_attempt(this);
@@ -2857,7 +2850,7 @@ socket_complete_connection(rpc_transport_t *this)
}
/* reads rpc_requests during pollin */
-static int
+static void
socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
int poll_out, int poll_err, char event_thread_died)
{
@@ -2871,9 +2864,16 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
if (event_thread_died) {
/* to avoid duplicate notifications, notify only for listener sockets */
- return 0;
+ return;
}
+ /* At this point we are sure no other thread is using the transport because
+ * we cannot receive more events until we call gf_event_handled(). However
+ * this function may call gf_event_handled() in some cases. When this is
+ * done, the transport may be destroyed at any moment if another thread
+ * handled an error event. To prevent that we take a reference here. */
+ rpc_transport_ref(this);
+
GF_VALIDATE_OR_GOTO("socket", this, out);
GF_VALIDATE_OR_GOTO("socket", this->private, out);
GF_VALIDATE_OR_GOTO("socket", this->xl, out);
@@ -2882,21 +2882,19 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
priv = this->private;
ctx = this->ctx;
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
priv->idx = idx;
priv->gen = gen;
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
gf_log(this->name, GF_LOG_TRACE, "%s (sock:%d) in:%d, out:%d, err:%d",
(priv->is_server ? "server" : "client"), priv->sock, poll_in,
poll_out, poll_err);
if (!poll_err) {
- if (!socket_is_connected(this)) {
+ if (!socket_is_connected(priv)) {
gf_log(this->name, GF_LOG_TRACE,
"%s (sock:%d) socket is not connected, "
"completing connection",
@@ -2912,7 +2910,7 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
if (ret > 0) {
gf_log(this->name, GF_LOG_TRACE,
"(sock:%d) returning to wait on socket", priv->sock);
- return 0;
+ goto out;
}
} else {
char *sock_type = (priv->is_server ? "Server" : "Client");
@@ -2952,6 +2950,13 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
socket_dump_info(sa, priv->is_server, priv->use_ssl, priv->sock,
this->name, "disconnecting from");
+ /* Dump the SSL error stack to clear any errors that may otherwise
+ * resurface in the future.
+ */
+ if (priv->use_ssl && priv->ssl_ssl) {
+ ssl_dump_error_stack(this->name);
+ }
+
/* Logging has happened already in earlier cases */
gf_log("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
"EPOLLERR - disconnecting (sock:%d) (%s)", priv->sock,
@@ -2963,14 +2968,14 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
rpc_transport_unref(this);
} else if (!notify_handled) {
- event_handled(ctx->event_pool, fd, idx, gen);
+ gf_event_handled(ctx->event_pool, fd, idx, gen);
}
out:
- return ret;
+ rpc_transport_unref(this);
}
-static int
+static void
socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
int poll_out, int poll_err, char event_thread_died)
{
@@ -2999,7 +3004,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
if (event_thread_died) {
rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED,
(void *)(unsigned long)gen);
- return 0;
+ return;
}
/* NOTE:
@@ -3009,6 +3014,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
* thread context while we are using it here.
*/
priv->idx = idx;
+ priv->gen = gen;
+
+ if (poll_err) {
+ socket_event_poll_err(this, gen, idx);
+ goto out;
+ }
if (poll_in) {
int aflags = 0;
@@ -3018,31 +3029,33 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
new_sock = sys_accept(priv->sock, SA(&new_sockaddr), &addrlen, aflags);
- event_handled(ctx->event_pool, fd, idx, gen);
+ gf_event_handled(ctx->event_pool, fd, idx, gen);
- if (new_sock == -1) {
+ if (new_sock < 0) {
gf_log(this->name, GF_LOG_WARNING, "accept on %d failed (%s)",
priv->sock, strerror(errno));
goto out;
}
- if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
- ret = __socket_nodelay(new_sock);
- if (ret == -1) {
- gf_log(this->name, GF_LOG_WARNING,
- "setsockopt() failed for "
- "NODELAY (%s)",
- strerror(errno));
+ if (new_sockaddr.ss_family != AF_UNIX) {
+ if (priv->nodelay) {
+ ret = __socket_nodelay(new_sock);
+ if (ret != 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "setsockopt() failed for "
+ "NODELAY (%s)",
+ strerror(errno));
+ }
}
- }
- if (priv->keepalive && new_sockaddr.ss_family != AF_UNIX) {
- ret = __socket_keepalive(new_sock, new_sockaddr.ss_family,
- priv->keepaliveintvl, priv->keepaliveidle,
- priv->keepalivecnt, priv->timeout);
- if (ret == -1)
- gf_log(this->name, GF_LOG_WARNING,
- "Failed to set keep-alive: %s", strerror(errno));
+ if (priv->keepalive) {
+ ret = __socket_keepalive(
+ new_sock, new_sockaddr.ss_family, priv->keepaliveintvl,
+ priv->keepaliveidle, priv->keepalivecnt, priv->timeout);
+ if (ret != 0)
+ gf_log(this->name, GF_LOG_WARNING,
+ "Failed to set keep-alive: %s", strerror(errno));
+ }
}
new_trans = GF_CALLOC(1, sizeof(*new_trans), gf_common_mt_rpc_trans_t);
@@ -3056,7 +3069,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
}
ret = pthread_mutex_init(&new_trans->lock, NULL);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_WARNING,
"pthread_mutex_init() failed: %s; closing newly accepted "
"socket %d",
@@ -3076,7 +3089,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
ret = getsockname(new_sock, SA(&new_trans->myinfo.sockaddr),
&new_trans->myinfo.sockaddr_len);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_WARNING,
"getsockname on socket %d "
"failed (errno:%s); closing newly accepted socket",
@@ -3091,7 +3104,30 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
gf_log(this->name, GF_LOG_TRACE, "XXX server:%s, client:%s",
new_trans->myinfo.identifier, new_trans->peerinfo.identifier);
+ /* Make options available to local socket_init() to create new
+ * SSL_CTX per transport. A separate SSL_CTX per transport is
+ * required to avoid setting crl checking options for client
+ * connections. The verification options eventually get copied
+ * to the SSL object. Unfortunately, there's no way to identify
+ * whether socket_init() is being called after a client-side
+ * connect() or a server-side accept(). Although, we could pass
+ * a flag from the transport init() to the socket_init() and
+ * from this place, this doesn't identify the case where the
+ * server-side transport loading is done for the first time.
+ * Also, SSL doesn't apply for UNIX sockets.
+ */
+ if (new_sockaddr.ss_family != AF_UNIX)
+ new_trans->options = dict_ref(this->options);
+ new_trans->ctx = this->ctx;
+
ret = socket_init(new_trans);
+
+ /* reset options to NULL to avoid double free */
+ if (new_sockaddr.ss_family != AF_UNIX) {
+ dict_unref(new_trans->options);
+ new_trans->options = NULL;
+ }
+
if (ret != 0) {
gf_log(this->name, GF_LOG_WARNING,
"initialization of new_trans "
@@ -3133,7 +3169,6 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
new_priv->sock = new_sock;
new_priv->ssl_enabled = priv->ssl_enabled;
- new_priv->ssl_ctx = priv->ssl_ctx;
new_priv->connected = 1;
new_priv->is_server = _gf_true;
@@ -3160,8 +3195,8 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
*/
ret = rpc_transport_notify(this, RPC_TRANSPORT_ACCEPT, new_trans);
- if (ret != -1) {
- new_priv->idx = event_register(
+ if (ret >= 0) {
+ new_priv->idx = gf_event_register(
ctx->event_pool, new_sock, socket_event_handler, new_trans,
1, 0, new_trans->notify_poller_death);
if (new_priv->idx == -1) {
@@ -3198,7 +3233,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
rpc_transport_unref(new_trans);
}
- if (ret == -1) {
+ if (ret < 0) {
gf_log(this->name, GF_LOG_WARNING, "closing newly accepted socket");
sys_close(new_sock);
/* this unref is to actually cause the destruction of
@@ -3208,7 +3243,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
}
}
out:
- return ret;
+ return;
}
static int
@@ -3217,20 +3252,14 @@ socket_disconnect(rpc_transport_t *this, gf_boolean_t wait)
socket_private_t *priv = NULL;
int ret = -1;
- GF_VALIDATE_OR_GOTO("socket", this, out);
- GF_VALIDATE_OR_GOTO("socket", this->private, out);
-
priv = this->private;
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
ret = __socket_disconnect(this);
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
-out:
return ret;
}
@@ -3329,10 +3358,9 @@ socket_connect(rpc_transport_t *this, int port)
goto err;
}
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
- if (priv->sock != -1) {
+ if (priv->sock >= 0) {
gf_log_callingfn(this->name, GF_LOG_TRACE,
"connect () called on transport "
"already connected");
@@ -3346,7 +3374,7 @@ socket_connect(rpc_transport_t *this, int port)
ret = socket_client_get_remote_sockaddr(this, &sock_union.sa,
&sockaddr_len, &sa_family);
- if (ret == -1) {
+ if (ret < 0) {
/* logged inside client_get_remote_sockaddr */
goto unlock;
}
@@ -3365,7 +3393,7 @@ socket_connect(rpc_transport_t *this, int port)
this->peerinfo.sockaddr_len = sockaddr_len;
priv->sock = sys_socket(sa_family, SOCK_STREAM, 0);
- if (priv->sock == -1) {
+ if (priv->sock < 0) {
gf_log(this->name, GF_LOG_ERROR, "socket creation failed (%s)",
strerror(errno));
ret = -1;
@@ -3377,7 +3405,7 @@ socket_connect(rpc_transport_t *this, int port)
*/
if (priv->windowsize != 0) {
if (setsockopt(priv->sock, SOL_SOCKET, SO_RCVBUF, &priv->windowsize,
- sizeof(priv->windowsize)) < 0) {
+ sizeof(priv->windowsize)) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"setting receive window "
"size failed: %d: %d: %s",
@@ -3385,7 +3413,7 @@ socket_connect(rpc_transport_t *this, int port)
}
if (setsockopt(priv->sock, SOL_SOCKET, SO_SNDBUF, &priv->windowsize,
- sizeof(priv->windowsize)) < 0) {
+ sizeof(priv->windowsize)) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"setting send window size "
"failed: %d: %d: %s",
@@ -3407,30 +3435,32 @@ socket_connect(rpc_transport_t *this, int port)
}
#endif
- if (priv->nodelay && (sa_family != AF_UNIX)) {
- ret = __socket_nodelay(priv->sock);
-
- if (ret == -1) {
- gf_log(this->name, GF_LOG_ERROR, "NODELAY on %d failed (%s)",
- priv->sock, strerror(errno));
+ if (sa_family != AF_UNIX) {
+ if (priv->nodelay) {
+ ret = __socket_nodelay(priv->sock);
+ if (ret != 0) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "NODELAY on %d failed (%s)", priv->sock,
+ strerror(errno));
+ }
}
- }
- if (priv->keepalive && sa_family != AF_UNIX) {
- ret = __socket_keepalive(priv->sock, sa_family,
- priv->keepaliveintvl, priv->keepaliveidle,
- priv->keepalivecnt, priv->timeout);
- if (ret == -1)
- gf_log(this->name, GF_LOG_ERROR, "Failed to set keep-alive: %s",
- strerror(errno));
+ if (priv->keepalive) {
+ ret = __socket_keepalive(
+ priv->sock, sa_family, priv->keepaliveintvl,
+ priv->keepaliveidle, priv->keepalivecnt, priv->timeout);
+ if (ret != 0)
+ gf_log(this->name, GF_LOG_ERROR,
+ "Failed to set keep-alive: %s", strerror(errno));
+ }
}
SA(&this->myinfo.sockaddr)->sa_family = SA(&this->peerinfo.sockaddr)
->sa_family;
/* If a source addr is explicitly specified, use it */
- ret = dict_get_str(this->options, "transport.socket.source-addr",
- &local_addr);
+ ret = dict_get_str_sizen(this->options, "transport.socket.source-addr",
+ &local_addr);
if (!ret && SA(&this->myinfo.sockaddr)->sa_family == AF_INET) {
addr = (struct sockaddr_in *)(&this->myinfo.sockaddr);
ret = inet_pton(AF_INET, local_addr, &(addr->sin_addr.s_addr));
@@ -3442,7 +3472,7 @@ socket_connect(rpc_transport_t *this, int port)
ret = client_bind(this, SA(&this->myinfo.sockaddr),
&this->myinfo.sockaddr_len, priv->sock);
- if (ret == -1) {
+ if (ret < 0) {
gf_log(this->name, GF_LOG_WARNING, "client bind failed: %s",
strerror(errno));
goto handler;
@@ -3451,7 +3481,7 @@ socket_connect(rpc_transport_t *this, int port)
/* make socket non-blocking for all types of sockets */
if (!priv->bio) {
ret = __socket_nonblock(priv->sock);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_ERROR, "NBIO on %d failed (%s)",
priv->sock, strerror(errno));
goto handler;
@@ -3478,7 +3508,7 @@ socket_connect(rpc_transport_t *this, int port)
connect_attempted = _gf_true;
- if (ret == -1 && errno == ENOENT && ign_enoent) {
+ if ((ret != 0) && (errno == ENOENT) && ign_enoent) {
gf_log(this->name, GF_LOG_WARNING,
"Ignore failed connection attempt on %s, (%s) ",
this->peerinfo.identifier, strerror(errno));
@@ -3496,7 +3526,7 @@ socket_connect(rpc_transport_t *this, int port)
goto handler;
}
- if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) {
+ if ((ret != 0) && (errno != EINPROGRESS) && (errno != ENOENT)) {
/* For unix path based sockets, the socket path is
* cryptic (md5sum of path) and may not be useful for
* the user in debugging so log it in DEBUG
@@ -3541,9 +3571,9 @@ socket_connect(rpc_transport_t *this, int port)
refd = _gf_true;
this->listener = this;
- priv->idx = event_register(ctx->event_pool, priv->sock,
- socket_event_handler, this, 1, 1,
- this->notify_poller_death);
+ priv->idx = gf_event_register(ctx->event_pool, priv->sock,
+ socket_event_handler, this, 1, 1,
+ this->notify_poller_death);
if (priv->idx == -1) {
gf_log("", GF_LOG_WARNING,
"failed to register the event; "
@@ -3558,11 +3588,10 @@ socket_connect(rpc_transport_t *this, int port)
sock = priv->sock;
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
err:
- /* if sock != -1, then cleanup is done from the event handler */
- if (ret == -1 && sock == -1) {
+ /* if sock >= 0, then cleanup is done from the event handler */
+ if ((ret < 0) && (sock < 0)) {
/* Cleaup requires to send notification to upper layer which
intern holds the big_lock. There can be dead-lock situation
if big_lock is already held by the current thread.
@@ -3610,29 +3639,26 @@ socket_listen(rpc_transport_t *this)
myinfo = &this->myinfo;
ctx = this->ctx;
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
sock = priv->sock;
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
- if (sock != -1) {
+ if (sock >= 0) {
gf_log_callingfn(this->name, GF_LOG_DEBUG, "already listening");
return ret;
}
ret = socket_server_get_local_sockaddr(this, SA(&sockaddr), &sockaddr_len,
&sa_family);
- if (ret == -1) {
+ if (ret < 0) {
return ret;
}
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
- if (priv->sock != -1) {
+ if (priv->sock >= 0) {
gf_log(this->name, GF_LOG_DEBUG, "already listening");
goto unlock;
}
@@ -3642,7 +3668,7 @@ socket_listen(rpc_transport_t *this)
priv->sock = sys_socket(sa_family, SOCK_STREAM, 0);
- if (priv->sock == -1) {
+ if (priv->sock < 0) {
gf_log(this->name, GF_LOG_ERROR, "socket creation failed (%s)",
strerror(errno));
goto unlock;
@@ -3653,7 +3679,7 @@ socket_listen(rpc_transport_t *this)
*/
if (priv->windowsize != 0) {
if (setsockopt(priv->sock, SOL_SOCKET, SO_RCVBUF, &priv->windowsize,
- sizeof(priv->windowsize)) < 0) {
+ sizeof(priv->windowsize)) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"setting receive window size "
"failed: %d: %d: %s",
@@ -3661,7 +3687,7 @@ socket_listen(rpc_transport_t *this)
}
if (setsockopt(priv->sock, SOL_SOCKET, SO_SNDBUF, &priv->windowsize,
- sizeof(priv->windowsize)) < 0) {
+ sizeof(priv->windowsize)) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"setting send window size failed:"
" %d: %d: %s",
@@ -3671,7 +3697,7 @@ socket_listen(rpc_transport_t *this)
if (priv->nodelay && (sa_family != AF_UNIX)) {
ret = __socket_nodelay(priv->sock);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_ERROR,
"setsockopt() failed for NODELAY (%s)", strerror(errno));
}
@@ -3680,7 +3706,7 @@ socket_listen(rpc_transport_t *this)
if (!priv->bio) {
ret = __socket_nonblock(priv->sock);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_ERROR,
"NBIO on socket %d failed "
"(errno:%s); closing socket",
@@ -3691,9 +3717,10 @@ socket_listen(rpc_transport_t *this)
}
}
+ /* coverity[SLEEP] */
ret = __socket_server_bind(this);
- if ((ret == -EADDRINUSE) || (ret == -1)) {
+ if (ret < 0) {
/* logged inside __socket_server_bind() */
gf_log(this->name, GF_LOG_ERROR,
"__socket_server_bind failed;"
@@ -3709,7 +3736,7 @@ socket_listen(rpc_transport_t *this)
ret = listen(priv->sock, priv->backlog);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_ERROR,
"could not set socket %d to listen mode (errno:%s); "
"closing socket",
@@ -3721,9 +3748,9 @@ socket_listen(rpc_transport_t *this)
rpc_transport_ref(this);
- priv->idx = event_register(ctx->event_pool, priv->sock,
- socket_server_event_handler, this, 1, 0,
- this->notify_poller_death);
+ priv->idx = gf_event_register(ctx->event_pool, priv->sock,
+ socket_server_event_handler, this, 1, 0,
+ this->notify_poller_death);
if (priv->idx == -1) {
gf_log(this->name, GF_LOG_WARNING,
@@ -3738,7 +3765,6 @@ socket_listen(rpc_transport_t *this)
}
unlock:
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
out:
return ret;
@@ -3777,7 +3803,7 @@ socket_submit_outgoing_msg(rpc_transport_t *this, rpc_transport_msg_t *msg)
goto unlock;
if (list_empty(&priv->ioq)) {
- ret = __socket_ioq_churn_entry(this, entry, 1);
+ ret = __socket_ioq_churn_entry(this, entry);
if (ret == 0) {
need_append = 0;
@@ -3793,8 +3819,8 @@ socket_submit_outgoing_msg(rpc_transport_t *this, rpc_transport_msg_t *msg)
}
if (need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
- priv->idx = event_select_on(ctx->event_pool, priv->sock, priv->idx,
- -1, 1);
+ priv->idx = gf_event_select_on(ctx->event_pool, priv->sock,
+ priv->idx, -1, 1);
}
}
unlock:
@@ -3903,7 +3929,6 @@ socket_throttle(rpc_transport_t *this, gf_boolean_t onoff)
will never read() any more data until throttling
is turned off.
*/
- pthread_mutex_lock(&priv->in_lock);
pthread_mutex_lock(&priv->out_lock);
{
/* Throttling is useless on a disconnected transport. In fact,
@@ -3912,11 +3937,10 @@ socket_throttle(rpc_transport_t *this, gf_boolean_t onoff)
* registered fd mapping. */
if (priv->connected == 1)
- priv->idx = event_select_on(this->ctx->event_pool, priv->sock,
- priv->idx, (int)!onoff, -1);
+ priv->idx = gf_event_select_on(this->ctx->event_pool, priv->sock,
+ priv->idx, (int)!onoff, -1);
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
return 0;
}
@@ -3939,31 +3963,23 @@ reconfigure(rpc_transport_t *this, dict_t *options)
socket_private_t *priv = NULL;
gf_boolean_t tmp_bool = _gf_false;
char *optstr = NULL;
- int ret = 0;
+ int ret = -1;
uint32_t backlog = 0;
uint64_t windowsize = 0;
- uint32_t timeout = GF_NETWORK_TIMEOUT;
- int keepaliveidle = GF_KEEPALIVE_TIME;
- int keepaliveintvl = GF_KEEPALIVE_INTERVAL;
- int keepalivecnt = GF_KEEPALIVE_COUNT;
+ data_t *data;
GF_VALIDATE_OR_GOTO("socket", this, out);
GF_VALIDATE_OR_GOTO("socket", this->private, out);
- if (!this || !this->private) {
- ret = -1;
- goto out;
- }
-
priv = this->private;
- if (dict_get_str(options, "transport.socket.keepalive", &optstr) == 0) {
- if (gf_string2boolean(optstr, &tmp_bool) == -1) {
+ if (dict_get_str_sizen(options, "transport.socket.keepalive", &optstr) ==
+ 0) {
+ if (gf_string2boolean(optstr, &tmp_bool) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"'transport.socket.keepalive' takes only "
"boolean options, not taking any action");
priv->keepalive = 1;
- ret = -1;
goto out;
}
gf_log(this->name, GF_LOG_DEBUG,
@@ -3973,48 +3989,43 @@ reconfigure(rpc_transport_t *this, dict_t *options)
} else
priv->keepalive = 1;
- if (dict_get_int32(options, "transport.tcp-user-timeout",
- &(priv->timeout)) != 0)
- priv->timeout = timeout;
+ if (dict_get_int32_sizen(options, "transport.tcp-user-timeout",
+ &(priv->timeout)) != 0)
+ priv->timeout = GF_NETWORK_TIMEOUT;
gf_log(this->name, GF_LOG_DEBUG,
- "Reconfigued "
- "transport.tcp-user-timeout=%d",
- priv->timeout);
+ "Reconfigured transport.tcp-user-timeout=%d", priv->timeout);
if (dict_get_uint32(options, "transport.listen-backlog", &backlog) == 0) {
priv->backlog = backlog;
gf_log(this->name, GF_LOG_DEBUG,
- "Reconfigued "
- "transport.listen-backlog=%d",
- priv->backlog);
+ "Reconfigured transport.listen-backlog=%d", priv->backlog);
}
- if (dict_get_int32(options, "transport.socket.keepalive-time",
- &(priv->keepaliveidle)) != 0)
- priv->keepaliveidle = keepaliveidle;
- gf_log(this->name, GF_LOG_DEBUG,
- "Reconfigued "
- "transport.socket.keepalive-time=%d",
- priv->keepaliveidle);
+ if (priv->keepalive) {
+ if (dict_get_int32_sizen(options, "transport.socket.keepalive-time",
+ &(priv->keepaliveidle)) != 0)
+ priv->keepaliveidle = GF_KEEPALIVE_TIME;
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Reconfigured transport.socket.keepalive-time=%d",
+ priv->keepaliveidle);
- if (dict_get_int32(options, "transport.socket.keepalive-interval",
- &(priv->keepaliveintvl)) != 0)
- priv->keepaliveintvl = keepaliveintvl;
- gf_log(this->name, GF_LOG_DEBUG,
- "Reconfigued "
- "transport.socket.keepalive-interval=%d",
- priv->keepaliveintvl);
+ if (dict_get_int32_sizen(options, "transport.socket.keepalive-interval",
+ &(priv->keepaliveintvl)) != 0)
+ priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Reconfigured transport.socket.keepalive-interval=%d",
+ priv->keepaliveintvl);
- if (dict_get_int32(options, "transport.socket.keepalive-count",
- &(priv->keepalivecnt)) != 0)
- priv->keepalivecnt = keepalivecnt;
- gf_log(this->name, GF_LOG_DEBUG,
- "Reconfigued "
- "transport.socket.keepalive-count=%d",
- priv->keepalivecnt);
+ if (dict_get_int32_sizen(options, "transport.socket.keepalive-count",
+ &(priv->keepalivecnt)) != 0)
+ priv->keepalivecnt = GF_KEEPALIVE_COUNT;
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Reconfigured transport.socket.keepalive-count=%d",
+ priv->keepalivecnt);
+ }
optstr = NULL;
- if (dict_get_str(options, "tcp-window-size", &optstr) == 0) {
+ if (dict_get_str_sizen(options, "tcp-window-size", &optstr) == 0) {
if (gf_string2uint64(optstr, &windowsize) != 0) {
gf_log(this->name, GF_LOG_ERROR, "invalid number format: %s",
optstr);
@@ -4024,10 +4035,11 @@ reconfigure(rpc_transport_t *this, dict_t *options)
priv->windowsize = (int)windowsize;
- if (dict_get(options, "non-blocking-io")) {
- optstr = data_to_str(dict_get(options, "non-blocking-io"));
+ data = dict_get_sizen(options, "non-blocking-io");
+ if (data) {
+ optstr = data_to_str(data);
- if (gf_string2boolean(optstr, &tmp_bool) == -1) {
+ if (gf_string2boolean(optstr, &tmp_bool) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"'non-blocking-io' takes only boolean options,"
" not taking any action");
@@ -4042,7 +4054,7 @@ reconfigure(rpc_transport_t *this, dict_t *options)
if (!priv->bio) {
ret = __socket_nonblock(priv->sock);
- if (ret == -1) {
+ if (ret != 0) {
gf_log(this->name, GF_LOG_WARNING, "NBIO on %d failed (%s)",
priv->sock, strerror(errno));
goto out;
@@ -4156,6 +4168,34 @@ static void __attribute__((destructor)) fini_openssl_mt(void)
ERR_free_strings();
}
+/* The function returns 0 if AES bit is enabled on the CPU */
+static int
+ssl_check_aes_bit(void)
+{
+ FILE *fp = fopen("/proc/cpuinfo", "r");
+ int ret = 1;
+ size_t len = 0;
+ char *line = NULL;
+ char *match = NULL;
+
+ GF_ASSERT(fp != NULL);
+
+ while (getline(&line, &len, fp) > 0) {
+ if (!strncmp(line, "flags", 5)) {
+ match = strstr(line, " aes");
+ if ((match != NULL) && ((match[4] == ' ') || (match[4] == 0))) {
+ ret = 0;
+ break;
+ }
+ }
+ }
+
+ free(line);
+ fclose(fp);
+
+ return ret;
+}
+
static int
ssl_setup_connection_params(rpc_transport_t *this)
{
@@ -4166,7 +4206,7 @@ ssl_setup_connection_params(rpc_transport_t *this)
char *cipher_list = DEFAULT_CIPHER_LIST;
char *dh_param = DEFAULT_DH_PARAM;
char *ec_curve = DEFAULT_EC_CURVE;
- char *crl_path = NULL;
+ gf_boolean_t dh_flag = _gf_false;
priv = this->private;
@@ -4175,8 +4215,16 @@ ssl_setup_connection_params(rpc_transport_t *this)
return 0;
}
+ if (!priv->ssl_enabled && !priv->mgmt_ssl) {
+ return 0;
+ }
+
+ if (!ssl_check_aes_bit()) {
+ cipher_list = "AES128:" DEFAULT_CIPHER_LIST;
+ }
+
priv->ssl_own_cert = DEFAULT_CERT_PATH;
- if (dict_get_str(this->options, SSL_OWN_CERT_OPT, &optstr) == 0) {
+ if (dict_get_str_sizen(this->options, SSL_OWN_CERT_OPT, &optstr) == 0) {
if (!priv->ssl_enabled) {
gf_log(this->name, GF_LOG_WARNING,
"%s specified without %s (ignored)", SSL_OWN_CERT_OPT,
@@ -4187,7 +4235,7 @@ ssl_setup_connection_params(rpc_transport_t *this)
priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert);
priv->ssl_private_key = DEFAULT_KEY_PATH;
- if (dict_get_str(this->options, SSL_PRIVATE_KEY_OPT, &optstr) == 0) {
+ if (dict_get_str_sizen(this->options, SSL_PRIVATE_KEY_OPT, &optstr) == 0) {
if (!priv->ssl_enabled) {
gf_log(this->name, GF_LOG_WARNING,
"%s specified without %s (ignored)", SSL_PRIVATE_KEY_OPT,
@@ -4198,7 +4246,7 @@ ssl_setup_connection_params(rpc_transport_t *this)
priv->ssl_private_key = gf_strdup(priv->ssl_private_key);
priv->ssl_ca_list = DEFAULT_CA_PATH;
- if (dict_get_str(this->options, SSL_CA_LIST_OPT, &optstr) == 0) {
+ if (dict_get_str_sizen(this->options, SSL_CA_LIST_OPT, &optstr) == 0) {
if (!priv->ssl_enabled) {
gf_log(this->name, GF_LOG_WARNING,
"%s specified without %s (ignored)", SSL_CA_LIST_OPT,
@@ -4208,42 +4256,41 @@ ssl_setup_connection_params(rpc_transport_t *this)
}
priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list);
- if (dict_get_str(this->options, SSL_CRL_PATH_OPT, &optstr) == 0) {
+ optstr = NULL;
+ if (dict_get_str_sizen(this->options, SSL_CRL_PATH_OPT, &optstr) == 0) {
if (!priv->ssl_enabled) {
gf_log(this->name, GF_LOG_WARNING,
"%s specified without %s (ignored)", SSL_CRL_PATH_OPT,
SSL_ENABLED_OPT);
}
if (strcasecmp(optstr, "NULL") == 0)
- crl_path = NULL;
+ priv->crl_path = NULL;
else
- crl_path = optstr;
+ priv->crl_path = gf_strdup(optstr);
}
- gf_log(this->name, priv->ssl_enabled ? GF_LOG_INFO : GF_LOG_DEBUG,
- "SSL support on the I/O path is %s",
- priv->ssl_enabled ? "ENABLED" : "NOT enabled");
- gf_log(this->name, priv->mgmt_ssl ? GF_LOG_INFO : GF_LOG_DEBUG,
- "SSL support for glusterd is %s",
- priv->mgmt_ssl ? "ENABLED" : "NOT enabled");
-
if (!priv->mgmt_ssl) {
- if (!dict_get_int32(this->options, SSL_CERT_DEPTH_OPT, &cert_depth)) {
- gf_log(this->name, GF_LOG_INFO, "using certificate depth %d",
- cert_depth);
+ if (!dict_get_int32_sizen(this->options, SSL_CERT_DEPTH_OPT,
+ &cert_depth)) {
}
} else {
cert_depth = this->ctx->ssl_cert_depth;
- gf_log(this->name, GF_LOG_INFO, "using certificate depth %d",
- cert_depth);
}
- if (!dict_get_str(this->options, SSL_CIPHER_LIST_OPT, &cipher_list)) {
+ gf_log(this->name, priv->ssl_enabled ? GF_LOG_INFO : GF_LOG_DEBUG,
+ "SSL support for MGMT is %s IO path is %s certificate depth is %d "
+ "for peer %s",
+ (priv->mgmt_ssl ? "ENABLED" : "NOT enabled"),
+ (priv->ssl_enabled ? "ENABLED" : "NOT enabled"), cert_depth,
+ this->peerinfo.identifier);
+
+ if (!dict_get_str_sizen(this->options, SSL_CIPHER_LIST_OPT, &cipher_list)) {
gf_log(this->name, GF_LOG_INFO, "using cipher list %s", cipher_list);
}
- if (!dict_get_str(this->options, SSL_DH_PARAM_OPT, &dh_param)) {
+ if (!dict_get_str_sizen(this->options, SSL_DH_PARAM_OPT, &dh_param)) {
+ dh_flag = _gf_true;
gf_log(this->name, GF_LOG_INFO, "using DH parameters %s", dh_param);
}
- if (!dict_get_str(this->options, SSL_EC_CURVE_OPT, &ec_curve)) {
+ if (!dict_get_str_sizen(this->options, SSL_EC_CURVE_OPT, &ec_curve)) {
gf_log(this->name, GF_LOG_INFO, "using EC curve %s", ec_curve);
}
@@ -4275,12 +4322,15 @@ ssl_setup_connection_params(rpc_transport_t *this)
#ifdef SSL_OP_NO_COMPRESSION
SSL_CTX_set_options(priv->ssl_ctx, SSL_OP_NO_COMPRESSION);
#endif
-
- if ((bio = BIO_new_file(dh_param, "r")) == NULL) {
- gf_log(this->name, GF_LOG_INFO,
- "failed to open %s, "
- "DH ciphers are disabled",
- dh_param);
+ /* Upload file to bio wrapper only if dh param is configured
+ */
+ if (dh_flag) {
+ if ((bio = BIO_new_file(dh_param, "r")) == NULL) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "failed to open %s, "
+ "DH ciphers are disabled",
+ dh_param);
+ }
}
if (bio != NULL) {
@@ -4359,25 +4409,15 @@ ssl_setup_connection_params(rpc_transport_t *this)
}
if (!SSL_CTX_load_verify_locations(priv->ssl_ctx, priv->ssl_ca_list,
- crl_path)) {
+ priv->crl_path)) {
gf_log(this->name, GF_LOG_ERROR, "could not load CA list");
goto err;
}
SSL_CTX_set_verify_depth(priv->ssl_ctx, cert_depth);
- if (crl_path) {
-#ifdef X509_V_FLAG_CRL_CHECK_ALL
- X509_STORE *x509store;
-
- x509store = SSL_CTX_get_cert_store(priv->ssl_ctx);
- X509_STORE_set_flags(
- x509store, X509_V_FLAG_CRL_CHECK | X509_V_FLAG_CRL_CHECK_ALL);
-#else
- gf_log(this->name, GF_LOG_ERROR,
- "OpenSSL version does not support CRL");
-#endif
- }
+ if (priv->crl_path)
+ ssl_set_crl_verify_flags(priv->ssl_ctx);
priv->ssl_session_id = session_id++;
SSL_CTX_set_session_id_context(priv->ssl_ctx,
@@ -4407,25 +4447,19 @@ socket_init(rpc_transport_t *this)
gf_boolean_t tmp_bool = 0;
uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
char *optstr = NULL;
- uint32_t timeout = GF_NETWORK_TIMEOUT;
- int keepaliveidle = GF_KEEPALIVE_TIME;
- int keepaliveintvl = GF_KEEPALIVE_INTERVAL;
- int keepalivecnt = GF_KEEPALIVE_COUNT;
- uint32_t backlog = 0;
+ data_t *data;
if (this->private) {
gf_log_callingfn(this->name, GF_LOG_ERROR, "double init attempted");
return -1;
}
- priv = GF_MALLOC(sizeof(*priv), gf_common_mt_socket_private_t);
+ priv = GF_CALLOC(1, sizeof(*priv), gf_common_mt_socket_private_t);
if (!priv) {
return -1;
}
- memset(priv, 0, sizeof(*priv));
this->private = priv;
- pthread_mutex_init(&priv->in_lock, NULL);
pthread_mutex_init(&priv->out_lock, NULL);
pthread_mutex_init(&priv->cond_lock, NULL);
pthread_cond_init(&priv->cond, NULL);
@@ -4448,10 +4482,11 @@ socket_init(rpc_transport_t *this)
if (!this->options)
goto out;
- if (dict_get(this->options, "non-blocking-io")) {
- optstr = data_to_str(dict_get(this->options, "non-blocking-io"));
+ data = dict_get_sizen(this->options, "non-blocking-io");
+ if (data) {
+ optstr = data_to_str(data);
- if (gf_string2boolean(optstr, &tmp_bool) == -1) {
+ if (gf_string2boolean(optstr, &tmp_bool) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"'non-blocking-io' takes only boolean options,"
" not taking any action");
@@ -4467,11 +4502,11 @@ socket_init(rpc_transport_t *this)
optstr = NULL;
/* By default, we enable NODELAY */
- if (dict_get(this->options, "transport.socket.nodelay")) {
- optstr = data_to_str(
- dict_get(this->options, "transport.socket.nodelay"));
+ data = dict_get_sizen(this->options, "transport.socket.nodelay");
+ if (data) {
+ optstr = data_to_str(data);
- if (gf_string2boolean(optstr, &tmp_bool) == -1) {
+ if (gf_string2boolean(optstr, &tmp_bool) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"'transport.socket.nodelay' takes only "
"boolean options, not taking any action");
@@ -4484,7 +4519,7 @@ socket_init(rpc_transport_t *this)
}
optstr = NULL;
- if (dict_get_str(this->options, "tcp-window-size", &optstr) == 0) {
+ if (dict_get_str_sizen(this->options, "tcp-window-size", &optstr) == 0) {
if (gf_string2uint64(optstr, &windowsize) != 0) {
gf_log(this->name, GF_LOG_ERROR, "invalid number format: %s",
optstr);
@@ -4500,9 +4535,9 @@ socket_init(rpc_transport_t *this)
priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
priv->keepaliveidle = GF_KEEPALIVE_TIME;
priv->keepalivecnt = GF_KEEPALIVE_COUNT;
- if (dict_get_str(this->options, "transport.socket.keepalive", &optstr) ==
- 0) {
- if (gf_string2boolean(optstr, &tmp_bool) == -1) {
+ if (dict_get_str_sizen(this->options, "transport.socket.keepalive",
+ &optstr) == 0) {
+ if (gf_string2boolean(optstr, &tmp_bool) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"'transport.socket.keepalive' takes only "
"boolean options, not taking any action");
@@ -4513,46 +4548,46 @@ socket_init(rpc_transport_t *this)
priv->keepalive = 0;
}
- if (dict_get_int32(this->options, "transport.tcp-user-timeout",
- &(priv->timeout)) != 0)
- priv->timeout = timeout;
- gf_log(this->name, GF_LOG_DEBUG,
- "Configued "
- "transport.tcp-user-timeout=%d",
+ if (dict_get_int32_sizen(this->options, "transport.tcp-user-timeout",
+ &(priv->timeout)) != 0)
+ priv->timeout = GF_NETWORK_TIMEOUT;
+ gf_log(this->name, GF_LOG_DEBUG, "Configured transport.tcp-user-timeout=%d",
priv->timeout);
- if (dict_get_int32(this->options, "transport.socket.keepalive-time",
- &(priv->keepaliveidle)) != 0) {
- priv->keepaliveidle = keepaliveidle;
- }
+ if (priv->keepalive) {
+ if (dict_get_int32_sizen(this->options,
+ "transport.socket.keepalive-time",
+ &(priv->keepaliveidle)) != 0) {
+ priv->keepaliveidle = GF_KEEPALIVE_TIME;
+ }
- if (dict_get_int32(this->options, "transport.socket.keepalive-interval",
- &(priv->keepaliveintvl)) != 0) {
- priv->keepaliveintvl = keepaliveintvl;
- }
+ if (dict_get_int32_sizen(this->options,
+ "transport.socket.keepalive-interval",
+ &(priv->keepaliveintvl)) != 0) {
+ priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
+ }
- if (dict_get_int32(this->options, "transport.socket.keepalive-count",
- &(priv->keepalivecnt)) != 0)
- priv->keepalivecnt = keepalivecnt;
- gf_log(this->name, GF_LOG_DEBUG,
- "Reconfigued "
- "transport.keepalivecnt=%d",
- keepalivecnt);
+ if (dict_get_int32_sizen(this->options,
+ "transport.socket.keepalive-count",
+ &(priv->keepalivecnt)) != 0)
+ priv->keepalivecnt = GF_KEEPALIVE_COUNT;
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Reconfigured transport.keepalivecnt=%d", priv->keepalivecnt);
+ }
- if (dict_get_uint32(this->options, "transport.listen-backlog", &backlog) !=
- 0) {
- backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
+ if (dict_get_uint32(this->options, "transport.listen-backlog",
+ &(priv->backlog)) != 0) {
+ priv->backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
}
- priv->backlog = backlog;
optstr = NULL;
/* Check if socket read failures are to be logged */
priv->read_fail_log = 1;
- if (dict_get(this->options, "transport.socket.read-fail-log")) {
- optstr = data_to_str(
- dict_get(this->options, "transport.socket.read-fail-log"));
- if (gf_string2boolean(optstr, &tmp_bool) == -1) {
+ data = dict_get_sizen(this->options, "transport.socket.read-fail-log");
+ if (data) {
+ optstr = data_to_str(data);
+ if (gf_string2boolean(optstr, &tmp_bool) != 0) {
gf_log(this->name, GF_LOG_WARNING,
"'transport.socket.read-fail-log' takes only "
"boolean options; logging socket read fails");
@@ -4564,7 +4599,7 @@ socket_init(rpc_transport_t *this)
priv->windowsize = (int)windowsize;
priv->ssl_enabled = _gf_false;
- if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) {
+ if (dict_get_str_sizen(this->options, SSL_ENABLED_OPT, &optstr) == 0) {
if (gf_string2boolean(optstr, &priv->ssl_enabled) != 0) {
gf_log(this->name, GF_LOG_ERROR,
"invalid value given for ssl-enabled boolean");
@@ -4589,22 +4624,34 @@ fini(rpc_transport_t *this)
priv = this->private;
if (priv) {
- if (priv->sock != -1) {
- pthread_mutex_lock(&priv->in_lock);
+ if (priv->sock >= 0) {
pthread_mutex_lock(&priv->out_lock);
{
- __socket_ioq_flush(this);
+ __socket_ioq_flush(priv);
__socket_reset(this);
}
pthread_mutex_unlock(&priv->out_lock);
- pthread_mutex_unlock(&priv->in_lock);
}
gf_log(this->name, GF_LOG_TRACE, "transport %p destroyed", this);
- pthread_mutex_destroy(&priv->in_lock);
pthread_mutex_destroy(&priv->out_lock);
pthread_mutex_destroy(&priv->cond_lock);
pthread_cond_destroy(&priv->cond);
+
+ GF_ASSERT(priv->notify.in_progress == 0);
+ pthread_mutex_destroy(&priv->notify.lock);
+ pthread_cond_destroy(&priv->notify.cond);
+
+ if (priv->use_ssl && priv->ssl_ssl) {
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+ }
+ if (priv->ssl_ctx) {
+ SSL_CTX_free(priv->ssl_ctx);
+ priv->ssl_ctx = NULL;
+ }
+
if (priv->ssl_private_key) {
GF_FREE(priv->ssl_private_key);
}
@@ -4629,7 +4676,7 @@ init(rpc_transport_t *this)
ret = socket_init(this);
- if (ret == -1) {
+ if (ret < 0) {
gf_log(this->name, GF_LOG_DEBUG, "socket_init() failed");
}
@@ -4679,7 +4726,6 @@ struct volume_options options[] = {
{.key = {"transport.socket.nodelay"},
.type = GF_OPTION_TYPE_BOOL,
.default_value = "1"},
- {.key = {"transport.socket.lowlat"}, .type = GF_OPTION_TYPE_BOOL},
{.key = {"transport.socket.keepalive"},
.type = GF_OPTION_TYPE_BOOL,
.op_version = {1},