diff options
Diffstat (limited to 'libglusterfs/src')
-rw-r--r-- | libglusterfs/src/event-epoll.c | 114 | ||||
-rw-r--r-- | libglusterfs/src/event-poll.c | 10 | ||||
-rw-r--r-- | libglusterfs/src/event.c | 10 | ||||
-rw-r--r-- | libglusterfs/src/gf-event.h | 19 |
4 files changed, 125 insertions, 28 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 9826cc9e275..041a7e6c583 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -30,6 +30,7 @@ struct event_slot_epoll { int fd; int events; int gen; + int idx; gf_atomic_t ref; int do_close; int in_handler; @@ -37,6 +38,7 @@ struct event_slot_epoll { void *data; event_handler_t handler; gf_lock_t lock; + struct list_head poller_death; }; struct event_thread_data { @@ -57,6 +59,7 @@ __event_newtable(struct event_pool *event_pool, int table_idx) for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { table[i].fd = -1; LOCK_INIT(&table[i].lock); + INIT_LIST_HEAD(&table[i].poller_death); } event_pool->ereg[table_idx] = table; @@ -66,7 +69,8 @@ __event_newtable(struct event_pool *event_pool, int table_idx) } static int -__event_slot_alloc(struct event_pool *event_pool, int fd) +__event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death) { int i = 0; int table_idx = -1; @@ -109,8 +113,15 @@ __event_slot_alloc(struct event_pool *event_pool, int fd) table[i].gen = gen + 1; LOCK_INIT(&table[i].lock); + INIT_LIST_HEAD(&table[i].poller_death); table[i].fd = fd; + if (notify_poller_death) { + table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i; + list_add_tail(&table[i].poller_death, + &event_pool->poller_death); + } + event_pool->slots_used[table_idx]++; break; @@ -121,13 +132,14 @@ __event_slot_alloc(struct event_pool *event_pool, int fd) } static int -event_slot_alloc(struct event_pool *event_pool, int fd) +event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death) { int idx = -1; pthread_mutex_lock(&event_pool->mutex); { - idx = __event_slot_alloc(event_pool, fd); + idx = __event_slot_alloc(event_pool, fd, notify_poller_death); } pthread_mutex_unlock(&event_pool->mutex); @@ -155,6 +167,7 @@ __event_slot_dealloc(struct event_pool *event_pool, int idx) slot->fd = -1; slot->handled_error = 0; slot->in_handler = 0; + list_del_init(&slot->poller_death); event_pool->slots_used[table_idx]--; return; @@ -172,6 +185,15 @@ event_slot_dealloc(struct event_pool *event_pool, int idx) return; } +static int +event_slot_ref(struct event_slot_epoll *slot) +{ + if (!slot) + return -1; + + return GF_ATOMIC_INC(slot->ref); +} + static struct event_slot_epoll * event_slot_get(struct event_pool *event_pool, int idx) { @@ -188,12 +210,41 @@ event_slot_get(struct event_pool *event_pool, int idx) return NULL; slot = &table[offset]; - GF_ATOMIC_INC(slot->ref); + event_slot_ref(slot); return slot; } static void +__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + ref = GF_ATOMIC_DEC(slot->ref); + if (ref) + /* slot still alive */ + goto done; + + LOCK(&slot->lock); + { + fd = slot->fd; + do_close = slot->do_close; + slot->do_close = 0; + } + UNLOCK(&slot->lock); + + __event_slot_dealloc(event_pool, idx); + + if (do_close) + sys_close(fd); +done: + return; +} + +static void event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, int idx) { @@ -248,7 +299,7 @@ event_pool_new_epoll(int count, int eventthreadcount) event_pool->fd = epfd; event_pool->count = count; - + INIT_LIST_HEAD(&event_pool->poller_death); event_pool->eventthreadcount = eventthreadcount; event_pool->auto_thread_count = 0; @@ -297,7 +348,7 @@ __slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out) int event_register_epoll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out) + int poll_out, char notify_poller_death) { int idx = -1; int ret = -1; @@ -328,7 +379,7 @@ event_register_epoll(struct event_pool *event_pool, int fd, if (destroy == 1) goto out; - idx = event_slot_alloc(event_pool, fd); + idx = event_slot_alloc(event_pool, fd, notify_poller_death); if (idx == -1) { gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "could not find slot for fd=%d", fd); @@ -591,7 +642,7 @@ pre_unlock: ret = handler(fd, idx, gen, data, (event->events & (EPOLLIN | EPOLLPRI)), (event->events & (EPOLLOUT)), - (event->events & (EPOLLERR | EPOLLHUP))); + (event->events & (EPOLLERR | EPOLLHUP)), 0); } out: event_slot_unref(event_pool, slot, idx); @@ -607,7 +658,9 @@ event_dispatch_epoll_worker(void *data) struct event_thread_data *ev_data = data; struct event_pool *event_pool; int myindex = -1; - int timetodie = 0; + int timetodie = 0, gen = 0; + struct list_head poller_death_notify; + struct event_slot_epoll *slot = NULL, *tmp = NULL; GF_VALIDATE_OR_GOTO("event", ev_data, out); @@ -619,7 +672,7 @@ event_dispatch_epoll_worker(void *data) gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "Started" " thread with index %d", - myindex); + myindex - 1); pthread_mutex_lock(&event_pool->mutex); { @@ -637,20 +690,55 @@ event_dispatch_epoll_worker(void *data) pthread_mutex_lock(&event_pool->mutex); { if (event_pool->eventthreadcount < myindex) { + while (event_pool->poller_death_sliced) { + pthread_cond_wait(&event_pool->cond, + &event_pool->mutex); + } + + INIT_LIST_HEAD(&poller_death_notify); /* if found true in critical section, * die */ event_pool->pollers[myindex - 1] = 0; event_pool->activethreadcount--; timetodie = 1; + gen = ++event_pool->poller_gen; + list_for_each_entry(slot, &event_pool->poller_death, + poller_death) + { + event_slot_ref(slot); + } + + list_splice_init(&event_pool->poller_death, + &poller_death_notify); + event_pool->poller_death_sliced = 1; pthread_cond_broadcast(&event_pool->cond); } } pthread_mutex_unlock(&event_pool->mutex); if (timetodie) { + list_for_each_entry(slot, &poller_death_notify, poller_death) + { + slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1); + } + + pthread_mutex_lock(&event_pool->mutex); + { + list_for_each_entry_safe(slot, tmp, &poller_death_notify, + poller_death) + { + __event_slot_unref(event_pool, slot, slot->idx); + } + + list_splice(&poller_death_notify, + &event_pool->poller_death); + event_pool->poller_death_sliced = 0; + pthread_cond_broadcast(&event_pool->cond); + } + pthread_mutex_unlock(&event_pool->mutex); + gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD, - "Exited " - "thread with index %d", - myindex); + "Exited thread with index %d", myindex); + goto out; } } diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 727d2a000a2..5bac4291c47 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -33,11 +33,11 @@ struct event_slot_poll { static int event_register_poll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out); + int poll_out, char notify_poller_death); static int __flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out, - int poll_err) + int poll_err, char event_thread_died) { char buf[64]; int ret = -1; @@ -146,7 +146,7 @@ event_pool_new_poll(int count, int eventthreadcount) } ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd, - NULL, 1, 0); + NULL, 1, 0, 0); if (ret == -1) { gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED, "could not register pipe fd with poll event loop"); @@ -180,7 +180,7 @@ event_pool_new_poll(int count, int eventthreadcount) static int event_register_poll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out) + int poll_out, char notify_poller_death) { int idx = -1; @@ -378,7 +378,7 @@ unlock: ret = handler(ufds[i].fd, idx, 0, data, (ufds[i].revents & (POLLIN | POLLPRI)), (ufds[i].revents & (POLLOUT)), - (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL))); + (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0); return ret; } diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 49f70c83366..ddba9810b0b 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -54,14 +54,14 @@ event_pool_new(int count, int eventthreadcount) int event_register(struct event_pool *event_pool, int fd, event_handler_t handler, - void *data, int poll_in, int poll_out) + void *data, int poll_in, int poll_out, char notify_poller_death) { int ret = -1; GF_VALIDATE_OR_GOTO("event", event_pool, out); - ret = event_pool->ops->event_register(event_pool, fd, handler, data, - poll_in, poll_out); + ret = event_pool->ops->event_register( + event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death); out: return ret; } @@ -161,7 +161,7 @@ out: int poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out, - int poll_in, int poll_err) + int poll_in, int poll_err, char event_thread_exit) { struct event_destroy_data *destroy = NULL; int readfd = -1, ret = -1; @@ -233,7 +233,7 @@ event_dispatch_destroy(struct event_pool *event_pool) /* From the main thread register an event on the pipe fd[0], */ - idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, + idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0, 0); if (idx < 0) goto out; diff --git a/libglusterfs/src/gf-event.h b/libglusterfs/src/gf-event.h index 5c3724cc953..5d92a2dd285 100644 --- a/libglusterfs/src/gf-event.h +++ b/libglusterfs/src/gf-event.h @@ -12,6 +12,7 @@ #define _GF_EVENT_H_ #include <pthread.h> +#include "list.h" struct event_pool; struct event_ops; @@ -23,7 +24,8 @@ struct event_data { } __attribute__((__packed__, __may_alias__)); typedef int (*event_handler_t)(int fd, int idx, int gen, void *data, - int poll_in, int poll_out, int poll_err); + int poll_in, int poll_out, int poll_err, + char event_thread_exit); #define EVENT_EPOLL_TABLES 1024 #define EVENT_EPOLL_SLOTS 1024 @@ -40,6 +42,13 @@ struct event_pool { struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES]; int slots_used[EVENT_EPOLL_TABLES]; + struct list_head poller_death; + int poller_death_sliced; /* track whether the list of fds interested + * poller_death is sliced. If yes, new thread death + * notification has to wait till the list is added + * back + */ + int poller_gen; int used; int changed; @@ -52,8 +61,8 @@ struct event_pool { /* NOTE: Currently used only when event processing is done using * epoll. */ int eventthreadcount; /* number of event threads to execute. */ - pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, - * and live status */ + pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, and live + status */ int destroy; int activethreadcount; @@ -81,7 +90,7 @@ struct event_ops { int (*event_register)(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out); + int poll_out, char notify_poller_death); int (*event_select_on)(struct event_pool *event_pool, int fd, int idx, int poll_in, int poll_out); @@ -107,7 +116,7 @@ event_select_on(struct event_pool *event_pool, int fd, int idx, int poll_in, int poll_out); int event_register(struct event_pool *event_pool, int fd, event_handler_t handler, - void *data, int poll_in, int poll_out); + void *data, int poll_in, int poll_out, char notify_poller_death); int event_unregister(struct event_pool *event_pool, int fd, int idx); int |