diff options
Diffstat (limited to 'libglusterfs')
| -rw-r--r-- | libglusterfs/src/event-epoll.c | 94 | ||||
| -rw-r--r-- | libglusterfs/src/event-poll.c | 45 | ||||
| -rw-r--r-- | libglusterfs/src/event.c | 113 | ||||
| -rw-r--r-- | libglusterfs/src/event.h | 6 | 
4 files changed, 246 insertions, 12 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index ff191129da0..92420f3734e 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -317,6 +317,7 @@ event_register_epoll (struct event_pool *event_pool, int fd,  {          int                 idx = -1;          int                 ret = -1; +        int             destroy = 0;          struct epoll_event  epoll_event = {0, };          struct event_data  *ev_data = (void *)&epoll_event.data;  	struct event_slot_epoll *slot = NULL; @@ -324,6 +325,24 @@ event_register_epoll (struct event_pool *event_pool, int fd,          GF_VALIDATE_OR_GOTO ("event", event_pool, out); +        /* TODO: Even with the below check, there is a possiblity of race, +         * What if the destroy mode is set after the check is done. +         * Not sure of the best way to prevent this race, ref counting +         * is one possibility. +         * There is no harm in registering and unregistering the fd +         * even after destroy mode is set, just that such fds will remain +         * open until unregister is called, also the events on that fd will be +         * notified, until one of the poller thread is alive. +         */ +        pthread_mutex_lock (&event_pool->mutex); +        { +                destroy = event_pool->destroy; +        } +        pthread_mutex_unlock (&event_pool->mutex); + +        if (destroy == 1) +               goto out; +  	idx = event_slot_alloc (event_pool, fd);  	if (idx == -1) {  		gf_log ("epoll", GF_LOG_ERROR, @@ -609,6 +628,12 @@ event_dispatch_epoll_worker (void *data)          gf_log ("epoll", GF_LOG_INFO, "Started thread with index %d", myindex); +        pthread_mutex_lock (&event_pool->mutex); +        { +                event_pool->activethreadcount++; +        } +        pthread_mutex_unlock (&event_pool->mutex); +  	for (;;) {                  if (event_pool->eventthreadcount < myindex) {                          /* ...time to die, thread count was decreased below @@ -623,7 +648,9 @@ event_dispatch_epoll_worker (void *data)                                          /* if found true in critical section,                                           * die */                                          event_pool->pollers[myindex - 1] = 0; +                                        event_pool->activethreadcount--;                                          timetodie = 1; +                                        pthread_cond_broadcast (&event_pool->cond);                                  }                          }                          pthread_mutex_unlock (&event_pool->mutex); @@ -676,6 +703,8 @@ event_dispatch_epoll (struct event_pool *event_pool)                  if (pollercount <= 0)                          pollercount = 1; +                event_pool->activethreadcount++; +                  for (i = 0; i < pollercount; i++) {                          ev_data = GF_CALLOC (1, sizeof (*ev_data),                                       gf_common_mt_event_pool); @@ -729,6 +758,12 @@ event_dispatch_epoll (struct event_pool *event_pool)          if (event_pool->pollers[0] != 0)  		pthread_join (event_pool->pollers[0], NULL); +        pthread_mutex_lock (&event_pool->mutex); +        { +                event_pool->activethreadcount--; +        } +        pthread_mutex_unlock (&event_pool->mutex); +  	return ret;  } @@ -736,21 +771,26 @@ int  event_reconfigure_threads_epoll (struct event_pool *event_pool, int value)  {          int                              i; -        int                              ret; +        int                              ret = 0;          pthread_t                        t_id;          int                              oldthreadcount;          struct event_thread_data        *ev_data = NULL; -        /* Set to MAX if greater */ -        if (value > EVENT_MAX_THREADS) -                value = EVENT_MAX_THREADS; - -        /* Default pollers to 1 in case this is set incorrectly */ -        if (value <= 0) -                value = 1; -          pthread_mutex_lock (&event_pool->mutex);          { +                /* Reconfigure to 0 threads is allowed only in destroy mode */ +                if (event_pool->destroy == 1) { +                        value = 0; +                } else { +                        /* Set to MAX if greater */ +                        if (value > EVENT_MAX_THREADS) +                                value = EVENT_MAX_THREADS; + +                        /* Default pollers to 1 in case this is set incorrectly */ +                        if (value <= 0) +                                value = 1; +                } +                  oldthreadcount = event_pool->eventthreadcount;                  if (oldthreadcount < value) { @@ -797,6 +837,39 @@ event_reconfigure_threads_epoll (struct event_pool *event_pool, int value)          return 0;  } +/* This function is the destructor for the event_pool data structure + * Should be called only after poller_threads_destroy() is called, + * else will lead to crashes. + */ +static int +event_pool_destroy_epoll (struct event_pool *event_pool) +{ +        int ret = 0, i = 0, j = 0; +        struct event_slot_epoll *table = NULL; + +        ret = close (event_pool->fd); + +        for (i = 0; i < EVENT_EPOLL_TABLES; i++) { +                if (event_pool->ereg[i]) { +                        table = event_pool->ereg[i]; +                        event_pool->ereg[i] = NULL; +                                for (j = 0; j < EVENT_EPOLL_SLOTS; j++) { +                                        LOCK_DESTROY (&table[j].lock); +                                } +                        GF_FREE (table); +                } +        } + +        pthread_mutex_destroy (&event_pool->mutex); +        pthread_cond_destroy (&event_pool->cond); + +        GF_FREE (event_pool->evcache); +        GF_FREE (event_pool->reg); +        GF_FREE (event_pool); + +        return ret; +} +  struct event_ops event_ops_epoll = {          .new                       = event_pool_new_epoll,          .event_register            = event_register_epoll, @@ -804,7 +877,8 @@ struct event_ops event_ops_epoll = {          .event_unregister          = event_unregister_epoll,          .event_unregister_close    = event_unregister_close_epoll,          .event_dispatch            = event_dispatch_epoll, -        .event_reconfigure_threads = event_reconfigure_threads_epoll +        .event_reconfigure_threads = event_reconfigure_threads_epoll, +        .event_pool_destroy        = event_pool_destroy_epoll  };  #endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index c91fa8487b5..0daceb0d5a6 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -449,7 +449,24 @@ event_dispatch_poll (struct event_pool *event_pool)          GF_VALIDATE_OR_GOTO ("event", event_pool, out); +        pthread_mutex_lock (&event_pool->mutex); +        { +                event_pool->activethreadcount = 1; +        } +        pthread_mutex_unlock (&event_pool->mutex); +          while (1) { +                pthread_mutex_lock (&event_pool->mutex); +                { +                        if (event_pool->destroy == 1) { +                                event_pool->activethreadcount = 0; +                                pthread_cond_broadcast (&event_pool->cond); +                                pthread_mutex_unlock (&event_pool->mutex); +                                return 0; +                        } +                } +                pthread_mutex_unlock (&event_pool->mutex); +                  size = event_dispatch_poll_resize (event_pool, ufds, size);                  ufds = event_pool->evcache; @@ -482,6 +499,31 @@ event_reconfigure_threads_poll (struct event_pool *event_pool, int value)          return 0;  } +/* This function is the destructor for the event_pool data structure + * Should be called only after poller_threads_destroy() is called, + * else will lead to crashes. + */ +static int +event_pool_destroy_poll (struct event_pool *event_pool) +{ +        int ret = 0; + +        ret = close (event_pool->breaker[0]); +        if (ret) +                return ret; + +        ret = close (event_pool->breaker[1]); +        if (ret) +                return ret; + +        event_pool->breaker[0] = event_pool->breaker[1] = -1; + +        GF_FREE (event_pool->reg); +        GF_FREE (event_pool); + +        return ret; +} +  struct event_ops event_ops_poll = {          .new                    = event_pool_new_poll,          .event_register         = event_register_poll, @@ -489,5 +531,6 @@ struct event_ops event_ops_poll = {          .event_unregister       = event_unregister_poll,          .event_unregister_close = event_unregister_close_poll,          .event_dispatch         = event_dispatch_poll, -        .event_reconfigure_threads = event_reconfigure_threads_poll +        .event_reconfigure_threads = event_reconfigure_threads_poll, +        .event_pool_destroy     = event_pool_destroy_poll  }; diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 4dd0f991700..f19d43a0ab1 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -144,3 +144,116 @@ event_reconfigure_threads (struct event_pool *event_pool, int value)  out:          return ret;  } + +int +event_pool_destroy (struct event_pool *event_pool) +{ +        int ret = -1; +        int destroy = 0, activethreadcount = 0; + +        GF_VALIDATE_OR_GOTO ("event", event_pool, out); + +        pthread_mutex_lock (&event_pool->mutex); +        { +                destroy = event_pool->destroy; +                activethreadcount = event_pool->activethreadcount; +        } +        pthread_mutex_unlock (&event_pool->mutex); + +        if (!destroy || (activethreadcount > 0)) +                goto out; + +        ret = event_pool->ops->event_pool_destroy (event_pool); +out: +        return ret; +} + +int +poller_destroy_handler (int fd, int idx, void *data, +                       int poll_out, int poll_in, int poll_err) +{ +        int readfd = -1; +        char buf = '\0'; + +        readfd = *(int *)data; +        if (readfd < 0) +                return -1; + +        while (read (readfd, &buf, 1) > 0) { +        } +        return 0; +} + +/* This function destroys all the poller threads. + * Note: to be called before event_pool_destroy is called. + * The order in which cleaning is performed: + * - Register a pipe fd(this is for waking threads in poll()/epoll_wait()) + * - Set the destroy mode, which this no new event registration will succede + * - Reconfigure the thread count to 0(this will succede only in destroy mode) + * - Wake up all the threads in poll() or epoll_wait(), so that they can + *   destroy themselves. + * - Wait for the thread to join(which will happen only after all the other + *   threads are destroyed) + */ +int +event_dispatch_destroy (struct event_pool *event_pool) +{ +        int  ret     = -1; +        int  fd[2]   = {-1}; +        int  idx     = -1; +        struct timespec   sleep_till = {0, }; + +        GF_VALIDATE_OR_GOTO ("event", event_pool, out); + +        ret = pipe2 (fd, O_NONBLOCK); +        if (ret < 0) +                goto out; + +        /* From the main thread register an event on the pipe fd[0], +         */ +        idx = event_register (event_pool, fd[0], poller_destroy_handler, +                              &fd[1], 1, 0); +        if (idx < 0) +                goto out; + +        /* Enter the destroy mode first, set this before reconfiguring to 0 +         * threads, to prevent further reconfigure to thread count > 0. +         */ +        pthread_mutex_lock (&event_pool->mutex); +        { +                event_pool->destroy = 1; +        } +        pthread_mutex_unlock (&event_pool->mutex); + +        ret = event_reconfigure_threads (event_pool, 0); +        if (ret < 0) +                goto out; + +        /* Write something onto the write end of the pipe(fd[1]) so that +         * poll wakes up and calls the handler, poller_destroy_handler() +         */ +        pthread_mutex_lock (&event_pool->mutex); +        { +                /* Write to pipe(fd[1]) and then wait for 1 second or until +                 * a poller thread that is dying, broadcasts. +                 */ +                while (event_pool->activethreadcount > 0) { +                        write (fd[1], "dummy", 6); +                        sleep_till.tv_sec = time (NULL) + 1; +                        ret = pthread_cond_timedwait (&event_pool->cond, +                                                      &event_pool->mutex, +                                                      &sleep_till); +                } +        } +        pthread_mutex_unlock (&event_pool->mutex); + +        ret = event_unregister (event_pool, fd[0], idx); + + out: +        if (fd[0] != -1) +                close (fd[0]); +        if (fd[1] != -1) +                close (fd[1]); + +        return ret; +} diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h index 930a7d1e28b..eac57bc01ad 100644 --- a/libglusterfs/src/event.h +++ b/libglusterfs/src/event.h @@ -60,6 +60,8 @@ struct event_pool {          int eventthreadcount; /* number of event threads to execute. */          pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,                                                       * and live status */ +        int destroy; +        int activethreadcount;  };  struct event_ops { @@ -81,6 +83,7 @@ struct event_ops {          int (*event_reconfigure_threads) (struct event_pool *event_pool,                                            int newcount); +        int (*event_pool_destroy) (struct event_pool *event_pool);  };  struct event_pool *event_pool_new (int count, int eventthreadcount); @@ -93,5 +96,6 @@ int event_unregister (struct event_pool *event_pool, int fd, int idx);  int event_unregister_close (struct event_pool *event_pool, int fd, int idx);  int event_dispatch (struct event_pool *event_pool);  int event_reconfigure_threads (struct event_pool *event_pool, int value); - +int event_pool_destroy (struct event_pool *event_pool); +int event_dispatch_destroy (struct event_pool *event_pool);  #endif /* _EVENT_H_ */  | 
