summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c202
1 files changed, 126 insertions, 76 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index ccddfbc8d76..da9a6d64b6c 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -153,6 +153,9 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
int opcount = 0;
int moved = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
sock = priv->sock;
@@ -228,6 +231,7 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
if (pending_count)
*pending_count = opcount;
+out:
return opcount;
}
@@ -265,6 +269,10 @@ __socket_disconnect (rpc_transport_t *this)
socket_private_t *priv = NULL;
int ret = -1;
+ if (!this || !this->private)
+ goto out;
+
+
priv = this->private;
if (priv->sock != -1) {
@@ -275,6 +283,7 @@ __socket_disconnect (rpc_transport_t *this)
ret);
}
+out:
return ret;
}
@@ -286,6 +295,9 @@ __socket_server_bind (rpc_transport_t *this)
int ret = -1;
int opt = 1;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
@@ -310,6 +322,7 @@ __socket_server_bind (rpc_transport_t *this)
}
}
+out:
return ret;
}
@@ -367,6 +380,9 @@ __socket_reset (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
/* TODO: use mem-pool on incoming data */
@@ -388,6 +404,9 @@ __socket_reset (rpc_transport_t *this)
priv->sock = -1;
priv->idx = -1;
priv->connected = -1;
+
+out:
+ return;
}
@@ -419,12 +438,14 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
int count = 0;
uint32_t size = 0;
+ if (!this)
+ goto out;
+
/* TODO: use mem-pool */
entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq);
if (!entry)
return NULL;
-
count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
assert (count <= (MAX_IOVEC - 1));
@@ -473,6 +494,7 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
INIT_LIST_HEAD (&entry->list);
+out:
return entry;
}
@@ -480,6 +502,9 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
void
__socket_ioq_entry_free (struct ioq *entry)
{
+ if (!entry)
+ return;
+
list_del_init (&entry->list);
if (entry->iobref)
iobref_unref (entry->iobref);
@@ -495,6 +520,9 @@ __socket_ioq_flush (rpc_transport_t *this)
socket_private_t *priv = NULL;
struct ioq *entry = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
while (!list_empty (&priv->ioq)) {
@@ -502,6 +530,7 @@ __socket_ioq_flush (rpc_transport_t *this)
__socket_ioq_entry_free (entry);
}
+out:
return;
}
@@ -533,6 +562,9 @@ __socket_ioq_churn (rpc_transport_t *this)
int ret = 0;
struct ioq *entry = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
while (!list_empty (&priv->ioq)) {
@@ -551,6 +583,7 @@ __socket_ioq_churn (rpc_transport_t *this)
priv->sock, priv->idx, -1, 0);
}
+out:
return ret;
}
@@ -561,6 +594,9 @@ socket_event_poll_err (rpc_transport_t *this)
socket_private_t *priv = NULL;
int ret = -1;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
pthread_mutex_lock (&priv->lock);
@@ -572,6 +608,7 @@ socket_event_poll_err (rpc_transport_t *this)
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+out:
return ret;
}
@@ -582,6 +619,9 @@ socket_event_poll_out (rpc_transport_t *this)
socket_private_t *priv = NULL;
int ret = -1;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
pthread_mutex_lock (&priv->lock);
@@ -598,6 +638,7 @@ socket_event_poll_out (rpc_transport_t *this)
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
+out:
return ret;
}
@@ -610,6 +651,9 @@ __socket_read_simple_msg (rpc_transport_t *this)
uint32_t remaining_size = 0;
size_t bytes_read = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.simple_state) {
@@ -662,6 +706,7 @@ __socket_read_simple_msg (rpc_transport_t *this)
}
}
+out:
return ret;
}
@@ -689,6 +734,9 @@ __socket_read_vectored_request (rpc_transport_t *this)
uint32_t remaining_size = 0;
uint32_t gluster_write_proc_len = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.call_body.request.vector_state) {
@@ -802,6 +850,7 @@ __socket_read_vectored_request (rpc_transport_t *this)
break;
}
+out:
return ret;
}
@@ -822,6 +871,9 @@ __socket_read_request (rpc_transport_t *this)
int ret = -1;
char *buf = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.call_body.request.header_state) {
@@ -871,6 +923,7 @@ __socket_read_request (rpc_transport_t *this)
break;
}
+out:
return ret;
}
@@ -883,6 +936,9 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
struct iobuf *iobuf = NULL;
uint32_t gluster_read_rsp_hdr_len = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.call_body.reply.accepted_success_state) {
@@ -960,6 +1016,9 @@ __socket_read_accepted_reply (rpc_transport_t *this)
uint32_t verflen = 0, len = 0;
uint32_t remaining_size = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.call_body.reply.accepted_state) {
@@ -1035,6 +1094,7 @@ __socket_read_accepted_reply (rpc_transport_t *this)
break;
}
+out:
return ret;
}
@@ -1057,6 +1117,9 @@ __socket_read_vectored_reply (rpc_transport_t *this)
char *buf = NULL;
uint32_t remaining_size = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.call_body.reply.status_state) {
@@ -1107,6 +1170,7 @@ __socket_read_vectored_reply (rpc_transport_t *this)
break;
}
+out:
return ret;
}
@@ -1127,6 +1191,9 @@ __socket_read_reply (rpc_transport_t *this)
int32_t ret = -1;
rpc_request_info_t *request_info = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf));
@@ -1182,6 +1249,9 @@ __socket_read_frag (rpc_transport_t *this)
char *buf = NULL;
uint32_t remaining_size = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
switch (priv->incoming.frag.state) {
@@ -1226,6 +1296,7 @@ __socket_read_frag (rpc_transport_t *this)
break;
}
+out:
return ret;
}
@@ -1259,6 +1330,9 @@ __socket_proto_state_machine (rpc_transport_t *this,
struct iobref *iobref = NULL;
struct iovec vector[2];
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
while (priv->incoming.record_state != SP_STATE_COMPLETE) {
switch (priv->incoming.record_state) {
@@ -1429,6 +1503,9 @@ socket_proto_state_machine (rpc_transport_t *this,
socket_private_t *priv = NULL;
int ret = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
pthread_mutex_lock (&priv->lock);
@@ -1437,6 +1514,7 @@ socket_proto_state_machine (rpc_transport_t *this,
}
pthread_mutex_unlock (&priv->lock);
+out:
return ret;
}
@@ -1468,6 +1546,9 @@ socket_connect_finish (rpc_transport_t *this)
rpc_transport_event_t event = 0;
char notify_rpc = 0;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
pthread_mutex_lock (&priv->lock);
@@ -1524,7 +1605,7 @@ unlock:
if (notify_rpc) {
rpc_transport_notify (this, event, this);
}
-
+out:
return 0;
}
@@ -1539,11 +1620,12 @@ socket_event_handler (int fd, int idx, void *data,
int ret = 0;
this = data;
+ if (!this || !this->private || !this->xl)
+ goto out;
+
THIS = this->xl;
priv = this->private;
- if (!priv)
- return 0;
pthread_mutex_lock (&priv->lock);
{
@@ -1569,6 +1651,7 @@ socket_event_handler (int fd, int idx, void *data,
rpc_transport_unref (this);
}
+out:
return 0;
}
@@ -1588,13 +1671,13 @@ socket_server_event_handler (int fd, int idx, void *data,
glusterfs_ctx_t *ctx = NULL;
this = data;
+ if (!this || !this->private || !this->xl)
+ goto out;
+
THIS = this->xl;
priv = this->private;
ctx = this->ctx;
- if (!priv)
- return 0;
-
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
@@ -1692,6 +1775,7 @@ socket_server_event_handler (int fd, int idx, void *data,
unlock:
pthread_mutex_unlock (&priv->lock);
+out:
return ret;
}
@@ -1702,6 +1786,9 @@ socket_disconnect (rpc_transport_t *this)
socket_private_t *priv = NULL;
int ret = -1;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
pthread_mutex_lock (&priv->lock);
@@ -1710,6 +1797,7 @@ socket_disconnect (rpc_transport_t *this)
}
pthread_mutex_unlock (&priv->lock);
+out:
return ret;
}
@@ -1725,6 +1813,9 @@ socket_connect (rpc_transport_t *this, int port)
glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
+ if (!this || !this->private)
+ goto err;
+
priv = this->private;
ctx = this->ctx;
@@ -1874,6 +1965,9 @@ socket_listen (rpc_transport_t *this)
glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
myinfo = &this->myinfo;
ctx = this->ctx;
@@ -1998,74 +2092,11 @@ socket_listen (rpc_transport_t *this)
unlock:
pthread_mutex_unlock (&priv->lock);
+out:
return ret;
}
-/* TODO: implement per transfer limit */
-#if 0
-int
-socket_submit (rpc_transport_t *this, char *buf, int len,
- struct iovec *vector, int count,
- struct iobref *iobref)
-{
- socket_private_t *priv = NULL;
- int ret = -1;
- char need_poll_out = 0;
- char need_append = 1;
- struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
-
- priv = this->private;
- ctx = this->ctx;
-
- pthread_mutex_lock (&priv->lock);
- {
- if (priv->connected != 1) {
- if (!priv->submit_log && !priv->connect_finish_log) {
- gf_log (this->name, GF_LOG_DEBUG,
- "not connected (priv->connected = %d)",
- priv->connected);
- priv->submit_log = 1;
- }
- goto unlock;
- }
-
- priv->submit_log = 0;
- entry = __socket_ioq_new (this, buf, len, vector, count, iobref);
- if (!entry)
- goto unlock;
-
- if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
-
- if (ret == 0)
- need_append = 0;
-
- if (ret > 0)
- need_poll_out = 1;
- }
-
- if (need_append) {
- list_add_tail (&entry->list, &priv->ioq);
- ret = 0;
- }
-
- if (need_poll_out) {
- /* first entry to wait. continue writing on POLLOUT */
- priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
- priv->idx, -1, 1);
- }
- }
-unlock:
- pthread_mutex_unlock (&priv->lock);
-
- return ret;
-}
-#endif
-
-
int32_t
socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
@@ -2076,6 +2107,9 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
ctx = this->ctx;
@@ -2121,6 +2155,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
unlock:
pthread_mutex_unlock (&priv->lock);
+out:
return ret;
}
@@ -2135,6 +2170,9 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ if (!this || !this->private)
+ goto out;
+
priv = this->private;
ctx = this->ctx;
@@ -2179,6 +2217,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
unlock:
pthread_mutex_unlock (&priv->lock);
+out:
return ret;
}
@@ -2375,10 +2414,21 @@ socket_init (rpc_transport_t *this)
void
fini (rpc_transport_t *this)
{
- socket_private_t *priv = this->private;
+ socket_private_t *priv = NULL;
- this->private = NULL;
+ if (!this)
+ return;
+
+ priv = this->private;
if (priv) {
+ if (priv->sock != -1) {
+ pthread_mutex_lock (&priv->lock);
+ {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+ }
gf_log (this->name, GF_LOG_TRACE,
"transport %p destroyed", this);
@@ -2386,10 +2436,10 @@ fini (rpc_transport_t *this)
GF_FREE (priv);
}
- if (this->name) {
+ if (this->name)
GF_FREE (this->name);
- this->name = NULL;
- }
+
+ this->private = NULL;
}