diff options
| -rw-r--r-- | libglusterfs/src/common-utils.c | 30 | ||||
| -rw-r--r-- | libglusterfs/src/common-utils.h | 3 | ||||
| -rw-r--r-- | libglusterfs/src/libglusterfs-messages.h | 10 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 186 | 
4 files changed, 87 insertions, 142 deletions
diff --git a/libglusterfs/src/common-utils.c b/libglusterfs/src/common-utils.c index a225c957f2e..97d9ef8da1a 100644 --- a/libglusterfs/src/common-utils.c +++ b/libglusterfs/src/common-utils.c @@ -3555,6 +3555,36 @@ gf_thread_create (pthread_t *thread, const pthread_attr_t *attr,  }  int +gf_thread_create_detached (pthread_t *thread, +                         void *(*start_routine)(void *), void *arg) +{ +        pthread_attr_t attr; +        int ret = -1; + +        ret = pthread_attr_init (&attr); +        if (ret) { +                gf_msg (THIS->name, GF_LOG_ERROR, ret, +                        LG_MSG_PTHREAD_ATTR_INIT_FAILED, +                        "Thread attribute initialization failed"); +                return -1; +        } + +        pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + +        ret = gf_thread_create (thread, &attr, start_routine, arg); +        if (ret) { +                gf_msg (THIS->name, GF_LOG_ERROR, ret, +                        LG_MSG_PTHREAD_FAILED, +                        "Thread creation failed"); +                ret = -1; +        } + +        pthread_attr_destroy (&attr); + +        return ret; +} + +int  gf_skip_header_section (int fd, int header_len)  {          int  ret           = -1; diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h index 09d585ad9c3..4741d430a03 100644 --- a/libglusterfs/src/common-utils.h +++ b/libglusterfs/src/common-utils.h @@ -766,6 +766,9 @@ int gf_set_timestamp  (const char *src, const char* dest);  int gf_thread_create (pthread_t *thread, const pthread_attr_t *attr,                        void *(*start_routine)(void *), void *arg); +int gf_thread_create_detached (pthread_t *thread, +                      void *(*start_routine)(void *), void *arg); +  gf_boolean_t  gf_is_service_running (char *pidfile, int *pid);  int diff --git a/libglusterfs/src/libglusterfs-messages.h b/libglusterfs/src/libglusterfs-messages.h index c0bcabac798..d18f4cb3112 100644 --- a/libglusterfs/src/libglusterfs-messages.h +++ b/libglusterfs/src/libglusterfs-messages.h @@ -36,7 +36,7 @@   */  #define GLFS_LG_BASE            GLFS_MSGID_COMP_LIBGLUSTERFS -#define GLFS_LG_NUM_MESSAGES    205 +#define GLFS_LG_NUM_MESSAGES    206  #define GLFS_LG_MSGID_END       (GLFS_LG_BASE + GLFS_LG_NUM_MESSAGES + 1)  /* Messaged with message IDs */  #define glfs_msg_start_lg GLFS_LG_BASE, "Invalid: Start of messages" @@ -1754,6 +1754,14 @@   * @recommendedaction   *   */ +#define LG_MSG_PTHREAD_ATTR_INIT_FAILED                  (GLFS_LG_BASE + 206) + +/*! + * @messageid + * @diagnosis + * @recommendedaction + * + */  /*------------*/  #define glfs_msg_end_lg GLFS_LG_MSGID_END, "Invalid: End of messages" diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 209d89a225b..030a37961b6 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -187,117 +187,6 @@ struct socket_connect_error_state_ {  };  typedef struct socket_connect_error_state_ socket_connect_error_state_t; - -/* This timer and queue are used to reap dead threads. The timer triggers every - * minute and pthread_joins any threads that added themselves to the reap queue - * - * TODO: Make the timer configurable? (Not sure if required) - */ -static gf_timer_t *reap_timer; -static struct list_head reap_queue; -static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER; -const struct timespec reap_ts = {60, 0}; - -struct tid_wrap { -        struct list_head list; -        pthread_t tid; -}; - -/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins - * them.  If a thread join fails, it logs the failure and continues - */ -static void -_socket_reap_own_threads() { -        struct tid_wrap *node = NULL; -        struct tid_wrap *tmp = NULL; -        pthread_t tid = 0; -        int i = 0; - -        list_for_each_entry_safe (node, tmp, &reap_queue, list) { -                list_del_init (&node->list); -                if (pthread_join (node->tid, NULL)) { -                        gf_log (THIS->name, GF_LOG_ERROR, -                                "own-thread: failed to join thread (tid: %zu)", -                                tid); -                } -                node->tid = 0; -                GF_FREE (node); -                node = NULL; -                i++; -        } - -        if (i) { -                gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i); -        } - -        return; -} - -/* socket_thread_reaper reaps threads and restarts the reap_timer - */ -static void -socket_thread_reaper () { - -        pthread_mutex_lock (&reap_lock); - -        gf_timer_call_cancel (THIS->ctx, reap_timer); -        reap_timer = 0; - -        _socket_reap_own_threads(); - -        reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, -                                          socket_thread_reaper, NULL); -        if (!reap_timer) -                gf_log (THIS->name, GF_LOG_ERROR, -                        "failed to restart socket own-thread reap timer"); - -        pthread_mutex_unlock (&reap_lock); - -        return; -} - -/* socket_thread_reaper_init initializes reap_timer and reap_queue. - * Initializations are done only the first time this is called. - * - * To make sure that the reap_timer is always run, reaper_init it is better to - * call this whenever an own-thread is launched - */ -static void -socket_thread_reaper_init () { -        pthread_mutex_lock (&reap_lock); - -        if (reap_timer == NULL) { -                reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, -                                                  socket_thread_reaper, NULL); -                INIT_LIST_HEAD (&reap_queue); -        } - -        pthread_mutex_unlock (&reap_lock); - -        return; -} - -/* socket_thread_reaper_add adds the given thread id to the queue of threads - * that will be reaped by socket_thread_reaper - * own-threads need to call this with their thread-ids before dying - */ -static int -socket_thread_reaper_add (pthread_t tid) { -        struct tid_wrap *node = NULL; - -        pthread_mutex_lock (&reap_lock); - -        node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap); -        node->tid = tid; -        INIT_LIST_HEAD (&node->list); -        list_add_tail (&node->list, &reap_queue); - -        pthread_mutex_unlock (&reap_lock); - -        return 0; -} - -  static int socket_init (rpc_transport_t *this);  static void @@ -2640,10 +2529,6 @@ err:          pthread_mutex_unlock(&priv->lock);          rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); -        /* Add the thread to the reap_queue before freeing up the transport and -         * dying -         */ -        socket_thread_reaper_add (priv->thread);          rpc_transport_unref (this); @@ -2651,11 +2536,11 @@ err:  } -static void +static int  socket_spawn (rpc_transport_t *this)  {          socket_private_t        *priv   = this->private; - +        int ret = -1;          switch (priv->ot_state) {          case OT_IDLE:          case OT_PLEASE_DIE: @@ -2663,7 +2548,7 @@ socket_spawn (rpc_transport_t *this)          default:                  gf_log (this->name, GF_LOG_WARNING,                          "refusing to start redundant poller"); -                return; +                return ret;          }          priv->ot_gen += 7; @@ -2671,12 +2556,16 @@ socket_spawn (rpc_transport_t *this)          gf_log (this->name, GF_LOG_TRACE,                  "spawning %p with gen %u", this, priv->ot_gen); -        if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) { +        /* Create thread after enable detach flag */ + +        ret = gf_thread_create_detached (&priv->thread, socket_poller, this); +        if (ret) {                  gf_log (this->name, GF_LOG_ERROR,                          "could not create poll thread"); +                ret = -1;          } -        /* start the reaper thread */ -        socket_thread_reaper_init(); + +        return ret;  }  static int @@ -2860,30 +2749,38 @@ socket_server_event_handler (int fd, int idx, void *data,                                  new_priv->is_server = _gf_true;                                  rpc_transport_ref (new_trans); -				if (new_priv->own_thread) { -					if (pipe(new_priv->pipe) < 0) { -						gf_log(this->name,GF_LOG_ERROR, -						       "could not create pipe"); -					} -                                        socket_spawn(new_trans); -				} -				else { -					new_priv->idx = -						event_register (ctx->event_pool, -								new_sock, -								socket_event_handler, -								new_trans, -								1, 0); -					if (new_priv->idx == -1) -						ret = -1; -				} +                                if (new_priv->own_thread) { +                                        if (pipe(new_priv->pipe) < 0) { +                                                gf_log(this->name, GF_LOG_ERROR, +                                                       "could not create pipe"); +                                        } +                                        ret = socket_spawn(new_trans); +                                        if (ret) { +                                                gf_log(this->name, GF_LOG_ERROR, +                                                       "could not spawn thread"); +                                                sys_close (new_priv->pipe[0]); +                                                sys_close (new_priv->pipe[1]); +                                        } +                                }  else { +                                        new_priv->idx = +                                                event_register (ctx->event_pool, +                                                                new_sock, +                                                           socket_event_handler, +                                                                new_trans, +                                                                1, 0); +                                        if (new_priv->idx == -1) { +                                                ret = -1; +                                                gf_log(this->name, GF_LOG_ERROR, +                                                       "failed to register the socket with event"); +                                        } +                                }                          }                          pthread_mutex_unlock (&new_priv->lock);                          if (ret == -1) { -                                gf_log (this->name, GF_LOG_WARNING, -                                        "failed to register the socket with event");                                  sys_close (new_sock); +                                GF_FREE (new_trans->name); +                                GF_FREE (new_trans);                                  rpc_transport_unref (new_trans);                                  goto unlock;                          } @@ -3200,7 +3097,14 @@ handler:                          }                          this->listener = this; -                        socket_spawn(this); +                        ret =  socket_spawn(this); +                        if (ret) { +                                gf_log(this->name, GF_LOG_ERROR, +                                       "could not spawn thread"); +                                sys_close(priv->pipe[0]); +                                sys_close(priv->pipe[1]); +                                priv->sock = -1; +                        }                  }                  else {                          priv->idx = event_register (ctx->event_pool, priv->sock,  | 
