diff options
author | Vijaikumar M <vmallika@redhat.com> | 2014-06-19 15:41:22 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-02-07 13:17:30 -0800 |
commit | c61074400a45e69c6edbf82b8ed02568726d37ae (patch) | |
tree | c9c826414bcd3da0e1f30edbaaf79ac0c716a371 | |
parent | 5e25569ed0717aa8636ad708430a823d39f9aa60 (diff) |
epoll: edge triggered and multi-threaded epoll
- edge triggered (oneshot) polling with epoll
- pick one event to avoid multiple events getting picked up by same
thread
and so get better distribution of events against multiple threads
- wire support for multiple poll threads to epoll_wait in parallel
- evdata to store absolute index and not hint for epoll
- store index and gen of slot instead of fd and index hint
- perform fd close asynchronously inside event.c for multithread safety
- poll is still single threaded
Change-Id: I536851dda0ab224c5d5a1b130a571397c9cace8f
BUG: 1104462
Signed-off-by: Anand Avati <avati@redhat.com>
Signed-off-by: Vijaikumar M <vmallika@redhat.com>
Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Signed-off-by: Shyam <srangana@redhat.com>
Reviewed-on: http://review.gluster.org/3842
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r-- | libglusterfs/src/event-epoll.c | 747 | ||||
-rw-r--r-- | libglusterfs/src/event-poll.c | 43 | ||||
-rw-r--r-- | libglusterfs/src/event.c | 18 | ||||
-rw-r--r-- | libglusterfs/src/event.h | 20 | ||||
-rw-r--r-- | libglusterfs/src/mem-types.h | 1 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 33 |
6 files changed, 557 insertions, 305 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 06b3236246a..9082954e4e4 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -31,28 +31,203 @@ #include <sys/epoll.h> +struct event_slot_epoll { + int fd; + int events; + int gen; + int ref; + int do_close; + int in_handler; + void *data; + event_handler_t handler; + gf_lock_t lock; +}; + + +static struct event_slot_epoll * +__event_newtable (struct event_pool *event_pool, int table_idx) +{ + struct event_slot_epoll *table = NULL; + int i = -1; + + table = GF_CALLOC (sizeof (*table), EVENT_EPOLL_SLOTS, + gf_common_mt_ereg); + if (!table) + return NULL; + + for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { + table[i].fd = -1; + LOCK_INIT (&table[i].lock); + } + + event_pool->ereg[table_idx] = table; + event_pool->slots_used[table_idx] = 0; + + return table; +} + + static int -__event_getindex (struct event_pool *event_pool, int fd, int idx) +__event_slot_alloc (struct event_pool *event_pool, int fd) { - int ret = -1; int i = 0; + int table_idx = -1; + int gen = -1; + struct event_slot_epoll *table = NULL; + + for (i = 0; i < EVENT_EPOLL_TABLES; i++) { + switch (event_pool->slots_used[i]) { + case EVENT_EPOLL_SLOTS: + continue; + case 0: + if (!event_pool->ereg[i]) { + table = __event_newtable (event_pool, i); + if (!table) + return -1; + } + break; + default: + table = event_pool->ereg[i]; + break; + } + + if (table) + /* break out of the loop */ + break; + } + + if (!table) + return -1; + + table_idx = i; + + for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { + if (table[i].fd == -1) { + /* wipe everything except bump the generation */ + gen = table[i].gen; + memset (&table[i], 0, sizeof (table[i])); + table[i].gen = gen + 1; + + LOCK_INIT (&table[i].lock); + + table[i].fd = fd; + event_pool->slots_used[table_idx]++; + + break; + } + } + + return table_idx * EVENT_EPOLL_SLOTS + i; +} - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - if (idx > -1 && idx < event_pool->used) { - if (event_pool->reg[idx].fd == fd) - ret = idx; - } +static int +event_slot_alloc (struct event_pool *event_pool, int fd) +{ + int idx = -1; - for (i=0; ret == -1 && i<event_pool->used; i++) { - if (event_pool->reg[i].fd == fd) { - ret = i; - break; - } - } + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_slot_alloc (event_pool, fd); + } + pthread_mutex_unlock (&event_pool->mutex); -out: - return ret; + return idx; +} + + + +static void +__event_slot_dealloc (struct event_pool *event_pool, int idx) +{ + int table_idx = 0; + int offset = 0; + struct event_slot_epoll *table = NULL; + struct event_slot_epoll *slot = NULL; + + table_idx = idx / EVENT_EPOLL_SLOTS; + offset = idx % EVENT_EPOLL_SLOTS; + + table = event_pool->ereg[table_idx]; + if (!table) + return; + + slot = &table[offset]; + slot->gen++; + + slot->fd = -1; + event_pool->slots_used[table_idx]--; + + return; +} + + +static void +event_slot_dealloc (struct event_pool *event_pool, int idx) +{ + pthread_mutex_lock (&event_pool->mutex); + { + __event_slot_dealloc (event_pool, idx); + } + pthread_mutex_unlock (&event_pool->mutex); + + return; +} + + +static struct event_slot_epoll * +event_slot_get (struct event_pool *event_pool, int idx) +{ + struct event_slot_epoll *slot = NULL; + struct event_slot_epoll *table = NULL; + int table_idx = 0; + int offset = 0; + + table_idx = idx / EVENT_EPOLL_SLOTS; + offset = idx % EVENT_EPOLL_SLOTS; + + table = event_pool->ereg[table_idx]; + if (!table) + return NULL; + + slot = &table[offset]; + + LOCK (&slot->lock); + { + slot->ref++; + } + UNLOCK (&slot->lock); + + return slot; +} + + +static void +event_slot_unref (struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + LOCK (&slot->lock); + { + ref = --slot->ref; + fd = slot->fd; + do_close = slot->do_close; + } + UNLOCK (&slot->lock); + + if (ref) + /* slot still alive */ + goto done; + + event_slot_dealloc (event_pool, idx); + + if (do_close) + close (fd); +done: + return; } @@ -68,17 +243,6 @@ event_pool_new_epoll (int count) if (!event_pool) goto out; - event_pool->count = count; - event_pool->reg = GF_CALLOC (event_pool->count, - sizeof (*event_pool->reg), - gf_common_mt_reg); - - if (!event_pool->reg) { - GF_FREE (event_pool); - event_pool = NULL; - goto out; - } - epfd = epoll_create (count); if (epfd == -1) { @@ -95,13 +259,49 @@ event_pool_new_epoll (int count) event_pool->count = count; pthread_mutex_init (&event_pool->mutex, NULL); - pthread_cond_init (&event_pool->cond, NULL); out: return event_pool; } +static void +__slot_update_events (struct event_slot_epoll *slot, int poll_in, int poll_out) +{ + switch (poll_in) { + case 1: + slot->events |= EPOLLIN; + break; + case 0: + slot->events &= ~EPOLLIN; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("epoll", GF_LOG_ERROR, + "invalid poll_in value %d", poll_in); + break; + } + + switch (poll_out) { + case 1: + slot->events |= EPOLLOUT; + break; + case 0: + slot->events &= ~EPOLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("epoll", GF_LOG_ERROR, + "invalid poll_out value %d", poll_out); + break; + } +} + + int event_register_epoll (struct event_pool *event_pool, int fd, event_handler_t handler, @@ -111,87 +311,98 @@ event_register_epoll (struct event_pool *event_pool, int fd, int ret = -1; struct epoll_event epoll_event = {0, }; struct event_data *ev_data = (void *)&epoll_event.data; + struct event_slot_epoll *slot = NULL; GF_VALIDATE_OR_GOTO ("event", event_pool, out); - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->count == event_pool->used) { - event_pool->count *= 2; + idx = event_slot_alloc (event_pool, fd); + if (idx == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "could not find slot for fd=%d", fd); + return -1; + } + + slot = event_slot_get (event_pool, idx); + + assert (slot->fd == fd); + + LOCK (&slot->lock); + { + /* make epoll edge triggered and 'singleshot', which + means we need to re-add the fd with + epoll_ctl(EPOLL_CTL_MOD) after delivery of every + single event. This assures us that while a poller + thread has picked up and is processing an event, + another poller will not try to pick this at the same + time as well. + */ + + slot->events = EPOLLPRI | EPOLLET | EPOLLONESHOT; + slot->handler = handler; + slot->data = data; + + __slot_update_events (slot, poll_in, poll_out); + + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = slot->gen; + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, + &epoll_event); + /* check ret after UNLOCK() to avoid deadlock in + event_slot_unref() + */ + } + UNLOCK (&slot->lock); + + if (ret == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "failed to add fd(=%d) to epoll fd(=%d) (%s)", + fd, event_pool->fd, strerror (errno)); + + event_slot_unref (event_pool, slot, idx); + idx = -1; + } + + /* keep slot->ref (do not event_slot_unref) if successful */ +out: + return idx; +} - event_pool->reg = GF_REALLOC (event_pool->reg, - event_pool->count * - sizeof (*event_pool->reg)); - if (!event_pool->reg) { - gf_log ("epoll", GF_LOG_ERROR, - "event registry re-allocation failed"); - goto unlock; - } - } - - idx = event_pool->used; - event_pool->used++; - - event_pool->reg[idx].fd = fd; - event_pool->reg[idx].events = EPOLLPRI; - event_pool->reg[idx].handler = handler; - event_pool->reg[idx].data = data; - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } +static int +event_unregister_epoll_common (struct event_pool *event_pool, int fd, + int idx, int do_close) +{ + int ret = -1; + struct event_slot_epoll *slot = NULL; - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } + GF_VALIDATE_OR_GOTO ("event", event_pool, out); - event_pool->changed = 1; + slot = event_slot_get (event_pool, idx); - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; + assert (slot->fd == fd); - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, - &epoll_event); + LOCK (&slot->lock); + { + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); if (ret == -1) { gf_log ("epoll", GF_LOG_ERROR, - "failed to add fd(=%d) to epoll fd(=%d) (%s)", + "fail to del fd(=%d) from epoll fd(=%d) (%s)", fd, event_pool->fd, strerror (errno)); goto unlock; } - pthread_cond_broadcast (&event_pool->cond); + slot->do_close = do_close; + slot->gen++; /* detect unregister in dispatch_handler() */ } unlock: - pthread_mutex_unlock (&event_pool->mutex); + UNLOCK (&slot->lock); + event_slot_unref (event_pool, slot, idx); /* one for event_register() */ + event_slot_unref (event_pool, slot, idx); /* one for event_slot_get() */ out: return ret; } @@ -200,233 +411,186 @@ out: static int event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint) { - int idx = -1; - int ret = -1; - - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - int lastidx = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); + int ret = -1; - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); + ret = event_unregister_epoll_common (event_pool, fd, idx_hint, 0); - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); - - /* if ret is -1, this array member should never be accessed */ - /* if it is 0, the array member might be used by idx_cache - * in which case the member should not be accessed till - * it is reallocated - */ - - event_pool->reg[idx].fd = -1; - - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to del fd(=%d) from epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } - - lastidx = event_pool->used - 1; - if (lastidx == idx) { - event_pool->used--; - goto unlock; - } + return ret; +} - epoll_event.events = event_pool->reg[lastidx].events; - ev_data->fd = event_pool->reg[lastidx].fd; - ev_data->idx = idx; - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to modify fd(=%d) index %d to %d (%s)", - ev_data->fd, event_pool->used, idx, - strerror (errno)); - goto unlock; - } +static int +event_unregister_close_epoll (struct event_pool *event_pool, int fd, + int idx_hint) +{ + int ret = -1; - /* just replace the unregistered idx by last one */ - event_pool->reg[idx] = event_pool->reg[lastidx]; - event_pool->used--; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); + ret = event_unregister_epoll_common (event_pool, fd, idx_hint, 1); -out: - return ret; + return ret; } static int -event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint, +event_select_on_epoll (struct event_pool *event_pool, int fd, int idx, int poll_in, int poll_out) { - int idx = -1; int ret = -1; - + struct event_slot_epoll *slot = NULL; struct epoll_event epoll_event = {0, }; struct event_data *ev_data = (void *)&epoll_event.data; GF_VALIDATE_OR_GOTO ("event", event_pool, out); - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to modify fd(=%d) events to %d", - fd, epoll_event.events); - } - } + slot = event_slot_get (event_pool, idx); + + assert (slot->fd == fd); + + LOCK (&slot->lock); + { + __slot_update_events (slot, poll_in, poll_out); + + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = slot->gen; + + if (slot->in_handler) + /* in_handler indicates at least one thread + executing event_dispatch_epoll_handler() + which will perform epoll_ctl(EPOLL_CTL_MOD) + anyways (because of EPOLLET) + + This not only saves a system call, but also + avoids possibility of another epoll thread + parallely picking up the next event while the + ongoing handler is still in progress (and + resulting in unnecessary contention on + rpc_transport_t->mutex). + */ + goto unlock; + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, + &epoll_event); + if (ret == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "failed to modify fd(=%d) events to %d", + fd, epoll_event.events); + } + } unlock: - pthread_mutex_unlock (&event_pool->mutex); + UNLOCK (&slot->lock); + + event_slot_unref (event_pool, slot, idx); out: - return ret; + return idx; } static int event_dispatch_epoll_handler (struct event_pool *event_pool, - struct epoll_event *events, int i) + struct epoll_event *event) { - struct event_data *event_data = NULL; + struct event_data *ev_data = NULL; + struct event_slot_epoll *slot = NULL; event_handler_t handler = NULL; void *data = NULL; int idx = -1; + int gen = -1; int ret = -1; + int fd = -1; - - event_data = (void *)&events[i].data; + ev_data = (void *)&event->data; handler = NULL; data = NULL; - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, event_data->fd, - event_data->idx); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd(=%d) (idx_hint=%d)", - event_data->fd, event_data->idx); - goto unlock; - } - - handler = event_pool->reg[idx].handler; - data = event_pool->reg[idx].data; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); + idx = ev_data->idx; + gen = ev_data->gen; + + slot = event_slot_get (event_pool, idx); + + LOCK (&slot->lock); + { + fd = slot->fd; + if (fd == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "stale fd found on idx=%d, gen=%d, events=%d, " + "slot->gen=%d", + idx, gen, event->events, slot->gen); + /* fd got unregistered in another thread */ + goto pre_unlock; + } + + if (gen != slot->gen) { + gf_log ("epoll", GF_LOG_ERROR, + "generation mismatch on idx=%d, gen=%d, " + "slot->gen=%d, slot->fd=%d", + idx, gen, slot->gen, slot->fd); + /* slot was re-used and therefore is another fd! */ + goto pre_unlock; + } + + handler = slot->handler; + data = slot->data; + + slot->in_handler++; + } +pre_unlock: + UNLOCK (&slot->lock); + + if (!handler) + goto out; + + ret = handler (fd, idx, data, + (event->events & (EPOLLIN|EPOLLPRI)), + (event->events & (EPOLLOUT)), + (event->events & (EPOLLERR|EPOLLHUP))); + + LOCK (&slot->lock); + { + slot->in_handler--; + + if (gen != slot->gen) { + /* event_unregister() happened while we were + in handler() + */ + gf_log ("epoll", GF_LOG_DEBUG, + "generation bumped on idx=%d from " + "gen=%d to slot->gen=%d, fd=%d, " + "slot->fd=%d", + idx, gen, slot->gen, fd, slot->fd); + goto post_unlock; + } + + /* This call also picks up the changes made by another + thread calling event_select_on_epoll() while this + thread was busy in handler() + */ + event->events = slot->events; + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, + fd, event); + } +post_unlock: + UNLOCK (&slot->lock); +out: + event_slot_unref (event_pool, slot, idx); - if (handler) - ret = handler (event_data->fd, event_data->idx, data, - (events[i].events & (EPOLLIN|EPOLLPRI)), - (events[i].events & (EPOLLOUT)), - (events[i].events & (EPOLLERR|EPOLLHUP))); return ret; } -static int -event_dispatch_epoll (struct event_pool *event_pool) +static void * +event_dispatch_epoll_worker (void *data) { - struct epoll_event *events = NULL; - int size = 0; - int i = 0; + struct epoll_event event; int ret = -1; + struct event_pool *event_pool = data; GF_VALIDATE_OR_GOTO ("event", event_pool, out); - while (1) { - pthread_mutex_lock (&event_pool->mutex); - { - while (event_pool->used == 0) - pthread_cond_wait (&event_pool->cond, - &event_pool->mutex); - - if (event_pool->used > event_pool->evcache_size) { - GF_FREE (event_pool->evcache); - - event_pool->evcache = events = NULL; - - event_pool->evcache_size = - event_pool->used + 256; - - events = GF_CALLOC (event_pool->evcache_size, - sizeof (struct epoll_event), - gf_common_mt_epoll_event); - if (!events) - break; - - event_pool->evcache = events; - } - } - pthread_mutex_unlock (&event_pool->mutex); - - ret = epoll_wait (event_pool->fd, event_pool->evcache, - event_pool->evcache_size, -1); + for (;;) { + ret = epoll_wait (event_pool->fd, &event, 1, -1); if (ret == 0) /* timeout */ @@ -436,28 +600,43 @@ event_dispatch_epoll (struct event_pool *event_pool) /* sys call */ continue; - size = ret; + ret = event_dispatch_epoll_handler (event_pool, &event); + } +out: + return NULL; +} - for (i = 0; i < size; i++) { - if (!events || !events[i].events) - continue; - ret = event_dispatch_epoll_handler (event_pool, - events, i); - } - } +#define GLUSTERFS_EPOLL_MAXTHREADS 2 -out: - return ret; + +static int +event_dispatch_epoll (struct event_pool *event_pool) +{ + int i = 0; + pthread_t pollers[GLUSTERFS_EPOLL_MAXTHREADS]; + int ret = -1; + + for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) { + ret = pthread_create (&pollers[i], NULL, + event_dispatch_epoll_worker, + event_pool); + } + + for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) + pthread_join (pollers[i], NULL); + + return ret; } struct event_ops event_ops_epoll = { - .new = event_pool_new_epoll, - .event_register = event_register_epoll, - .event_select_on = event_select_on_epoll, - .event_unregister = event_unregister_epoll, - .event_dispatch = event_dispatch_epoll + .new = event_pool_new_epoll, + .event_register = event_register_epoll, + .event_select_on = event_select_on_epoll, + .event_unregister = event_unregister_epoll, + .event_unregister_close = event_unregister_close_epoll, + .event_dispatch = event_dispatch_epoll }; #endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 7f7f560d086..a7e2e663103 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -26,6 +26,16 @@ #include "config.h" #endif + + +struct event_slot_poll { + int fd; + int events; + void *data; + event_handler_t handler; +}; + + static int event_register_poll (struct event_pool *event_pool, int fd, event_handler_t handler, @@ -63,12 +73,16 @@ __event_getindex (struct event_pool *event_pool, int fd, int idx) GF_VALIDATE_OR_GOTO ("event", event_pool, out); + /* lookup in used space based on index provided */ if (idx > -1 && idx < event_pool->used) { - if (event_pool->reg[idx].fd == fd) + if (event_pool->reg[idx].fd == fd) { ret = idx; + goto out; + } } - for (i=0; ret == -1 && i<event_pool->used; i++) { + /* search in used space, if lookup fails */ + for (i = 0; i < event_pool->used; i++) { if (event_pool->reg[i].fd == fd) { ret = i; break; @@ -264,6 +278,20 @@ out: static int +event_unregister_close_poll (struct event_pool *event_pool, int fd, + int idx_hint) +{ + int ret = -1; + + ret = event_unregister_poll (event_pool, fd, idx_hint); + + close (fd); + + return ret; +} + + +static int event_select_on_poll (struct event_pool *event_pool, int fd, int idx_hint, int poll_in, int poll_out) { @@ -443,9 +471,10 @@ out: struct event_ops event_ops_poll = { - .new = event_pool_new_poll, - .event_register = event_register_poll, - .event_select_on = event_select_on_poll, - .event_unregister = event_unregister_poll, - .event_dispatch = event_dispatch_poll + .new = event_pool_new_poll, + .event_register = event_register_poll, + .event_select_on = event_select_on_poll, + .event_unregister = event_unregister_poll, + .event_unregister_close = event_unregister_close_poll, + .event_dispatch = event_dispatch_poll }; diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 0197e7948b5..6c253df3c1a 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -32,10 +32,10 @@ struct event_pool * event_pool_new (int count) { struct event_pool *event_pool = NULL; - extern struct event_ops event_ops_poll; + extern struct event_ops event_ops_poll; #ifdef HAVE_SYS_EPOLL_H - extern struct event_ops event_ops_epoll; + extern struct event_ops event_ops_epoll; event_pool = event_ops_epoll.new (count); @@ -89,6 +89,20 @@ out: int +event_unregister_close (struct event_pool *event_pool, int fd, int idx) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + ret = event_pool->ops->event_unregister_close (event_pool, fd, idx); + +out: + return ret; +} + + +int event_select_on (struct event_pool *event_pool, int fd, int idx_hint, int poll_in, int poll_out) { diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h index 7ed182492e2..3b3ab0e4b2f 100644 --- a/libglusterfs/src/event.h +++ b/libglusterfs/src/event.h @@ -20,15 +20,20 @@ struct event_pool; struct event_ops; +struct event_slot_poll; +struct event_slot_epoll; struct event_data { - int fd; int idx; + int gen; } __attribute__ ((__packed__, __may_alias__)); typedef int (*event_handler_t) (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err); +#define EVENT_EPOLL_TABLES 1024 +#define EVENT_EPOLL_SLOTS 1024 + struct event_pool { struct event_ops *ops; @@ -36,12 +41,9 @@ struct event_pool { int breaker[2]; int count; - struct { - int fd; - int events; - void *data; - event_handler_t handler; - } *reg; + struct event_slot_poll *reg; + struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES]; + int slots_used[EVENT_EPOLL_TABLES]; int used; int changed; @@ -65,6 +67,9 @@ struct event_ops { int (*event_unregister) (struct event_pool *event_pool, int fd, int idx); + int (*event_unregister_close) (struct event_pool *event_pool, int fd, + int idx); + int (*event_dispatch) (struct event_pool *event_pool); }; @@ -75,6 +80,7 @@ int event_register (struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, int poll_out); int event_unregister (struct event_pool *event_pool, int fd, int idx); +int event_unregister_close (struct event_pool *event_pool, int fd, int idx); int event_dispatch (struct event_pool *event_pool); #endif /* _EVENT_H_ */ diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 1cce6db7501..388b8dedfd9 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -125,6 +125,7 @@ enum gf_common_mem_types_ { gf_common_mt_strfd_t = 109, gf_common_mt_strfd_data_t = 110, gf_common_mt_regex_t = 111, + gf_common_mt_ereg = 112, gf_common_mt_end }; #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 107590b0273..0a3a5812a91 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -291,6 +291,22 @@ ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) case SSL_ERROR_NONE: return r; case SSL_ERROR_WANT_READ: + /* If we are attempting to connect/accept then we + * should wait here on the poll, for the SSL + * (re)negotiation to complete, else we would error out + * on the accept/connect. + * If we are here when attempting to read/write + * then we return r (or -1) as the socket is always + * primed for the read event, and it would eventually + * call one of the SSL routines */ + /* NOTE: Only way to determine this is a accept/connect + * is to examine buf or func, which is not very + * clean */ + if ((func == (SSL_trinary_func *)SSL_read) + || (func == (SSL_trinary_func *) SSL_write)) { + return r; + } + pfd.fd = priv->sock; pfd.events = POLLIN; if (poll(&pfd,1,-1) < 0) { @@ -944,9 +960,8 @@ __socket_reset (rpc_transport_t *this) memset (&priv->incoming, 0, sizeof (priv->incoming)); - event_unregister (this->ctx->event_pool, priv->sock, priv->idx); + event_unregister_close (this->ctx->event_pool, priv->sock, priv->idx); - close (priv->sock); priv->sock = -1; priv->idx = -1; priv->connected = -1; @@ -2236,9 +2251,16 @@ socket_event_poll_in (rpc_transport_t *this) rpc_transport_pollin_t *pollin = NULL; socket_private_t *priv = this->private; - ret = socket_proto_state_machine (this, &pollin); + do { + /* consume all we can, this is our only chance + (Edge Triggered polling in epoll) + */ + pollin = NULL; + ret = socket_proto_state_machine (this, &pollin); + + if (!pollin) + break; - if (pollin != NULL) { priv->ot_state = OT_CALLBACK; ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); @@ -2246,7 +2268,8 @@ socket_event_poll_in (rpc_transport_t *this) priv->ot_state = OT_RUNNING; } rpc_transport_pollin_destroy (pollin); - } + + } while (pollin); return ret; } |