diff options
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket-mem-types.h | 1 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 121 | 
2 files changed, 122 insertions, 0 deletions
diff --git a/rpc/rpc-transport/socket/src/socket-mem-types.h b/rpc/rpc-transport/socket/src/socket-mem-types.h index 3181406625d..d1860e6c9a9 100644 --- a/rpc/rpc-transport/socket/src/socket-mem-types.h +++ b/rpc/rpc-transport/socket/src/socket-mem-types.h @@ -16,6 +16,7 @@  typedef enum gf_sock_mem_types_ {          gf_sock_connect_error_state_t     = gf_common_mt_end + 1,          gf_sock_mt_lock_array, +        gf_sock_mt_tid_wrap,          gf_sock_mt_end  } gf_sock_mem_types_t; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 686620a47af..74b9dd26681 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -24,6 +24,7 @@  #include "common-utils.h"  #include "compat-errno.h"  #include "socket-mem-types.h" +#include "timer.h"  /* ugly #includes below */  #include "protocol-common.h" @@ -191,6 +192,117 @@ struct socket_connect_error_state_ {  };  typedef struct socket_connect_error_state_ socket_connect_error_state_t; + +/* This timer and queue are used to reap dead threads. The timer triggers every + * minute and pthread_joins any threads that added themselves to the reap queue + * + * TODO: Make the timer configurable? (Not sure if required) + */ +static gf_timer_t *reap_timer; +static struct list_head reap_queue; +static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER; +const struct timespec reap_ts = {60, 0}; + +struct tid_wrap { +        struct list_head list; +        pthread_t tid; +}; + +/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins + * them.  If a thread join fails, it logs the failure and continues + */ +static void +_socket_reap_own_threads() { +        struct tid_wrap *node = NULL; +        struct tid_wrap *tmp = NULL; +        pthread_t tid = 0; +        int i = 0; + +        list_for_each_entry_safe (node, tmp, &reap_queue, list) { +                list_del_init (&node->list); +                if (pthread_join (node->tid, NULL)) { +                        gf_log (THIS->name, GF_LOG_ERROR, +                                "own-thread: failed to join thread (tid: %zu)", +                                tid); +                } +                node->tid = 0; +                GF_FREE (node); +                node = NULL; +                i++; +        } + +        if (i) { +                gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i); +        } + +        return; +} + +/* socket_thread_reaper reaps threads and restarts the reap_timer + */ +static void +socket_thread_reaper () { + +        pthread_mutex_lock (&reap_lock); + +        gf_timer_call_cancel (THIS->ctx, reap_timer); +        reap_timer = 0; + +        _socket_reap_own_threads(); + +        reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, +                                          socket_thread_reaper, NULL); +        if (!reap_timer) +                gf_log (THIS->name, GF_LOG_ERROR, +                        "failed to restart socket own-thread reap timer"); + +        pthread_mutex_unlock (&reap_lock); + +        return; +} + +/* socket_thread_reaper_init initializes reap_timer and reap_queue. + * Initializations are done only the first time this is called. + * + * To make sure that the reap_timer is always run, reaper_init it is better to + * call this whenever an own-thread is launched + */ +static void +socket_thread_reaper_init () { +        pthread_mutex_lock (&reap_lock); + +        if (reap_timer == NULL) { +                reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, +                                                  socket_thread_reaper, NULL); +                INIT_LIST_HEAD (&reap_queue); +        } + +        pthread_mutex_unlock (&reap_lock); + +        return; +} + +/* socket_thread_reaper_add adds the given thread id to the queue of threads + * that will be reaped by socket_thread_reaper + * own-threads need to call this with their thread-ids before dying + */ +static int +socket_thread_reaper_add (pthread_t tid) { +        struct tid_wrap *node = NULL; + +        pthread_mutex_lock (&reap_lock); + +        node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap); +        node->tid = tid; +        INIT_LIST_HEAD (&node->list); +        list_add_tail (&node->list, &reap_queue); + +        pthread_mutex_unlock (&reap_lock); + +        return 0; +} + +  static int socket_init (rpc_transport_t *this);  static void @@ -2527,7 +2639,14 @@ err:          priv->ot_state = OT_IDLE;          pthread_mutex_unlock(&priv->lock);          rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + +        /* Add the thread to the reap_queue before freeing up the transport and +         * dying +         */ +        socket_thread_reaper_add (priv->thread); +          rpc_transport_unref (this); +  	return NULL;  } @@ -2556,6 +2675,8 @@ socket_spawn (rpc_transport_t *this)                  gf_log (this->name, GF_LOG_ERROR,                          "could not create poll thread");          } +        /* start the reaper thread */ +        socket_thread_reaper_init();  }  static int  | 
