diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 202 |
1 files changed, 126 insertions, 76 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index ccddfbc8d76..da9a6d64b6c 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -153,6 +153,9 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, int opcount = 0; int moved = 0; + if (!this || !this->private) + goto out; + priv = this->private; sock = priv->sock; @@ -228,6 +231,7 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, if (pending_count) *pending_count = opcount; +out: return opcount; } @@ -265,6 +269,10 @@ __socket_disconnect (rpc_transport_t *this) socket_private_t *priv = NULL; int ret = -1; + if (!this || !this->private) + goto out; + + priv = this->private; if (priv->sock != -1) { @@ -275,6 +283,7 @@ __socket_disconnect (rpc_transport_t *this) ret); } +out: return ret; } @@ -286,6 +295,9 @@ __socket_server_bind (rpc_transport_t *this) int ret = -1; int opt = 1; + if (!this || !this->private) + goto out; + priv = this->private; ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, @@ -310,6 +322,7 @@ __socket_server_bind (rpc_transport_t *this) } } +out: return ret; } @@ -367,6 +380,9 @@ __socket_reset (rpc_transport_t *this) { socket_private_t *priv = NULL; + if (!this || !this->private) + goto out; + priv = this->private; /* TODO: use mem-pool on incoming data */ @@ -388,6 +404,9 @@ __socket_reset (rpc_transport_t *this) priv->sock = -1; priv->idx = -1; priv->connected = -1; + +out: + return; } @@ -419,12 +438,14 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) int count = 0; uint32_t size = 0; + if (!this) + goto out; + /* TODO: use mem-pool */ entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq); if (!entry) return NULL; - count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; assert (count <= (MAX_IOVEC - 1)); @@ -473,6 +494,7 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) INIT_LIST_HEAD (&entry->list); +out: return entry; } @@ -480,6 +502,9 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) void __socket_ioq_entry_free (struct ioq *entry) { + if (!entry) + return; + list_del_init (&entry->list); if (entry->iobref) iobref_unref (entry->iobref); @@ -495,6 +520,9 @@ __socket_ioq_flush (rpc_transport_t *this) socket_private_t *priv = NULL; struct ioq *entry = NULL; + if (!this || !this->private) + goto out; + priv = this->private; while (!list_empty (&priv->ioq)) { @@ -502,6 +530,7 @@ __socket_ioq_flush (rpc_transport_t *this) __socket_ioq_entry_free (entry); } +out: return; } @@ -533,6 +562,9 @@ __socket_ioq_churn (rpc_transport_t *this) int ret = 0; struct ioq *entry = NULL; + if (!this || !this->private) + goto out; + priv = this->private; while (!list_empty (&priv->ioq)) { @@ -551,6 +583,7 @@ __socket_ioq_churn (rpc_transport_t *this) priv->sock, priv->idx, -1, 0); } +out: return ret; } @@ -561,6 +594,9 @@ socket_event_poll_err (rpc_transport_t *this) socket_private_t *priv = NULL; int ret = -1; + if (!this || !this->private) + goto out; + priv = this->private; pthread_mutex_lock (&priv->lock); @@ -572,6 +608,7 @@ socket_event_poll_err (rpc_transport_t *this) rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +out: return ret; } @@ -582,6 +619,9 @@ socket_event_poll_out (rpc_transport_t *this) socket_private_t *priv = NULL; int ret = -1; + if (!this || !this->private) + goto out; + priv = this->private; pthread_mutex_lock (&priv->lock); @@ -598,6 +638,7 @@ socket_event_poll_out (rpc_transport_t *this) ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL); +out: return ret; } @@ -610,6 +651,9 @@ __socket_read_simple_msg (rpc_transport_t *this) uint32_t remaining_size = 0; size_t bytes_read = 0; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.simple_state) { @@ -662,6 +706,7 @@ __socket_read_simple_msg (rpc_transport_t *this) } } +out: return ret; } @@ -689,6 +734,9 @@ __socket_read_vectored_request (rpc_transport_t *this) uint32_t remaining_size = 0; uint32_t gluster_write_proc_len = 0; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.call_body.request.vector_state) { @@ -802,6 +850,7 @@ __socket_read_vectored_request (rpc_transport_t *this) break; } +out: return ret; } @@ -822,6 +871,9 @@ __socket_read_request (rpc_transport_t *this) int ret = -1; char *buf = NULL; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.call_body.request.header_state) { @@ -871,6 +923,7 @@ __socket_read_request (rpc_transport_t *this) break; } +out: return ret; } @@ -883,6 +936,9 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) struct iobuf *iobuf = NULL; uint32_t gluster_read_rsp_hdr_len = 0; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.call_body.reply.accepted_success_state) { @@ -960,6 +1016,9 @@ __socket_read_accepted_reply (rpc_transport_t *this) uint32_t verflen = 0, len = 0; uint32_t remaining_size = 0; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.call_body.reply.accepted_state) { @@ -1035,6 +1094,7 @@ __socket_read_accepted_reply (rpc_transport_t *this) break; } +out: return ret; } @@ -1057,6 +1117,9 @@ __socket_read_vectored_reply (rpc_transport_t *this) char *buf = NULL; uint32_t remaining_size = 0; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.call_body.reply.status_state) { @@ -1107,6 +1170,7 @@ __socket_read_vectored_reply (rpc_transport_t *this) break; } +out: return ret; } @@ -1127,6 +1191,9 @@ __socket_read_reply (rpc_transport_t *this) int32_t ret = -1; rpc_request_info_t *request_info = NULL; + if (!this || !this->private) + goto out; + priv = this->private; buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf)); @@ -1182,6 +1249,9 @@ __socket_read_frag (rpc_transport_t *this) char *buf = NULL; uint32_t remaining_size = 0; + if (!this || !this->private) + goto out; + priv = this->private; switch (priv->incoming.frag.state) { @@ -1226,6 +1296,7 @@ __socket_read_frag (rpc_transport_t *this) break; } +out: return ret; } @@ -1259,6 +1330,9 @@ __socket_proto_state_machine (rpc_transport_t *this, struct iobref *iobref = NULL; struct iovec vector[2]; + if (!this || !this->private) + goto out; + priv = this->private; while (priv->incoming.record_state != SP_STATE_COMPLETE) { switch (priv->incoming.record_state) { @@ -1429,6 +1503,9 @@ socket_proto_state_machine (rpc_transport_t *this, socket_private_t *priv = NULL; int ret = 0; + if (!this || !this->private) + goto out; + priv = this->private; pthread_mutex_lock (&priv->lock); @@ -1437,6 +1514,7 @@ socket_proto_state_machine (rpc_transport_t *this, } pthread_mutex_unlock (&priv->lock); +out: return ret; } @@ -1468,6 +1546,9 @@ socket_connect_finish (rpc_transport_t *this) rpc_transport_event_t event = 0; char notify_rpc = 0; + if (!this || !this->private) + goto out; + priv = this->private; pthread_mutex_lock (&priv->lock); @@ -1524,7 +1605,7 @@ unlock: if (notify_rpc) { rpc_transport_notify (this, event, this); } - +out: return 0; } @@ -1539,11 +1620,12 @@ socket_event_handler (int fd, int idx, void *data, int ret = 0; this = data; + if (!this || !this->private || !this->xl) + goto out; + THIS = this->xl; priv = this->private; - if (!priv) - return 0; pthread_mutex_lock (&priv->lock); { @@ -1569,6 +1651,7 @@ socket_event_handler (int fd, int idx, void *data, rpc_transport_unref (this); } +out: return 0; } @@ -1588,13 +1671,13 @@ socket_server_event_handler (int fd, int idx, void *data, glusterfs_ctx_t *ctx = NULL; this = data; + if (!this || !this->private || !this->xl) + goto out; + THIS = this->xl; priv = this->private; ctx = this->ctx; - if (!priv) - return 0; - pthread_mutex_lock (&priv->lock); { priv->idx = idx; @@ -1692,6 +1775,7 @@ socket_server_event_handler (int fd, int idx, void *data, unlock: pthread_mutex_unlock (&priv->lock); +out: return ret; } @@ -1702,6 +1786,9 @@ socket_disconnect (rpc_transport_t *this) socket_private_t *priv = NULL; int ret = -1; + if (!this || !this->private) + goto out; + priv = this->private; pthread_mutex_lock (&priv->lock); @@ -1710,6 +1797,7 @@ socket_disconnect (rpc_transport_t *this) } pthread_mutex_unlock (&priv->lock); +out: return ret; } @@ -1725,6 +1813,9 @@ socket_connect (rpc_transport_t *this, int port) glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; + if (!this || !this->private) + goto err; + priv = this->private; ctx = this->ctx; @@ -1874,6 +1965,9 @@ socket_listen (rpc_transport_t *this) glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; + if (!this || !this->private) + goto out; + priv = this->private; myinfo = &this->myinfo; ctx = this->ctx; @@ -1998,74 +2092,11 @@ socket_listen (rpc_transport_t *this) unlock: pthread_mutex_unlock (&priv->lock); +out: return ret; } -/* TODO: implement per transfer limit */ -#if 0 -int -socket_submit (rpc_transport_t *this, char *buf, int len, - struct iovec *vector, int count, - struct iobref *iobref) -{ - socket_private_t *priv = NULL; - int ret = -1; - char need_poll_out = 0; - char need_append = 1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; - - priv = this->private; - ctx = this->ctx; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - if (!priv->submit_log && !priv->connect_finish_log) { - gf_log (this->name, GF_LOG_DEBUG, - "not connected (priv->connected = %d)", - priv->connected); - priv->submit_log = 1; - } - goto unlock; - } - - priv->submit_log = 0; - entry = __socket_ioq_new (this, buf, len, vector, count, iobref); - if (!entry) - goto unlock; - - if (list_empty (&priv->ioq)) { - ret = __socket_ioq_churn_entry (this, entry); - - if (ret == 0) - need_append = 0; - - if (ret > 0) - need_poll_out = 1; - } - - if (need_append) { - list_add_tail (&entry->list, &priv->ioq); - ret = 0; - } - - if (need_poll_out) { - /* first entry to wait. continue writing on POLLOUT */ - priv->idx = event_select_on (ctx->event_pool, - priv->sock, - priv->idx, -1, 1); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} -#endif - - int32_t socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) { @@ -2076,6 +2107,9 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) struct ioq *entry = NULL; glusterfs_ctx_t *ctx = NULL; + if (!this || !this->private) + goto out; + priv = this->private; ctx = this->ctx; @@ -2121,6 +2155,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) unlock: pthread_mutex_unlock (&priv->lock); +out: return ret; } @@ -2135,6 +2170,9 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) struct ioq *entry = NULL; glusterfs_ctx_t *ctx = NULL; + if (!this || !this->private) + goto out; + priv = this->private; ctx = this->ctx; @@ -2179,6 +2217,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) unlock: pthread_mutex_unlock (&priv->lock); +out: return ret; } @@ -2375,10 +2414,21 @@ socket_init (rpc_transport_t *this) void fini (rpc_transport_t *this) { - socket_private_t *priv = this->private; + socket_private_t *priv = NULL; - this->private = NULL; + if (!this) + return; + + priv = this->private; if (priv) { + if (priv->sock != -1) { + pthread_mutex_lock (&priv->lock); + { + __socket_ioq_flush (this); + __socket_reset (this); + } + pthread_mutex_unlock (&priv->lock); + } gf_log (this->name, GF_LOG_TRACE, "transport %p destroyed", this); @@ -2386,10 +2436,10 @@ fini (rpc_transport_t *this) GF_FREE (priv); } - if (this->name) { + if (this->name) GF_FREE (this->name); - this->name = NULL; - } + + this->private = NULL; } |