diff options
-rw-r--r-- | libglusterfs/src/common-utils.c | 30 | ||||
-rw-r--r-- | libglusterfs/src/common-utils.h | 3 | ||||
-rw-r--r-- | libglusterfs/src/libglusterfs-messages.h | 10 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 186 |
4 files changed, 87 insertions, 142 deletions
diff --git a/libglusterfs/src/common-utils.c b/libglusterfs/src/common-utils.c index a225c957f2e..97d9ef8da1a 100644 --- a/libglusterfs/src/common-utils.c +++ b/libglusterfs/src/common-utils.c @@ -3555,6 +3555,36 @@ gf_thread_create (pthread_t *thread, const pthread_attr_t *attr, } int +gf_thread_create_detached (pthread_t *thread, + void *(*start_routine)(void *), void *arg) +{ + pthread_attr_t attr; + int ret = -1; + + ret = pthread_attr_init (&attr); + if (ret) { + gf_msg (THIS->name, GF_LOG_ERROR, ret, + LG_MSG_PTHREAD_ATTR_INIT_FAILED, + "Thread attribute initialization failed"); + return -1; + } + + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + + ret = gf_thread_create (thread, &attr, start_routine, arg); + if (ret) { + gf_msg (THIS->name, GF_LOG_ERROR, ret, + LG_MSG_PTHREAD_FAILED, + "Thread creation failed"); + ret = -1; + } + + pthread_attr_destroy (&attr); + + return ret; +} + +int gf_skip_header_section (int fd, int header_len) { int ret = -1; diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h index 09d585ad9c3..4741d430a03 100644 --- a/libglusterfs/src/common-utils.h +++ b/libglusterfs/src/common-utils.h @@ -766,6 +766,9 @@ int gf_set_timestamp (const char *src, const char* dest); int gf_thread_create (pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg); +int gf_thread_create_detached (pthread_t *thread, + void *(*start_routine)(void *), void *arg); + gf_boolean_t gf_is_service_running (char *pidfile, int *pid); int diff --git a/libglusterfs/src/libglusterfs-messages.h b/libglusterfs/src/libglusterfs-messages.h index c0bcabac798..d18f4cb3112 100644 --- a/libglusterfs/src/libglusterfs-messages.h +++ b/libglusterfs/src/libglusterfs-messages.h @@ -36,7 +36,7 @@ */ #define GLFS_LG_BASE GLFS_MSGID_COMP_LIBGLUSTERFS -#define GLFS_LG_NUM_MESSAGES 205 +#define GLFS_LG_NUM_MESSAGES 206 #define GLFS_LG_MSGID_END (GLFS_LG_BASE + GLFS_LG_NUM_MESSAGES + 1) /* Messaged with message IDs */ #define glfs_msg_start_lg GLFS_LG_BASE, "Invalid: Start of messages" @@ -1754,6 +1754,14 @@ * @recommendedaction * */ +#define LG_MSG_PTHREAD_ATTR_INIT_FAILED (GLFS_LG_BASE + 206) + +/*! + * @messageid + * @diagnosis + * @recommendedaction + * + */ /*------------*/ #define glfs_msg_end_lg GLFS_LG_MSGID_END, "Invalid: End of messages" diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 209d89a225b..030a37961b6 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -187,117 +187,6 @@ struct socket_connect_error_state_ { }; typedef struct socket_connect_error_state_ socket_connect_error_state_t; - -/* This timer and queue are used to reap dead threads. The timer triggers every - * minute and pthread_joins any threads that added themselves to the reap queue - * - * TODO: Make the timer configurable? (Not sure if required) - */ -static gf_timer_t *reap_timer; -static struct list_head reap_queue; -static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER; -const struct timespec reap_ts = {60, 0}; - -struct tid_wrap { - struct list_head list; - pthread_t tid; -}; - -/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins - * them. If a thread join fails, it logs the failure and continues - */ -static void -_socket_reap_own_threads() { - struct tid_wrap *node = NULL; - struct tid_wrap *tmp = NULL; - pthread_t tid = 0; - int i = 0; - - list_for_each_entry_safe (node, tmp, &reap_queue, list) { - list_del_init (&node->list); - if (pthread_join (node->tid, NULL)) { - gf_log (THIS->name, GF_LOG_ERROR, - "own-thread: failed to join thread (tid: %zu)", - tid); - } - node->tid = 0; - GF_FREE (node); - node = NULL; - i++; - } - - if (i) { - gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i); - } - - return; -} - -/* socket_thread_reaper reaps threads and restarts the reap_timer - */ -static void -socket_thread_reaper () { - - pthread_mutex_lock (&reap_lock); - - gf_timer_call_cancel (THIS->ctx, reap_timer); - reap_timer = 0; - - _socket_reap_own_threads(); - - reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, - socket_thread_reaper, NULL); - if (!reap_timer) - gf_log (THIS->name, GF_LOG_ERROR, - "failed to restart socket own-thread reap timer"); - - pthread_mutex_unlock (&reap_lock); - - return; -} - -/* socket_thread_reaper_init initializes reap_timer and reap_queue. - * Initializations are done only the first time this is called. - * - * To make sure that the reap_timer is always run, reaper_init it is better to - * call this whenever an own-thread is launched - */ -static void -socket_thread_reaper_init () { - pthread_mutex_lock (&reap_lock); - - if (reap_timer == NULL) { - reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, - socket_thread_reaper, NULL); - INIT_LIST_HEAD (&reap_queue); - } - - pthread_mutex_unlock (&reap_lock); - - return; -} - -/* socket_thread_reaper_add adds the given thread id to the queue of threads - * that will be reaped by socket_thread_reaper - * own-threads need to call this with their thread-ids before dying - */ -static int -socket_thread_reaper_add (pthread_t tid) { - struct tid_wrap *node = NULL; - - pthread_mutex_lock (&reap_lock); - - node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap); - node->tid = tid; - INIT_LIST_HEAD (&node->list); - list_add_tail (&node->list, &reap_queue); - - pthread_mutex_unlock (&reap_lock); - - return 0; -} - - static int socket_init (rpc_transport_t *this); static void @@ -2640,10 +2529,6 @@ err: pthread_mutex_unlock(&priv->lock); rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); - /* Add the thread to the reap_queue before freeing up the transport and - * dying - */ - socket_thread_reaper_add (priv->thread); rpc_transport_unref (this); @@ -2651,11 +2536,11 @@ err: } -static void +static int socket_spawn (rpc_transport_t *this) { socket_private_t *priv = this->private; - + int ret = -1; switch (priv->ot_state) { case OT_IDLE: case OT_PLEASE_DIE: @@ -2663,7 +2548,7 @@ socket_spawn (rpc_transport_t *this) default: gf_log (this->name, GF_LOG_WARNING, "refusing to start redundant poller"); - return; + return ret; } priv->ot_gen += 7; @@ -2671,12 +2556,16 @@ socket_spawn (rpc_transport_t *this) gf_log (this->name, GF_LOG_TRACE, "spawning %p with gen %u", this, priv->ot_gen); - if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) { + /* Create thread after enable detach flag */ + + ret = gf_thread_create_detached (&priv->thread, socket_poller, this); + if (ret) { gf_log (this->name, GF_LOG_ERROR, "could not create poll thread"); + ret = -1; } - /* start the reaper thread */ - socket_thread_reaper_init(); + + return ret; } static int @@ -2860,30 +2749,38 @@ socket_server_event_handler (int fd, int idx, void *data, new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); - if (new_priv->own_thread) { - if (pipe(new_priv->pipe) < 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create pipe"); - } - socket_spawn(new_trans); - } - else { - new_priv->idx = - event_register (ctx->event_pool, - new_sock, - socket_event_handler, - new_trans, - 1, 0); - if (new_priv->idx == -1) - ret = -1; - } + if (new_priv->own_thread) { + if (pipe(new_priv->pipe) < 0) { + gf_log(this->name, GF_LOG_ERROR, + "could not create pipe"); + } + ret = socket_spawn(new_trans); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "could not spawn thread"); + sys_close (new_priv->pipe[0]); + sys_close (new_priv->pipe[1]); + } + } else { + new_priv->idx = + event_register (ctx->event_pool, + new_sock, + socket_event_handler, + new_trans, + 1, 0); + if (new_priv->idx == -1) { + ret = -1; + gf_log(this->name, GF_LOG_ERROR, + "failed to register the socket with event"); + } + } } pthread_mutex_unlock (&new_priv->lock); if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "failed to register the socket with event"); sys_close (new_sock); + GF_FREE (new_trans->name); + GF_FREE (new_trans); rpc_transport_unref (new_trans); goto unlock; } @@ -3200,7 +3097,14 @@ handler: } this->listener = this; - socket_spawn(this); + ret = socket_spawn(this); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "could not spawn thread"); + sys_close(priv->pipe[0]); + sys_close(priv->pipe[1]); + priv->sock = -1; + } } else { priv->idx = event_register (ctx->event_pool, priv->sock, |