summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c186
1 files changed, 45 insertions, 141 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 209d89a225b..030a37961b6 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -187,117 +187,6 @@ struct socket_connect_error_state_ {
};
typedef struct socket_connect_error_state_ socket_connect_error_state_t;
-
-/* This timer and queue are used to reap dead threads. The timer triggers every
- * minute and pthread_joins any threads that added themselves to the reap queue
- *
- * TODO: Make the timer configurable? (Not sure if required)
- */
-static gf_timer_t *reap_timer;
-static struct list_head reap_queue;
-static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER;
-const struct timespec reap_ts = {60, 0};
-
-struct tid_wrap {
- struct list_head list;
- pthread_t tid;
-};
-
-/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins
- * them. If a thread join fails, it logs the failure and continues
- */
-static void
-_socket_reap_own_threads() {
- struct tid_wrap *node = NULL;
- struct tid_wrap *tmp = NULL;
- pthread_t tid = 0;
- int i = 0;
-
- list_for_each_entry_safe (node, tmp, &reap_queue, list) {
- list_del_init (&node->list);
- if (pthread_join (node->tid, NULL)) {
- gf_log (THIS->name, GF_LOG_ERROR,
- "own-thread: failed to join thread (tid: %zu)",
- tid);
- }
- node->tid = 0;
- GF_FREE (node);
- node = NULL;
- i++;
- }
-
- if (i) {
- gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i);
- }
-
- return;
-}
-
-/* socket_thread_reaper reaps threads and restarts the reap_timer
- */
-static void
-socket_thread_reaper () {
-
- pthread_mutex_lock (&reap_lock);
-
- gf_timer_call_cancel (THIS->ctx, reap_timer);
- reap_timer = 0;
-
- _socket_reap_own_threads();
-
- reap_timer = gf_timer_call_after (THIS->ctx, reap_ts,
- socket_thread_reaper, NULL);
- if (!reap_timer)
- gf_log (THIS->name, GF_LOG_ERROR,
- "failed to restart socket own-thread reap timer");
-
- pthread_mutex_unlock (&reap_lock);
-
- return;
-}
-
-/* socket_thread_reaper_init initializes reap_timer and reap_queue.
- * Initializations are done only the first time this is called.
- *
- * To make sure that the reap_timer is always run, reaper_init it is better to
- * call this whenever an own-thread is launched
- */
-static void
-socket_thread_reaper_init () {
- pthread_mutex_lock (&reap_lock);
-
- if (reap_timer == NULL) {
- reap_timer = gf_timer_call_after (THIS->ctx, reap_ts,
- socket_thread_reaper, NULL);
- INIT_LIST_HEAD (&reap_queue);
- }
-
- pthread_mutex_unlock (&reap_lock);
-
- return;
-}
-
-/* socket_thread_reaper_add adds the given thread id to the queue of threads
- * that will be reaped by socket_thread_reaper
- * own-threads need to call this with their thread-ids before dying
- */
-static int
-socket_thread_reaper_add (pthread_t tid) {
- struct tid_wrap *node = NULL;
-
- pthread_mutex_lock (&reap_lock);
-
- node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap);
- node->tid = tid;
- INIT_LIST_HEAD (&node->list);
- list_add_tail (&node->list, &reap_queue);
-
- pthread_mutex_unlock (&reap_lock);
-
- return 0;
-}
-
-
static int socket_init (rpc_transport_t *this);
static void
@@ -2640,10 +2529,6 @@ err:
pthread_mutex_unlock(&priv->lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
- /* Add the thread to the reap_queue before freeing up the transport and
- * dying
- */
- socket_thread_reaper_add (priv->thread);
rpc_transport_unref (this);
@@ -2651,11 +2536,11 @@ err:
}
-static void
+static int
socket_spawn (rpc_transport_t *this)
{
socket_private_t *priv = this->private;
-
+ int ret = -1;
switch (priv->ot_state) {
case OT_IDLE:
case OT_PLEASE_DIE:
@@ -2663,7 +2548,7 @@ socket_spawn (rpc_transport_t *this)
default:
gf_log (this->name, GF_LOG_WARNING,
"refusing to start redundant poller");
- return;
+ return ret;
}
priv->ot_gen += 7;
@@ -2671,12 +2556,16 @@ socket_spawn (rpc_transport_t *this)
gf_log (this->name, GF_LOG_TRACE,
"spawning %p with gen %u", this, priv->ot_gen);
- if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ /* Create thread after enable detach flag */
+
+ ret = gf_thread_create_detached (&priv->thread, socket_poller, this);
+ if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"could not create poll thread");
+ ret = -1;
}
- /* start the reaper thread */
- socket_thread_reaper_init();
+
+ return ret;
}
static int
@@ -2860,30 +2749,38 @@ socket_server_event_handler (int fd, int idx, void *data,
new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
- if (new_priv->own_thread) {
- if (pipe(new_priv->pipe) < 0) {
- gf_log(this->name,GF_LOG_ERROR,
- "could not create pipe");
- }
- socket_spawn(new_trans);
- }
- else {
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans,
- 1, 0);
- if (new_priv->idx == -1)
- ret = -1;
- }
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ ret = socket_spawn(new_trans);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close (new_priv->pipe[0]);
+ sys_close (new_priv->pipe[1]);
+ }
+ } else {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1) {
+ ret = -1;
+ gf_log(this->name, GF_LOG_ERROR,
+ "failed to register the socket with event");
+ }
+ }
}
pthread_mutex_unlock (&new_priv->lock);
if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "failed to register the socket with event");
sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
rpc_transport_unref (new_trans);
goto unlock;
}
@@ -3200,7 +3097,14 @@ handler:
}
this->listener = this;
- socket_spawn(this);
+ ret = socket_spawn(this);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close(priv->pipe[0]);
+ sys_close(priv->pipe[1]);
+ priv->sock = -1;
+ }
}
else {
priv->idx = event_register (ctx->event_pool, priv->sock,