summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c299
1 files changed, 186 insertions, 113 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index fffc137f6..93da3f296 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -147,21 +147,14 @@ typedef int SSL_trinary_func (SSL *, void *, int);
&in->pending_vector, \
&in->pending_count, \
&bytes_read); \
- if (ret == -1) { \
- if (priv->read_fail_log) \
- gf_log (this->name, GF_LOG_WARNING, \
- "reading from socket failed." \
- "Error (%s), peer (%s)", \
- strerror (errno), \
- this->peerinfo.identifier); \
+ if (ret == -1) \
break; \
- } \
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
}
-int socket_init (rpc_transport_t *this);
+static int socket_init (rpc_transport_t *this);
-void
+static void
ssl_dump_error_stack (const char *caller)
{
unsigned long errnum = 0;
@@ -175,7 +168,7 @@ ssl_dump_error_stack (const char *caller)
}
}
-int
+static int
ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
{
int r = (-1);
@@ -246,7 +239,7 @@ out:
#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
-int
+static int
ssl_setup_connection (rpc_transport_t *this, int server)
{
X509 *peer = NULL;
@@ -311,7 +304,7 @@ done:
}
-void
+static void
ssl_teardown_connection (socket_private_t *priv)
{
SSL_shutdown(priv->ssl_ssl);
@@ -321,7 +314,7 @@ ssl_teardown_connection (socket_private_t *priv)
}
-ssize_t
+static ssize_t
__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
{
socket_private_t *priv = NULL;
@@ -341,7 +334,7 @@ __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
}
-ssize_t
+static ssize_t
__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
{
struct iovec iov = {0, };
@@ -356,7 +349,7 @@ __socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
}
-int
+static int
__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount)
{
socket_private_t *priv = NULL;
@@ -424,6 +417,19 @@ out:
return ret;
}
+static gf_boolean_t
+__does_socket_rwv_error_need_logging (socket_private_t *priv, int write)
+{
+ int read = !write;
+
+ if (priv->connected == -1) /* Didn't even connect, of course it fails */
+ return _gf_false;
+
+ if (read && (priv->read_fail_log == _gf_false))
+ return _gf_false;
+
+ return _gf_true;
+}
/*
* return value:
@@ -432,7 +438,7 @@ out:
* > 0 = incomplete
*/
-int
+static int
__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count, size_t *bytes,
int write)
@@ -507,11 +513,15 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
if (errno == EINTR)
continue;
- if (write || (!write && priv->read_fail_log))
+ if (__does_socket_rwv_error_need_logging (priv,
+ write)) {
gf_log (this->name, GF_LOG_WARNING,
- "%s failed (%s)",
+ "%s on %s failed (%s)",
write ? "writev":"readv",
+ this->peerinfo.identifier,
strerror (errno));
+ }
+
if (priv->use_ssl) {
ssl_dump_error_stack(this->name);
}
@@ -562,7 +572,7 @@ out:
}
-int
+static int
__socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count,
size_t *bytes)
@@ -576,7 +586,7 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
}
-int
+static int
__socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count)
{
@@ -589,7 +599,7 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
}
-int
+static int
__socket_shutdown (rpc_transport_t *this)
{
int ret = -1;
@@ -608,7 +618,7 @@ __socket_shutdown (rpc_transport_t *this)
return ret;
}
-int
+static int
__socket_disconnect (rpc_transport_t *this)
{
int ret = -1;
@@ -619,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
if (priv->sock != -1) {
ret = __socket_shutdown(this);
if (priv->own_thread) {
- /*
- * Without this, reconnect (= disconnect + connect)
- * won't work except by accident.
- */
- close(priv->sock);
- priv->sock = -1;
/*
- * Closing the socket forces an error that will wake
- * up the polling thread. Wait for it to notice and
- * respond.
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
*/
- if (priv->ot_state == OT_ALIVE) {
- priv->ot_state = OT_DYING;
- pthread_cond_wait(&priv->ot_event,&priv->lock);
- }
- }
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
else if (priv->use_ssl) {
ssl_teardown_connection(priv);
}
@@ -648,7 +656,7 @@ out:
}
-int
+static int
__socket_server_bind (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -704,7 +712,7 @@ out:
}
-int
+static int
__socket_nonblock (int fd)
{
int flags = 0;
@@ -718,7 +726,7 @@ __socket_nonblock (int fd)
return ret;
}
-int
+static int
__socket_nodelay (int fd)
{
int on = 1;
@@ -794,7 +802,7 @@ err:
}
-int
+static int
__socket_connect_finish (int fd)
{
int ret = -1;
@@ -812,7 +820,7 @@ __socket_connect_finish (int fd)
}
-void
+static void
__socket_reset (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -849,13 +857,13 @@ out:
}
-void
+static void
socket_set_lastfrag (uint32_t *fragsize) {
(*fragsize) |= 0x80000000U;
}
-void
+static void
socket_set_frag_header_size (uint32_t size, char *haddr)
{
size = htonl (size);
@@ -863,14 +871,14 @@ socket_set_frag_header_size (uint32_t size, char *haddr)
}
-void
+static void
socket_set_last_frag_header_size (uint32_t size, char *haddr)
{
socket_set_lastfrag (&size);
socket_set_frag_header_size (size, haddr);
}
-struct ioq *
+static struct ioq *
__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
{
struct ioq *entry = NULL;
@@ -937,7 +945,7 @@ out:
}
-void
+static void
__socket_ioq_entry_free (struct ioq *entry)
{
GF_VALIDATE_OR_GOTO ("socket", entry, out);
@@ -954,7 +962,7 @@ out:
}
-void
+static void
__socket_ioq_flush (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -975,7 +983,7 @@ out:
}
-int
+static int
__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
int ret = -1;
@@ -1009,7 +1017,7 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
}
-int
+static int
__socket_ioq_churn (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1042,7 +1050,7 @@ out:
}
-int
+static int
socket_event_poll_err (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1067,7 +1075,7 @@ out:
}
-int
+static int
socket_event_poll_out (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1405,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this)
buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
- if (this->listener) {
+ if (priv->is_server) {
/* this check is needed as rpcsvc and rpc-clnt
* actor structures are not same */
vector_sizer =
@@ -1911,7 +1919,7 @@ void __socket_reset_priv (socket_private_t *priv)
}
-int
+static int
__socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
@@ -1952,17 +1960,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
&in->pending_vector,
&in->pending_count,
NULL);
- if (ret == -1) {
- if (priv->read_fail_log == 1) {
- gf_log (this->name,
- ((priv->connected == 1) ?
- GF_LOG_WARNING : GF_LOG_DEBUG),
- "reading from socket failed. Error (%s)"
- ", peer (%s)", strerror (errno),
- this->peerinfo.identifier);
- }
+ if (ret == -1)
goto out;
- }
if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE, "partial "
@@ -2083,7 +2082,7 @@ out:
}
-int
+static int
socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
@@ -2106,17 +2105,22 @@ out:
}
-int
+static int
socket_event_poll_in (rpc_transport_t *this)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
rpc_transport_pollin_destroy (pollin);
}
@@ -2124,7 +2128,7 @@ socket_event_poll_in (rpc_transport_t *this)
}
-int
+static int
socket_connect_finish (rpc_transport_t *this)
{
int ret = -1;
@@ -2158,8 +2162,6 @@ socket_connect_finish (rpc_transport_t *this)
priv->connect_finish_log = 1;
}
__socket_disconnect (this);
- notify_rpc = 1;
- event = RPC_TRANSPORT_DISCONNECT;
goto unlock;
}
@@ -2198,7 +2200,7 @@ out:
/* reads rpc_requests during pollin */
-int
+static int
socket_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
@@ -2243,7 +2245,7 @@ out:
}
-void *
+static void *
socket_poller (void *ctx)
{
rpc_transport_t *this = ctx;
@@ -2251,6 +2253,9 @@ socket_poller (void *ctx)
struct pollfd pfd[2] = {{0,},};
gf_boolean_t to_write = _gf_false;
int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
if (priv->use_ssl) {
if (ssl_setup_connection(this,priv->connected) < 0) {
@@ -2286,6 +2291,7 @@ socket_poller (void *ctx)
"asynchronous rpc_transport_notify failed");
}
+ gen = priv->ot_gen;
for (;;) {
pthread_mutex_lock(&priv->lock);
to_write = !list_empty(&priv->ioq);
@@ -2322,6 +2328,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else if (pfd[1].revents & POLL_MASK_OUTPUT) {
ret = socket_event_poll_out(this);
@@ -2332,6 +2345,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else {
/*
@@ -2352,21 +2372,17 @@ socket_poller (void *ctx)
"error in polling loop");
break;
}
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
}
err:
/* All (and only) I/O errors should come here. */
pthread_mutex_lock(&priv->lock);
- if (priv->ot_state == OT_ALIVE) {
- /*
- * We have to do this if we're here because of an error we
- * detected ourselves, but need to avoid a recursive call
- * if our death is the result of an external disconnect.
- */
- __socket_shutdown(this);
- close(priv->sock);
- priv->sock = -1;
- }
if (priv->ssl_ssl) {
/*
* We're always responsible for this part, but only actually
@@ -2375,40 +2391,45 @@ err:
*/
ssl_teardown_connection(priv);
}
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
priv->ot_state = OT_IDLE;
- /*
- * We expect there to be only one waiter, but if there do happen to
- * be multiple it's probably better to unblock them than to let them
- * hang. If there are none, this is a harmless no-op.
- */
- pthread_cond_broadcast(&priv->ot_event);
pthread_mutex_unlock(&priv->lock);
- rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);
- rpc_transport_unref (this);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
return NULL;
}
-void
+static void
socket_spawn (rpc_transport_t *this)
{
socket_private_t *priv = this->private;
- if (priv->ot_state == OT_ALIVE) {
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
gf_log (this->name, GF_LOG_WARNING,
"refusing to start redundant poller");
return;
}
- priv->ot_state = OT_ALIVE;
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
- if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) {
gf_log (this->name, GF_LOG_ERROR,
"could not create poll thread");
}
}
-int
+static int
socket_server_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
@@ -2446,7 +2467,7 @@ socket_server_event_handler (int fd, int idx, void *data,
goto unlock;
}
- if (priv->nodelay) {
+ if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
ret = __socket_nodelay (new_sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
@@ -2456,7 +2477,8 @@ socket_server_event_handler (int fd, int idx, void *data,
}
}
- if (priv->keepalive) {
+ if (priv->keepalive &&
+ new_sockaddr.ss_family != AF_UNIX) {
ret = __socket_keepalive (new_sock,
new_sockaddr.ss_family,
priv->keepaliveintvl,
@@ -2472,6 +2494,15 @@ socket_server_event_handler (int fd, int idx, void *data,
if (!new_trans)
goto unlock;
+ ret = pthread_mutex_init(&new_trans->lock, NULL);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_mutex_init() failed: %s",
+ strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
new_trans->name = gf_strdup (this->name);
memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
@@ -2543,6 +2574,7 @@ socket_server_event_handler (int fd, int idx, void *data,
* connection.
*/
new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
if (new_priv->own_thread) {
@@ -2585,7 +2617,7 @@ out:
}
-int
+static int
socket_disconnect (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -2607,7 +2639,7 @@ out:
}
-int
+static int
socket_connect (rpc_transport_t *this, int port)
{
int ret = -1;
@@ -2646,6 +2678,10 @@ socket_connect (rpc_transport_t *this, int port)
goto err;
}
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
if (ret == -1) {
@@ -2715,7 +2751,7 @@ socket_connect (rpc_transport_t *this, int port)
}
}
- if (priv->nodelay) {
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
ret = __socket_nodelay (priv->sock);
if (ret == -1) {
@@ -2725,7 +2761,7 @@ socket_connect (rpc_transport_t *this, int port)
}
}
- if (priv->keepalive) {
+ if (priv->keepalive && sa_family != AF_UNIX) {
ret = __socket_keepalive (priv->sock,
sa_family,
priv->keepaliveintvl,
@@ -2758,13 +2794,30 @@ socket_connect (rpc_transport_t *this, int port)
goto unlock;
}
+ if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
this->peerinfo.sockaddr_len);
if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) {
- gf_log (this->name, GF_LOG_ERROR,
- "connection attempt failed (%s)",
- strerror (errno));
+ /* For unix path based sockets, the socket path is
+ * cryptic (md5sum of path) and may not be useful for
+ * the user in debugging so log it in DEBUG
+ */
+ gf_log (this->name, ((sa_family == AF_UNIX) ?
+ GF_LOG_DEBUG : GF_LOG_ERROR),
+ "connection attempt on %s failed, (%s)",
+ this->peerinfo.identifier, strerror (errno));
close (priv->sock);
priv->sock = -1;
goto unlock;
@@ -2799,6 +2852,7 @@ socket_connect (rpc_transport_t *this, int port)
* initializing a client connection.
*/
priv->connected = 0;
+ priv->is_server = _gf_false;
rpc_transport_ref (this);
if (priv->own_thread) {
@@ -2829,7 +2883,7 @@ err:
}
-int
+static int
socket_listen (rpc_transport_t *this)
{
socket_private_t * priv = NULL;
@@ -2911,7 +2965,7 @@ socket_listen (rpc_transport_t *this)
}
}
- if (priv->nodelay) {
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
ret = __socket_nodelay (priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2980,7 +3034,7 @@ out:
}
-int32_t
+static int32_t
socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
socket_private_t *priv = NULL;
@@ -3054,7 +3108,7 @@ out:
}
-int32_t
+static int32_t
socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
{
socket_private_t *priv = NULL;
@@ -3128,7 +3182,7 @@ out:
}
-int32_t
+static int32_t
socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
@@ -3147,7 +3201,7 @@ out:
}
-int32_t
+static int32_t
socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, socklen_t salen)
{
@@ -3168,7 +3222,7 @@ out:
}
-int32_t
+static int32_t
socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
@@ -3187,7 +3241,7 @@ out:
}
-int32_t
+static int32_t
socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
struct sockaddr_storage *sa, socklen_t salen)
{
@@ -3207,6 +3261,25 @@ out:
}
+static int
+socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* The way we implement throttling is by taking off
+ POLLIN event from the polled flags. This way we
+ never get called with the POLLIN event and therefore
+ will never read() any more data until throttling
+ is turned off.
+ */
+ priv->idx = event_select_on (this->ctx->event_pool, priv->sock,
+ priv->idx, (int) !onoff, -1);
+ return 0;
+}
+
+
struct rpc_transport_ops tops = {
.listen = socket_listen,
.connect = socket_connect,
@@ -3217,6 +3290,7 @@ struct rpc_transport_ops tops = {
.get_peeraddr = socket_getpeeraddr,
.get_myname = socket_getmyname,
.get_myaddr = socket_getmyaddr,
+ .throttle = socket_throttle,
};
int
@@ -3273,7 +3347,7 @@ out:
}
-int
+static int
socket_init (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -3522,7 +3596,6 @@ socket_init (rpc_transport_t *this)
if (priv->own_thread) {
priv->ot_state = OT_IDLE;
- pthread_cond_init (&priv->ot_event, NULL);
}
out: