summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c374
1 files changed, 187 insertions, 187 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index cbd303496..5744ce29a 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -48,90 +48,90 @@
#define SA(ptr) ((struct sockaddr *)ptr)
-#define __socket_proto_reset_pending(priv) do { \
- memset (&priv->incoming.frag.vector, 0, \
- sizeof (priv->incoming.frag.vector)); \
- priv->incoming.frag.pending_vector = \
- &priv->incoming.frag.vector; \
- priv->incoming.frag.pending_vector->iov_base = \
- priv->incoming.frag.fragcurrent; \
- priv->incoming.pending_vector = \
- priv->incoming.frag.pending_vector; \
+#define __socket_proto_reset_pending(priv) do { \
+ memset (&priv->incoming.frag.vector, 0, \
+ sizeof (priv->incoming.frag.vector)); \
+ priv->incoming.frag.pending_vector = \
+ &priv->incoming.frag.vector; \
+ priv->incoming.frag.pending_vector->iov_base = \
+ priv->incoming.frag.fragcurrent; \
+ priv->incoming.pending_vector = \
+ priv->incoming.frag.pending_vector; \
} while (0);
-#define __socket_proto_update_pending(priv) \
- do { \
- uint32_t remaining_fragsize = 0; \
- if (priv->incoming.frag.pending_vector->iov_len == 0) { \
+#define __socket_proto_update_pending(priv) \
+ do { \
+ uint32_t remaining_fragsize = 0; \
+ if (priv->incoming.frag.pending_vector->iov_len == 0) { \
remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
- \
- priv->incoming.frag.pending_vector->iov_len = \
+ - priv->incoming.frag.bytes_read; \
+ \
+ priv->incoming.frag.pending_vector->iov_len = \
remaining_fragsize > priv->incoming.frag.remaining_size \
? priv->incoming.frag.remaining_size : remaining_fragsize; \
- \
- priv->incoming.frag.remaining_size -= \
- priv->incoming.frag.pending_vector->iov_len; \
- } \
+ \
+ priv->incoming.frag.remaining_size -= \
+ priv->incoming.frag.pending_vector->iov_len; \
+ } \
} while (0);
-#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
- { \
- priv->incoming.frag.fragcurrent += bytes_read; \
- priv->incoming.frag.bytes_read += bytes_read; \
- \
+#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
+ { \
+ priv->incoming.frag.fragcurrent += bytes_read; \
+ priv->incoming.frag.bytes_read += bytes_read; \
+ \
if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \
- if (priv->incoming.frag.remaining_size != 0) { \
- __socket_proto_reset_pending (priv); \
- } \
- \
+ if (priv->incoming.frag.remaining_size != 0) { \
+ __socket_proto_reset_pending (priv); \
+ } \
+ \
gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \
- \
- break; \
- } \
- }
-
-#define __socket_proto_init_pending(priv, size) \
- do { \
- uint32_t remaining_fragsize = 0; \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
- \
- __socket_proto_reset_pending (priv); \
- \
- priv->incoming.frag.pending_vector->iov_len = \
+ \
+ break; \
+ } \
+ }
+
+#define __socket_proto_init_pending(priv, size) \
+ do { \
+ uint32_t remaining_fragsize = 0; \
+ remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - priv->incoming.frag.bytes_read; \
+ \
+ __socket_proto_reset_pending (priv); \
+ \
+ priv->incoming.frag.pending_vector->iov_len = \
remaining_fragsize > size ? size : remaining_fragsize; \
- \
- priv->incoming.frag.remaining_size = \
- size - priv->incoming.frag.pending_vector->iov_len; \
- \
-} while (0);
+ \
+ priv->incoming.frag.remaining_size = \
+ size - priv->incoming.frag.pending_vector->iov_len; \
+ \
+ } while (0);
/* This will be used in a switch case and breaks from the switch case if all
* the pending data is not read.
*/
-#define __socket_proto_read(priv, ret) \
- { \
- size_t bytes_read = 0; \
- \
- __socket_proto_update_pending (priv); \
- \
- ret = __socket_readv (this, \
- priv->incoming.pending_vector, 1, \
- &priv->incoming.pending_vector, \
- &priv->incoming.pending_count, \
- &bytes_read); \
- if (ret == -1) { \
- gf_log (this->name, GF_LOG_TRACE, \
- "reading from socket failed. Error (%s), " \
- "peer (%s)", strerror (errno), \
- this->peerinfo.identifier); \
- break; \
- } \
+#define __socket_proto_read(priv, ret) \
+ { \
+ size_t bytes_read = 0; \
+ \
+ __socket_proto_update_pending (priv); \
+ \
+ ret = __socket_readv (this, \
+ priv->incoming.pending_vector, 1, \
+ &priv->incoming.pending_vector, \
+ &priv->incoming.pending_count, \
+ &bytes_read); \
+ if (ret == -1) { \
+ gf_log (this->name, GF_LOG_TRACE, \
+ "reading from socket failed. Error (%s), " \
+ "peer (%s)", strerror (errno), \
+ this->peerinfo.identifier); \
+ break; \
+ } \
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
- }
+ }
int socket_init (rpc_transport_t *this);
@@ -161,8 +161,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
priv = this->private;
sock = priv->sock;
- opvector = vector;
- opcount = count;
+ opvector = vector;
+ opcount = count;
if (bytes != NULL) {
*bytes = 0;
@@ -248,7 +248,7 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
int ret = -1;
ret = __socket_rwv (this, vector, count,
- pending_vector, pending_count, bytes, 0);
+ pending_vector, pending_count, bytes, 0);
return ret;
}
@@ -261,7 +261,7 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
int ret = -1;
ret = __socket_rwv (this, vector, count,
- pending_vector, pending_count, NULL, 1);
+ pending_vector, pending_count, NULL, 1);
return ret;
}
@@ -297,17 +297,17 @@ __socket_server_bind (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = -1;
- int opt = 1;
+ int opt = 1;
int reuse_check_sock = -1;
struct sockaddr_storage unix_addr = {0};
if (!this || !this->private)
goto out;
- priv = this->private;
+ priv = this->private;
ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
- &opt, sizeof (opt));
+ &opt, sizeof (opt));
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -330,7 +330,7 @@ __socket_server_bind (rpc_transport_t *this)
}
ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
- this->myinfo.sockaddr_len);
+ this->myinfo.sockaddr_len);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -369,7 +369,7 @@ __socket_nodelay (int fd)
int ret = -1;
ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
- &on, sizeof (on));
+ &on, sizeof (on));
if (!ret)
gf_log ("", GF_LOG_TRACE,
"NODELAY enabled for socket %d", fd);
@@ -610,9 +610,9 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
int ret = -1;
ret = __socket_writev (this, entry->pending_vector,
- entry->pending_count,
+ entry->pending_count,
&entry->pending_vector,
- &entry->pending_count);
+ &entry->pending_count);
if (ret == 0) {
/* current entry was completely written */
@@ -649,7 +649,7 @@ __socket_ioq_churn (rpc_transport_t *this)
if (list_empty (&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
priv->idx = event_select_on (this->ctx->event_pool,
- priv->sock, priv->idx, -1, 0);
+ priv->sock, priv->idx, -1, 0);
}
out:
@@ -1031,7 +1031,7 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
"xdr_sizeof on gfs3_read_rsp failed");
ret = -1;
goto out;
- }
+ }
__socket_proto_init_pending (priv, gluster_read_rsp_hdr_len);
priv->incoming.frag.call_body.reply.accepted_success_state
@@ -1609,21 +1609,21 @@ socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
socket_private_t *priv = NULL;
- int ret = 0;
+ int ret = 0;
if (!this || !this->private)
goto out;
- priv = this->private;
+ priv = this->private;
- pthread_mutex_lock (&priv->lock);
- {
- ret = __socket_proto_state_machine (this, pollin);
- }
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_proto_state_machine (this, pollin);
+ }
pthread_mutex_unlock (&priv->lock);
out:
- return ret;
+ return ret;
}
@@ -1661,51 +1661,51 @@ socket_connect_finish (rpc_transport_t *this)
pthread_mutex_lock (&priv->lock);
{
- if (priv->connected)
- goto unlock;
+ if (priv->connected)
+ goto unlock;
- ret = __socket_connect_finish (priv->sock);
+ ret = __socket_connect_finish (priv->sock);
- if (ret == -1 && errno == EINPROGRESS)
- ret = 1;
+ if (ret == -1 && errno == EINPROGRESS)
+ ret = 1;
- if (ret == -1 && errno != EINPROGRESS) {
- if (!priv->connect_finish_log) {
- gf_log (this->name, GF_LOG_ERROR,
- "connection to %s failed (%s)",
+ if (ret == -1 && errno != EINPROGRESS) {
+ if (!priv->connect_finish_log) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "connection to %s failed (%s)",
this->peerinfo.identifier,
- strerror (errno));
- priv->connect_finish_log = 1;
- }
- __socket_disconnect (this);
- notify_rpc = 1;
- event = RPC_TRANSPORT_DISCONNECT;
- goto unlock;
- }
-
- if (ret == 0) {
- notify_rpc = 1;
-
- this->myinfo.sockaddr_len =
- sizeof (this->myinfo.sockaddr);
-
- ret = getsockname (priv->sock,
- SA (&this->myinfo.sockaddr),
- &this->myinfo.sockaddr_len);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_DEBUG,
- "getsockname on (%d) failed (%s)",
- priv->sock, strerror (errno));
- __socket_disconnect (this);
- event = GF_EVENT_POLLERR;
- goto unlock;
- }
-
- priv->connected = 1;
- priv->connect_finish_log = 0;
- event = RPC_TRANSPORT_CONNECT;
- get_transport_identifiers (this);
- }
+ strerror (errno));
+ priv->connect_finish_log = 1;
+ }
+ __socket_disconnect (this);
+ notify_rpc = 1;
+ event = RPC_TRANSPORT_DISCONNECT;
+ goto unlock;
+ }
+
+ if (ret == 0) {
+ notify_rpc = 1;
+
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+
+ ret = getsockname (priv->sock,
+ SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "getsockname on (%d) failed (%s)",
+ priv->sock, strerror (errno));
+ __socket_disconnect (this);
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ priv->connected = 1;
+ priv->connect_finish_log = 0;
+ event = RPC_TRANSPORT_CONNECT;
+ get_transport_identifiers (this);
+ }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -1776,7 +1776,7 @@ socket_server_event_handler (int fd, int idx, void *data,
struct sockaddr_storage new_sockaddr = {0, };
socklen_t addrlen = sizeof (new_sockaddr);
socket_private_t *new_priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
this = data;
if (!this || !this->private || !this->xl)
@@ -1784,7 +1784,7 @@ socket_server_event_handler (int fd, int idx, void *data,
THIS = this->xl;
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
@@ -1792,7 +1792,7 @@ socket_server_event_handler (int fd, int idx, void *data,
if (poll_in) {
new_sock = accept (priv->sock, SA (&new_sockaddr),
- &addrlen);
+ &addrlen);
if (new_sock == -1)
goto unlock;
@@ -1838,11 +1838,11 @@ socket_server_event_handler (int fd, int idx, void *data,
new_trans->name = gf_strdup (this->name);
memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
- addrlen);
+ addrlen);
new_trans->peerinfo.sockaddr_len = addrlen;
new_trans->myinfo.sockaddr_len =
- sizeof (new_trans->myinfo.sockaddr);
+ sizeof (new_trans->myinfo.sockaddr);
ret = getsockname (new_sock,
SA (&new_trans->myinfo.sockaddr),
@@ -1874,10 +1874,10 @@ socket_server_event_handler (int fd, int idx, void *data,
rpc_transport_ref (new_trans);
new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans, 1, 0);
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans, 1, 0);
if (new_priv->idx == -1)
ret = -1;
@@ -1924,18 +1924,18 @@ int
socket_connect (rpc_transport_t *this, int port)
{
int ret = -1;
- int sock = -1;
+ int sock = -1;
socket_private_t *priv = NULL;
struct sockaddr_storage sockaddr = {0, };
socklen_t sockaddr_len = 0;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
if (!this || !this->private)
goto err;
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
if (!priv) {
gf_log (this->name, GF_LOG_DEBUG,
@@ -1981,7 +1981,7 @@ socket_connect (rpc_transport_t *this, int port)
if (priv->sock == -1) {
gf_log (this->name, GF_LOG_ERROR,
"socket creation failed (%s)",
- strerror (errno));
+ strerror (errno));
goto unlock;
}
@@ -2040,10 +2040,10 @@ socket_connect (rpc_transport_t *this, int port)
}
SA (&this->myinfo.sockaddr)->sa_family =
- SA (&this->peerinfo.sockaddr)->sa_family;
+ SA (&this->peerinfo.sockaddr)->sa_family;
ret = client_bind (this, SA (&this->myinfo.sockaddr),
- &this->myinfo.sockaddr_len, priv->sock);
+ &this->myinfo.sockaddr_len, priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
"client bind failed: %s", strerror (errno));
@@ -2053,12 +2053,12 @@ socket_connect (rpc_transport_t *this, int port)
}
ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
- this->peerinfo.sockaddr_len);
+ this->peerinfo.sockaddr_len);
if (ret == -1 && errno != EINPROGRESS) {
gf_log (this->name, GF_LOG_ERROR,
"connection attempt failed (%s)",
- strerror (errno));
+ strerror (errno));
close (priv->sock);
priv->sock = -1;
goto unlock;
@@ -2086,19 +2086,19 @@ socket_listen (rpc_transport_t *this)
{
socket_private_t * priv = NULL;
int ret = -1;
- int sock = -1;
+ int sock = -1;
struct sockaddr_storage sockaddr;
socklen_t sockaddr_len;
peer_info_t *myinfo = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
if (!this || !this->private)
goto out;
- priv = this->private;
- myinfo = &this->myinfo;
- ctx = this->ctx;
+ priv = this->private;
+ myinfo = &this->myinfo;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
@@ -2134,7 +2134,7 @@ socket_listen (rpc_transport_t *this)
if (priv->sock == -1) {
gf_log (this->name, GF_LOG_ERROR,
"socket creation failed (%s)",
- strerror (errno));
+ strerror (errno));
goto unlock;
}
@@ -2195,7 +2195,7 @@ socket_listen (rpc_transport_t *this)
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
"could not set socket %d to listen mode (%s)",
- priv->sock, strerror (errno));
+ priv->sock, strerror (errno));
close (priv->sock);
priv->sock = -1;
goto unlock;
@@ -2205,12 +2205,12 @@ socket_listen (rpc_transport_t *this)
priv->idx = event_register (ctx->event_pool, priv->sock,
socket_server_event_handler,
- this, 1, 0);
+ this, 1, 0);
if (priv->idx == -1) {
gf_log (this->name, GF_LOG_DEBUG,
"could not register socket %d with events",
- priv->sock);
+ priv->sock);
ret = -1;
close (priv->sock);
priv->sock = -1;
@@ -2233,13 +2233,13 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
char need_poll_out = 0;
char need_append = 1;
struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
if (!this || !this->private)
goto out;
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
@@ -2276,7 +2276,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
if (need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
+ priv->sock,
priv->idx, -1, 1);
}
}
@@ -2296,13 +2296,13 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
char need_poll_out = 0;
char need_append = 1;
struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
if (!this || !this->private)
goto out;
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
@@ -2337,7 +2337,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
if (need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
+ priv->sock,
priv->idx, -1, 1);
}
}
@@ -2450,22 +2450,22 @@ validate_options (rpc_transport_t *this, dict_t *options, char **op_errstr)
char *optstr = NULL;
int ret = -1;
gf_boolean_t tmp_bool = _gf_false;
-
+
if (dict_get_str (options, "transport.socket.keepalive",
- &optstr) == 0) {
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'transport.socket.keepalive' takes only "
- "boolean options, not taking any action");
- *op_errstr = "Value should be only boolean!!";
- ret =-1;
- goto out;
- }
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ *op_errstr = "Value should be only boolean!!";
+ ret =-1;
+ goto out;
+ }
}
ret =0;
out:
- return ret;
+ return ret;
}
@@ -2481,22 +2481,22 @@ reconfigure (rpc_transport_t *this, dict_t *options)
ret =-1;
goto out;
}
-
-
+
+
priv = this->private;
if (dict_get_str (this->options, "transport.socket.keepalive",
- &optstr) == 0) {
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'transport.socket.keepalive' takes only "
- "boolean options, not taking any action");
- priv->keepalive = 1;
- goto out;
- }
- gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive");
-
- priv->keepalive = tmp_bool;
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ priv->keepalive = 1;
+ goto out;
+ }
+ gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive");
+
+ priv->keepalive = tmp_bool;
}
else
priv->keepalive = 1;
@@ -2525,7 +2525,7 @@ socket_init (rpc_transport_t *this)
if (!priv) {
gf_log (this->name, GF_LOG_ERROR,
"calloc (1, %"GF_PRI_SIZET") returned NULL",
- sizeof (*priv));
+ sizeof (*priv));
return -1;
}
@@ -2551,7 +2551,7 @@ socket_init (rpc_transport_t *this)
if (gf_string2boolean (optstr, &tmp_bool) == -1) {
gf_log (this->name, GF_LOG_ERROR,
"'non-blocking-io' takes only boolean options,"
- " not taking any action");
+ " not taking any action");
tmp_bool = 1;
}
@@ -2604,7 +2604,7 @@ socket_init (rpc_transport_t *this)
if (gf_string2boolean (optstr, &tmp_bool) == -1) {
gf_log (this->name, GF_LOG_ERROR,
"'transport.socket.keepalive' takes only "
- "boolean options, not taking any action");
+ "boolean options, not taking any action");
tmp_bool = 1;
}