diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 374 |
1 files changed, 187 insertions, 187 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index cbd303496..5744ce29a 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -48,90 +48,90 @@ #define SA(ptr) ((struct sockaddr *)ptr) -#define __socket_proto_reset_pending(priv) do { \ - memset (&priv->incoming.frag.vector, 0, \ - sizeof (priv->incoming.frag.vector)); \ - priv->incoming.frag.pending_vector = \ - &priv->incoming.frag.vector; \ - priv->incoming.frag.pending_vector->iov_base = \ - priv->incoming.frag.fragcurrent; \ - priv->incoming.pending_vector = \ - priv->incoming.frag.pending_vector; \ +#define __socket_proto_reset_pending(priv) do { \ + memset (&priv->incoming.frag.vector, 0, \ + sizeof (priv->incoming.frag.vector)); \ + priv->incoming.frag.pending_vector = \ + &priv->incoming.frag.vector; \ + priv->incoming.frag.pending_vector->iov_base = \ + priv->incoming.frag.fragcurrent; \ + priv->incoming.pending_vector = \ + priv->incoming.frag.pending_vector; \ } while (0); -#define __socket_proto_update_pending(priv) \ - do { \ - uint32_t remaining_fragsize = 0; \ - if (priv->incoming.frag.pending_vector->iov_len == 0) { \ +#define __socket_proto_update_pending(priv) \ + do { \ + uint32_t remaining_fragsize = 0; \ + if (priv->incoming.frag.pending_vector->iov_len == 0) { \ remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ - - priv->incoming.frag.bytes_read; \ - \ - priv->incoming.frag.pending_vector->iov_len = \ + - priv->incoming.frag.bytes_read; \ + \ + priv->incoming.frag.pending_vector->iov_len = \ remaining_fragsize > priv->incoming.frag.remaining_size \ ? priv->incoming.frag.remaining_size : remaining_fragsize; \ - \ - priv->incoming.frag.remaining_size -= \ - priv->incoming.frag.pending_vector->iov_len; \ - } \ + \ + priv->incoming.frag.remaining_size -= \ + priv->incoming.frag.pending_vector->iov_len; \ + } \ } while (0); -#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \ - { \ - priv->incoming.frag.fragcurrent += bytes_read; \ - priv->incoming.frag.bytes_read += bytes_read; \ - \ +#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \ + { \ + priv->incoming.frag.fragcurrent += bytes_read; \ + priv->incoming.frag.bytes_read += bytes_read; \ + \ if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \ - if (priv->incoming.frag.remaining_size != 0) { \ - __socket_proto_reset_pending (priv); \ - } \ - \ + if (priv->incoming.frag.remaining_size != 0) { \ + __socket_proto_reset_pending (priv); \ + } \ + \ gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \ - \ - break; \ - } \ - } - -#define __socket_proto_init_pending(priv, size) \ - do { \ - uint32_t remaining_fragsize = 0; \ - remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ - - priv->incoming.frag.bytes_read; \ - \ - __socket_proto_reset_pending (priv); \ - \ - priv->incoming.frag.pending_vector->iov_len = \ + \ + break; \ + } \ + } + +#define __socket_proto_init_pending(priv, size) \ + do { \ + uint32_t remaining_fragsize = 0; \ + remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ + - priv->incoming.frag.bytes_read; \ + \ + __socket_proto_reset_pending (priv); \ + \ + priv->incoming.frag.pending_vector->iov_len = \ remaining_fragsize > size ? size : remaining_fragsize; \ - \ - priv->incoming.frag.remaining_size = \ - size - priv->incoming.frag.pending_vector->iov_len; \ - \ -} while (0); + \ + priv->incoming.frag.remaining_size = \ + size - priv->incoming.frag.pending_vector->iov_len; \ + \ + } while (0); /* This will be used in a switch case and breaks from the switch case if all * the pending data is not read. */ -#define __socket_proto_read(priv, ret) \ - { \ - size_t bytes_read = 0; \ - \ - __socket_proto_update_pending (priv); \ - \ - ret = __socket_readv (this, \ - priv->incoming.pending_vector, 1, \ - &priv->incoming.pending_vector, \ - &priv->incoming.pending_count, \ - &bytes_read); \ - if (ret == -1) { \ - gf_log (this->name, GF_LOG_TRACE, \ - "reading from socket failed. Error (%s), " \ - "peer (%s)", strerror (errno), \ - this->peerinfo.identifier); \ - break; \ - } \ +#define __socket_proto_read(priv, ret) \ + { \ + size_t bytes_read = 0; \ + \ + __socket_proto_update_pending (priv); \ + \ + ret = __socket_readv (this, \ + priv->incoming.pending_vector, 1, \ + &priv->incoming.pending_vector, \ + &priv->incoming.pending_count, \ + &bytes_read); \ + if (ret == -1) { \ + gf_log (this->name, GF_LOG_TRACE, \ + "reading from socket failed. Error (%s), " \ + "peer (%s)", strerror (errno), \ + this->peerinfo.identifier); \ + break; \ + } \ __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ - } + } int socket_init (rpc_transport_t *this); @@ -161,8 +161,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, priv = this->private; sock = priv->sock; - opvector = vector; - opcount = count; + opvector = vector; + opcount = count; if (bytes != NULL) { *bytes = 0; @@ -248,7 +248,7 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, int ret = -1; ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, bytes, 0); + pending_vector, pending_count, bytes, 0); return ret; } @@ -261,7 +261,7 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, int ret = -1; ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, NULL, 1); + pending_vector, pending_count, NULL, 1); return ret; } @@ -297,17 +297,17 @@ __socket_server_bind (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = -1; - int opt = 1; + int opt = 1; int reuse_check_sock = -1; struct sockaddr_storage unix_addr = {0}; if (!this || !this->private) goto out; - priv = this->private; + priv = this->private; ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, - &opt, sizeof (opt)); + &opt, sizeof (opt)); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, @@ -330,7 +330,7 @@ __socket_server_bind (rpc_transport_t *this) } ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, - this->myinfo.sockaddr_len); + this->myinfo.sockaddr_len); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, @@ -369,7 +369,7 @@ __socket_nodelay (int fd) int ret = -1; ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, - &on, sizeof (on)); + &on, sizeof (on)); if (!ret) gf_log ("", GF_LOG_TRACE, "NODELAY enabled for socket %d", fd); @@ -610,9 +610,9 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry) int ret = -1; ret = __socket_writev (this, entry->pending_vector, - entry->pending_count, + entry->pending_count, &entry->pending_vector, - &entry->pending_count); + &entry->pending_count); if (ret == 0) { /* current entry was completely written */ @@ -649,7 +649,7 @@ __socket_ioq_churn (rpc_transport_t *this) if (list_empty (&priv->ioq)) { /* all pending writes done, not interested in POLLOUT */ priv->idx = event_select_on (this->ctx->event_pool, - priv->sock, priv->idx, -1, 0); + priv->sock, priv->idx, -1, 0); } out: @@ -1031,7 +1031,7 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) "xdr_sizeof on gfs3_read_rsp failed"); ret = -1; goto out; - } + } __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len); priv->incoming.frag.call_body.reply.accepted_success_state @@ -1609,21 +1609,21 @@ socket_proto_state_machine (rpc_transport_t *this, rpc_transport_pollin_t **pollin) { socket_private_t *priv = NULL; - int ret = 0; + int ret = 0; if (!this || !this->private) goto out; - priv = this->private; + priv = this->private; - pthread_mutex_lock (&priv->lock); - { - ret = __socket_proto_state_machine (this, pollin); - } + pthread_mutex_lock (&priv->lock); + { + ret = __socket_proto_state_machine (this, pollin); + } pthread_mutex_unlock (&priv->lock); out: - return ret; + return ret; } @@ -1661,51 +1661,51 @@ socket_connect_finish (rpc_transport_t *this) pthread_mutex_lock (&priv->lock); { - if (priv->connected) - goto unlock; + if (priv->connected) + goto unlock; - ret = __socket_connect_finish (priv->sock); + ret = __socket_connect_finish (priv->sock); - if (ret == -1 && errno == EINPROGRESS) - ret = 1; + if (ret == -1 && errno == EINPROGRESS) + ret = 1; - if (ret == -1 && errno != EINPROGRESS) { - if (!priv->connect_finish_log) { - gf_log (this->name, GF_LOG_ERROR, - "connection to %s failed (%s)", + if (ret == -1 && errno != EINPROGRESS) { + if (!priv->connect_finish_log) { + gf_log (this->name, GF_LOG_ERROR, + "connection to %s failed (%s)", this->peerinfo.identifier, - strerror (errno)); - priv->connect_finish_log = 1; - } - __socket_disconnect (this); - notify_rpc = 1; - event = RPC_TRANSPORT_DISCONNECT; - goto unlock; - } - - if (ret == 0) { - notify_rpc = 1; - - this->myinfo.sockaddr_len = - sizeof (this->myinfo.sockaddr); - - ret = getsockname (priv->sock, - SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, - "getsockname on (%d) failed (%s)", - priv->sock, strerror (errno)); - __socket_disconnect (this); - event = GF_EVENT_POLLERR; - goto unlock; - } - - priv->connected = 1; - priv->connect_finish_log = 0; - event = RPC_TRANSPORT_CONNECT; - get_transport_identifiers (this); - } + strerror (errno)); + priv->connect_finish_log = 1; + } + __socket_disconnect (this); + notify_rpc = 1; + event = RPC_TRANSPORT_DISCONNECT; + goto unlock; + } + + if (ret == 0) { + notify_rpc = 1; + + this->myinfo.sockaddr_len = + sizeof (this->myinfo.sockaddr); + + ret = getsockname (priv->sock, + SA (&this->myinfo.sockaddr), + &this->myinfo.sockaddr_len); + if (ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, + "getsockname on (%d) failed (%s)", + priv->sock, strerror (errno)); + __socket_disconnect (this); + event = GF_EVENT_POLLERR; + goto unlock; + } + + priv->connected = 1; + priv->connect_finish_log = 0; + event = RPC_TRANSPORT_CONNECT; + get_transport_identifiers (this); + } } unlock: pthread_mutex_unlock (&priv->lock); @@ -1776,7 +1776,7 @@ socket_server_event_handler (int fd, int idx, void *data, struct sockaddr_storage new_sockaddr = {0, }; socklen_t addrlen = sizeof (new_sockaddr); socket_private_t *new_priv = NULL; - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; this = data; if (!this || !this->private || !this->xl) @@ -1784,7 +1784,7 @@ socket_server_event_handler (int fd, int idx, void *data, THIS = this->xl; priv = this->private; - ctx = this->ctx; + ctx = this->ctx; pthread_mutex_lock (&priv->lock); { @@ -1792,7 +1792,7 @@ socket_server_event_handler (int fd, int idx, void *data, if (poll_in) { new_sock = accept (priv->sock, SA (&new_sockaddr), - &addrlen); + &addrlen); if (new_sock == -1) goto unlock; @@ -1838,11 +1838,11 @@ socket_server_event_handler (int fd, int idx, void *data, new_trans->name = gf_strdup (this->name); memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, - addrlen); + addrlen); new_trans->peerinfo.sockaddr_len = addrlen; new_trans->myinfo.sockaddr_len = - sizeof (new_trans->myinfo.sockaddr); + sizeof (new_trans->myinfo.sockaddr); ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr), @@ -1874,10 +1874,10 @@ socket_server_event_handler (int fd, int idx, void *data, rpc_transport_ref (new_trans); new_priv->idx = - event_register (ctx->event_pool, - new_sock, - socket_event_handler, - new_trans, 1, 0); + event_register (ctx->event_pool, + new_sock, + socket_event_handler, + new_trans, 1, 0); if (new_priv->idx == -1) ret = -1; @@ -1924,18 +1924,18 @@ int socket_connect (rpc_transport_t *this, int port) { int ret = -1; - int sock = -1; + int sock = -1; socket_private_t *priv = NULL; struct sockaddr_storage sockaddr = {0, }; socklen_t sockaddr_len = 0; - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; if (!this || !this->private) goto err; priv = this->private; - ctx = this->ctx; + ctx = this->ctx; if (!priv) { gf_log (this->name, GF_LOG_DEBUG, @@ -1981,7 +1981,7 @@ socket_connect (rpc_transport_t *this, int port) if (priv->sock == -1) { gf_log (this->name, GF_LOG_ERROR, "socket creation failed (%s)", - strerror (errno)); + strerror (errno)); goto unlock; } @@ -2040,10 +2040,10 @@ socket_connect (rpc_transport_t *this, int port) } SA (&this->myinfo.sockaddr)->sa_family = - SA (&this->peerinfo.sockaddr)->sa_family; + SA (&this->peerinfo.sockaddr)->sa_family; ret = client_bind (this, SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len, priv->sock); + &this->myinfo.sockaddr_len, priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "client bind failed: %s", strerror (errno)); @@ -2053,12 +2053,12 @@ socket_connect (rpc_transport_t *this, int port) } ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), - this->peerinfo.sockaddr_len); + this->peerinfo.sockaddr_len); if (ret == -1 && errno != EINPROGRESS) { gf_log (this->name, GF_LOG_ERROR, "connection attempt failed (%s)", - strerror (errno)); + strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; @@ -2086,19 +2086,19 @@ socket_listen (rpc_transport_t *this) { socket_private_t * priv = NULL; int ret = -1; - int sock = -1; + int sock = -1; struct sockaddr_storage sockaddr; socklen_t sockaddr_len; peer_info_t *myinfo = NULL; - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; if (!this || !this->private) goto out; - priv = this->private; - myinfo = &this->myinfo; - ctx = this->ctx; + priv = this->private; + myinfo = &this->myinfo; + ctx = this->ctx; pthread_mutex_lock (&priv->lock); { @@ -2134,7 +2134,7 @@ socket_listen (rpc_transport_t *this) if (priv->sock == -1) { gf_log (this->name, GF_LOG_ERROR, "socket creation failed (%s)", - strerror (errno)); + strerror (errno)); goto unlock; } @@ -2195,7 +2195,7 @@ socket_listen (rpc_transport_t *this) if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "could not set socket %d to listen mode (%s)", - priv->sock, strerror (errno)); + priv->sock, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; @@ -2205,12 +2205,12 @@ socket_listen (rpc_transport_t *this) priv->idx = event_register (ctx->event_pool, priv->sock, socket_server_event_handler, - this, 1, 0); + this, 1, 0); if (priv->idx == -1) { gf_log (this->name, GF_LOG_DEBUG, "could not register socket %d with events", - priv->sock); + priv->sock); ret = -1; close (priv->sock); priv->sock = -1; @@ -2233,13 +2233,13 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) char need_poll_out = 0; char need_append = 1; struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; if (!this || !this->private) goto out; priv = this->private; - ctx = this->ctx; + ctx = this->ctx; pthread_mutex_lock (&priv->lock); { @@ -2276,7 +2276,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) if (need_poll_out) { /* first entry to wait. continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, - priv->sock, + priv->sock, priv->idx, -1, 1); } } @@ -2296,13 +2296,13 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) char need_poll_out = 0; char need_append = 1; struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; if (!this || !this->private) goto out; priv = this->private; - ctx = this->ctx; + ctx = this->ctx; pthread_mutex_lock (&priv->lock); { @@ -2337,7 +2337,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) if (need_poll_out) { /* first entry to wait. continue writing on POLLOUT */ priv->idx = event_select_on (ctx->event_pool, - priv->sock, + priv->sock, priv->idx, -1, 1); } } @@ -2450,22 +2450,22 @@ validate_options (rpc_transport_t *this, dict_t *options, char **op_errstr) char *optstr = NULL; int ret = -1; gf_boolean_t tmp_bool = _gf_false; - + if (dict_get_str (options, "transport.socket.keepalive", - &optstr) == 0) { - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.keepalive' takes only " - "boolean options, not taking any action"); - *op_errstr = "Value should be only boolean!!"; - ret =-1; - goto out; - } + &optstr) == 0) { + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.keepalive' takes only " + "boolean options, not taking any action"); + *op_errstr = "Value should be only boolean!!"; + ret =-1; + goto out; + } } ret =0; out: - return ret; + return ret; } @@ -2481,22 +2481,22 @@ reconfigure (rpc_transport_t *this, dict_t *options) ret =-1; goto out; } - - + + priv = this->private; if (dict_get_str (this->options, "transport.socket.keepalive", - &optstr) == 0) { - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.keepalive' takes only " - "boolean options, not taking any action"); - priv->keepalive = 1; - goto out; - } - gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive"); - - priv->keepalive = tmp_bool; + &optstr) == 0) { + if (gf_string2boolean (optstr, &tmp_bool) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'transport.socket.keepalive' takes only " + "boolean options, not taking any action"); + priv->keepalive = 1; + goto out; + } + gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive"); + + priv->keepalive = tmp_bool; } else priv->keepalive = 1; @@ -2525,7 +2525,7 @@ socket_init (rpc_transport_t *this) if (!priv) { gf_log (this->name, GF_LOG_ERROR, "calloc (1, %"GF_PRI_SIZET") returned NULL", - sizeof (*priv)); + sizeof (*priv)); return -1; } @@ -2551,7 +2551,7 @@ socket_init (rpc_transport_t *this) if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options," - " not taking any action"); + " not taking any action"); tmp_bool = 1; } @@ -2604,7 +2604,7 @@ socket_init (rpc_transport_t *this) if (gf_string2boolean (optstr, &tmp_bool) == -1) { gf_log (this->name, GF_LOG_ERROR, "'transport.socket.keepalive' takes only " - "boolean options, not taking any action"); + "boolean options, not taking any action"); tmp_bool = 1; } |