author     Vijaikumar M <vmallika@redhat.com>    2014-06-19 15:41:22 +0530
committer  Vijay Bellur <vbellur@redhat.com>     2015-02-07 13:17:30 -0800
commit     c61074400a45e69c6edbf82b8ed02568726d37ae (patch)
tree       c9c826414bcd3da0e1f30edbaaf79ac0c716a371
parent     5e25569ed0717aa8636ad708430a823d39f9aa60 (diff)
epoll: edge triggered and multi-threaded epoll
- edge triggered (oneshot) polling with epoll
- pick one event to avoid multiple events getting picked up by same thread and so get better distribution of events against multiple threads
- wire support for multiple poll threads to epoll_wait in parallel
- evdata to store absolute index and not hint for epoll
- store index and gen of slot instead of fd and index hint
- perform fd close asynchronously inside event.c for multithread safety
- poll is still single threaded

Change-Id: I536851dda0ab224c5d5a1b130a571397c9cace8f
BUG: 1104462
Signed-off-by: Anand Avati <avati@redhat.com>
Signed-off-by: Vijaikumar M <vmallika@redhat.com>
Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Signed-off-by: Shyam <srangana@redhat.com>
Reviewed-on: http://review.gluster.org/3842
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--  libglusterfs/src/event-epoll.c           747
-rw-r--r--  libglusterfs/src/event-poll.c             43
-rw-r--r--  libglusterfs/src/event.c                  18
-rw-r--r--  libglusterfs/src/event.h                  20
-rw-r--r--  libglusterfs/src/mem-types.h               1
-rw-r--r--  rpc/rpc-transport/socket/src/socket.c     33
6 files changed, 557 insertions, 305 deletions
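
Note: the following is a minimal standalone sketch of the polling pattern the commit message describes (edge-triggered + oneshot registration, one event per epoll_wait call, handler re-arms the fd with EPOLL_CTL_MOD, multiple poller threads). It is not the patch code itself; all names and the two-thread count below are illustrative only.

    #include <errno.h>
    #include <fcntl.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/epoll.h>
    #include <unistd.h>

    /* One registered descriptor; the epoll cookie points at this slot. */
    struct conn {
            int fd;
    };

    /* Arm (or re-arm) a descriptor as edge triggered + oneshot so that
     * at most one poller thread is woken for it at a time. */
    static int
    arm_oneshot (int epfd, struct conn *c, int op)
    {
            struct epoll_event ev;

            memset (&ev, 0, sizeof (ev));
            ev.events   = EPOLLIN | EPOLLPRI | EPOLLET | EPOLLONESHOT;
            ev.data.ptr = c;

            return epoll_ctl (epfd, op, c->fd, &ev);
    }

    /* Poller thread: fetch exactly one event, drain the fd (edge-triggered
     * delivery gives no second notification for data already pending),
     * then re-arm it so another thread may service the next event. */
    static void *
    poller (void *arg)
    {
            int                epfd = *(int *) arg;
            struct epoll_event event;
            char               buf[4096];

            for (;;) {
                    int ret = epoll_wait (epfd, &event, 1, -1);
                    if (ret <= 0) {
                            if (ret == -1 && errno != EINTR)
                                    break;
                            continue;
                    }

                    struct conn *c = event.data.ptr;

                    /* consume all we can: with EPOLLET this is our only chance */
                    while (read (c->fd, buf, sizeof (buf)) > 0)
                            ;

                    /* without this the fd stays disarmed (EPOLLONESHOT) */
                    if (arm_oneshot (epfd, c, EPOLL_CTL_MOD) == -1)
                            perror ("epoll_ctl (EPOLL_CTL_MOD)");
            }

            return NULL;
    }

    int
    main (void)
    {
            int         epfd = epoll_create (1024);
            pthread_t   pollers[2];
            struct conn in = { .fd = 0 };        /* stdin, for demonstration */

            if (epfd == -1)
                    return 1;

            /* the drain loop above requires a non-blocking descriptor */
            fcntl (in.fd, F_SETFL, O_NONBLOCK);

            if (arm_oneshot (epfd, &in, EPOLL_CTL_ADD) == -1)
                    return 1;

            for (int i = 0; i < 2; i++)
                    pthread_create (&pollers[i], NULL, poller, &epfd);
            for (int i = 0; i < 2; i++)
                    pthread_join (pollers[i], NULL);

            return 0;
    }
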
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index 06b3236246a..9082954e4e4 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -31,28 +31,203 @@
#include <sys/epoll.h>
+struct event_slot_epoll {
+ int fd;
+ int events;
+ int gen;
+ int ref;
+ int do_close;
+ int in_handler;
+ void *data;
+ event_handler_t handler;
+ gf_lock_t lock;
+};
+
+
+static struct event_slot_epoll *
+__event_newtable (struct event_pool *event_pool, int table_idx)
+{
+ struct event_slot_epoll *table = NULL;
+ int i = -1;
+
+ table = GF_CALLOC (sizeof (*table), EVENT_EPOLL_SLOTS,
+ gf_common_mt_ereg);
+ if (!table)
+ return NULL;
+
+ for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
+ table[i].fd = -1;
+ LOCK_INIT (&table[i].lock);
+ }
+
+ event_pool->ereg[table_idx] = table;
+ event_pool->slots_used[table_idx] = 0;
+
+ return table;
+}
+
+
static int
-__event_getindex (struct event_pool *event_pool, int fd, int idx)
+__event_slot_alloc (struct event_pool *event_pool, int fd)
{
- int ret = -1;
int i = 0;
+ int table_idx = -1;
+ int gen = -1;
+ struct event_slot_epoll *table = NULL;
+
+ for (i = 0; i < EVENT_EPOLL_TABLES; i++) {
+ switch (event_pool->slots_used[i]) {
+ case EVENT_EPOLL_SLOTS:
+ continue;
+ case 0:
+ if (!event_pool->ereg[i]) {
+ table = __event_newtable (event_pool, i);
+ if (!table)
+ return -1;
+ }
+ break;
+ default:
+ table = event_pool->ereg[i];
+ break;
+ }
+
+ if (table)
+ /* break out of the loop */
+ break;
+ }
+
+ if (!table)
+ return -1;
+
+ table_idx = i;
+
+ for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
+ if (table[i].fd == -1) {
+ /* wipe everything except bump the generation */
+ gen = table[i].gen;
+ memset (&table[i], 0, sizeof (table[i]));
+ table[i].gen = gen + 1;
+
+ LOCK_INIT (&table[i].lock);
+
+ table[i].fd = fd;
+ event_pool->slots_used[table_idx]++;
+
+ break;
+ }
+ }
+
+ return table_idx * EVENT_EPOLL_SLOTS + i;
+}
- GF_VALIDATE_OR_GOTO ("event", event_pool, out);
- if (idx > -1 && idx < event_pool->used) {
- if (event_pool->reg[idx].fd == fd)
- ret = idx;
- }
+static int
+event_slot_alloc (struct event_pool *event_pool, int fd)
+{
+ int idx = -1;
- for (i=0; ret == -1 && i<event_pool->used; i++) {
- if (event_pool->reg[i].fd == fd) {
- ret = i;
- break;
- }
- }
+ pthread_mutex_lock (&event_pool->mutex);
+ {
+ idx = __event_slot_alloc (event_pool, fd);
+ }
+ pthread_mutex_unlock (&event_pool->mutex);
-out:
- return ret;
+ return idx;
+}
+
+
+
+static void
+__event_slot_dealloc (struct event_pool *event_pool, int idx)
+{
+ int table_idx = 0;
+ int offset = 0;
+ struct event_slot_epoll *table = NULL;
+ struct event_slot_epoll *slot = NULL;
+
+ table_idx = idx / EVENT_EPOLL_SLOTS;
+ offset = idx % EVENT_EPOLL_SLOTS;
+
+ table = event_pool->ereg[table_idx];
+ if (!table)
+ return;
+
+ slot = &table[offset];
+ slot->gen++;
+
+ slot->fd = -1;
+ event_pool->slots_used[table_idx]--;
+
+ return;
+}
+
+
+static void
+event_slot_dealloc (struct event_pool *event_pool, int idx)
+{
+ pthread_mutex_lock (&event_pool->mutex);
+ {
+ __event_slot_dealloc (event_pool, idx);
+ }
+ pthread_mutex_unlock (&event_pool->mutex);
+
+ return;
+}
+
+
+static struct event_slot_epoll *
+event_slot_get (struct event_pool *event_pool, int idx)
+{
+ struct event_slot_epoll *slot = NULL;
+ struct event_slot_epoll *table = NULL;
+ int table_idx = 0;
+ int offset = 0;
+
+ table_idx = idx / EVENT_EPOLL_SLOTS;
+ offset = idx % EVENT_EPOLL_SLOTS;
+
+ table = event_pool->ereg[table_idx];
+ if (!table)
+ return NULL;
+
+ slot = &table[offset];
+
+ LOCK (&slot->lock);
+ {
+ slot->ref++;
+ }
+ UNLOCK (&slot->lock);
+
+ return slot;
+}
+
+
+static void
+event_slot_unref (struct event_pool *event_pool, struct event_slot_epoll *slot,
+ int idx)
+{
+ int ref = -1;
+ int fd = -1;
+ int do_close = 0;
+
+ LOCK (&slot->lock);
+ {
+ ref = --slot->ref;
+ fd = slot->fd;
+ do_close = slot->do_close;
+ }
+ UNLOCK (&slot->lock);
+
+ if (ref)
+ /* slot still alive */
+ goto done;
+
+ event_slot_dealloc (event_pool, idx);
+
+ if (do_close)
+ close (fd);
+done:
+ return;
}
@@ -68,17 +243,6 @@ event_pool_new_epoll (int count)
if (!event_pool)
goto out;
- event_pool->count = count;
- event_pool->reg = GF_CALLOC (event_pool->count,
- sizeof (*event_pool->reg),
- gf_common_mt_reg);
-
- if (!event_pool->reg) {
- GF_FREE (event_pool);
- event_pool = NULL;
- goto out;
- }
-
epfd = epoll_create (count);
if (epfd == -1) {
@@ -95,13 +259,49 @@ event_pool_new_epoll (int count)
event_pool->count = count;
pthread_mutex_init (&event_pool->mutex, NULL);
- pthread_cond_init (&event_pool->cond, NULL);
out:
return event_pool;
}
+static void
+__slot_update_events (struct event_slot_epoll *slot, int poll_in, int poll_out)
+{
+ switch (poll_in) {
+ case 1:
+ slot->events |= EPOLLIN;
+ break;
+ case 0:
+ slot->events &= ~EPOLLIN;
+ break;
+ case -1:
+ /* do nothing */
+ break;
+ default:
+ gf_log ("epoll", GF_LOG_ERROR,
+ "invalid poll_in value %d", poll_in);
+ break;
+ }
+
+ switch (poll_out) {
+ case 1:
+ slot->events |= EPOLLOUT;
+ break;
+ case 0:
+ slot->events &= ~EPOLLOUT;
+ break;
+ case -1:
+ /* do nothing */
+ break;
+ default:
+ gf_log ("epoll", GF_LOG_ERROR,
+ "invalid poll_out value %d", poll_out);
+ break;
+ }
+}
+
+
int
event_register_epoll (struct event_pool *event_pool, int fd,
event_handler_t handler,
@@ -111,87 +311,98 @@ event_register_epoll (struct event_pool *event_pool, int fd,
int ret = -1;
struct epoll_event epoll_event = {0, };
struct event_data *ev_data = (void *)&epoll_event.data;
+ struct event_slot_epoll *slot = NULL;
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
- pthread_mutex_lock (&event_pool->mutex);
- {
- if (event_pool->count == event_pool->used) {
- event_pool->count *= 2;
+ idx = event_slot_alloc (event_pool, fd);
+ if (idx == -1) {
+ gf_log ("epoll", GF_LOG_ERROR,
+ "could not find slot for fd=%d", fd);
+ return -1;
+ }
+
+ slot = event_slot_get (event_pool, idx);
+
+ assert (slot->fd == fd);
+
+ LOCK (&slot->lock);
+ {
+ /* make epoll edge triggered and 'singleshot', which
+ means we need to re-add the fd with
+ epoll_ctl(EPOLL_CTL_MOD) after delivery of every
+ single event. This assures us that while a poller
+ thread has picked up and is processing an event,
+ another poller will not try to pick this at the same
+ time as well.
+ */
+
+ slot->events = EPOLLPRI | EPOLLET | EPOLLONESHOT;
+ slot->handler = handler;
+ slot->data = data;
+
+ __slot_update_events (slot, poll_in, poll_out);
+
+ epoll_event.events = slot->events;
+ ev_data->idx = idx;
+ ev_data->gen = slot->gen;
+
+ ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd,
+ &epoll_event);
+ /* check ret after UNLOCK() to avoid deadlock in
+ event_slot_unref()
+ */
+ }
+ UNLOCK (&slot->lock);
+
+ if (ret == -1) {
+ gf_log ("epoll", GF_LOG_ERROR,
+ "failed to add fd(=%d) to epoll fd(=%d) (%s)",
+ fd, event_pool->fd, strerror (errno));
+
+ event_slot_unref (event_pool, slot, idx);
+ idx = -1;
+ }
+
+ /* keep slot->ref (do not event_slot_unref) if successful */
+out:
+ return idx;
+}
- event_pool->reg = GF_REALLOC (event_pool->reg,
- event_pool->count *
- sizeof (*event_pool->reg));
- if (!event_pool->reg) {
- gf_log ("epoll", GF_LOG_ERROR,
- "event registry re-allocation failed");
- goto unlock;
- }
- }
-
- idx = event_pool->used;
- event_pool->used++;
-
- event_pool->reg[idx].fd = fd;
- event_pool->reg[idx].events = EPOLLPRI;
- event_pool->reg[idx].handler = handler;
- event_pool->reg[idx].data = data;
-
- switch (poll_in) {
- case 1:
- event_pool->reg[idx].events |= EPOLLIN;
- break;
- case 0:
- event_pool->reg[idx].events &= ~EPOLLIN;
- break;
- case -1:
- /* do nothing */
- break;
- default:
- gf_log ("epoll", GF_LOG_ERROR,
- "invalid poll_in value %d", poll_in);
- break;
- }
+static int
+event_unregister_epoll_common (struct event_pool *event_pool, int fd,
+ int idx, int do_close)
+{
+ int ret = -1;
+ struct event_slot_epoll *slot = NULL;
- switch (poll_out) {
- case 1:
- event_pool->reg[idx].events |= EPOLLOUT;
- break;
- case 0:
- event_pool->reg[idx].events &= ~EPOLLOUT;
- break;
- case -1:
- /* do nothing */
- break;
- default:
- gf_log ("epoll", GF_LOG_ERROR,
- "invalid poll_out value %d", poll_out);
- break;
- }
+ GF_VALIDATE_OR_GOTO ("event", event_pool, out);
- event_pool->changed = 1;
+ slot = event_slot_get (event_pool, idx);
- epoll_event.events = event_pool->reg[idx].events;
- ev_data->fd = fd;
- ev_data->idx = idx;
+ assert (slot->fd == fd);
- ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd,
- &epoll_event);
+ LOCK (&slot->lock);
+ {
+ ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL);
if (ret == -1) {
gf_log ("epoll", GF_LOG_ERROR,
- "failed to add fd(=%d) to epoll fd(=%d) (%s)",
+ "fail to del fd(=%d) from epoll fd(=%d) (%s)",
fd, event_pool->fd, strerror (errno));
goto unlock;
}
- pthread_cond_broadcast (&event_pool->cond);
+ slot->do_close = do_close;
+ slot->gen++; /* detect unregister in dispatch_handler() */
}
unlock:
- pthread_mutex_unlock (&event_pool->mutex);
+ UNLOCK (&slot->lock);
+ event_slot_unref (event_pool, slot, idx); /* one for event_register() */
+ event_slot_unref (event_pool, slot, idx); /* one for event_slot_get() */
out:
return ret;
}
@@ -200,233 +411,186 @@ out:
static int
event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint)
{
- int idx = -1;
- int ret = -1;
-
- struct epoll_event epoll_event = {0, };
- struct event_data *ev_data = (void *)&epoll_event.data;
- int lastidx = -1;
-
- GF_VALIDATE_OR_GOTO ("event", event_pool, out);
+ int ret = -1;
- pthread_mutex_lock (&event_pool->mutex);
- {
- idx = __event_getindex (event_pool, fd, idx_hint);
+ ret = event_unregister_epoll_common (event_pool, fd, idx_hint, 0);
- if (idx == -1) {
- gf_log ("epoll", GF_LOG_ERROR,
- "index not found for fd=%d (idx_hint=%d)",
- fd, idx_hint);
- errno = ENOENT;
- goto unlock;
- }
-
- ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL);
-
- /* if ret is -1, this array member should never be accessed */
- /* if it is 0, the array member might be used by idx_cache
- * in which case the member should not be accessed till
- * it is reallocated
- */
-
- event_pool->reg[idx].fd = -1;
-
- if (ret == -1) {
- gf_log ("epoll", GF_LOG_ERROR,
- "fail to del fd(=%d) from epoll fd(=%d) (%s)",
- fd, event_pool->fd, strerror (errno));
- goto unlock;
- }
-
- lastidx = event_pool->used - 1;
- if (lastidx == idx) {
- event_pool->used--;
- goto unlock;
- }
+ return ret;
+}
- epoll_event.events = event_pool->reg[lastidx].events;
- ev_data->fd = event_pool->reg[lastidx].fd;
- ev_data->idx = idx;
- ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd,
- &epoll_event);
- if (ret == -1) {
- gf_log ("epoll", GF_LOG_ERROR,
- "fail to modify fd(=%d) index %d to %d (%s)",
- ev_data->fd, event_pool->used, idx,
- strerror (errno));
- goto unlock;
- }
+static int
+event_unregister_close_epoll (struct event_pool *event_pool, int fd,
+ int idx_hint)
+{
+ int ret = -1;
- /* just replace the unregistered idx by last one */
- event_pool->reg[idx] = event_pool->reg[lastidx];
- event_pool->used--;
- }
-unlock:
- pthread_mutex_unlock (&event_pool->mutex);
+ ret = event_unregister_epoll_common (event_pool, fd, idx_hint, 1);
-out:
- return ret;
+ return ret;
}
static int
-event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint,
+event_select_on_epoll (struct event_pool *event_pool, int fd, int idx,
int poll_in, int poll_out)
{
- int idx = -1;
int ret = -1;
-
+ struct event_slot_epoll *slot = NULL;
struct epoll_event epoll_event = {0, };
struct event_data *ev_data = (void *)&epoll_event.data;
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
- pthread_mutex_lock (&event_pool->mutex);
- {
- idx = __event_getindex (event_pool, fd, idx_hint);
-
- if (idx == -1) {
- gf_log ("epoll", GF_LOG_ERROR,
- "index not found for fd=%d (idx_hint=%d)",
- fd, idx_hint);
- errno = ENOENT;
- goto unlock;
- }
-
- switch (poll_in) {
- case 1:
- event_pool->reg[idx].events |= EPOLLIN;
- break;
- case 0:
- event_pool->reg[idx].events &= ~EPOLLIN;
- break;
- case -1:
- /* do nothing */
- break;
- default:
- gf_log ("epoll", GF_LOG_ERROR,
- "invalid poll_in value %d", poll_in);
- break;
- }
-
- switch (poll_out) {
- case 1:
- event_pool->reg[idx].events |= EPOLLOUT;
- break;
- case 0:
- event_pool->reg[idx].events &= ~EPOLLOUT;
- break;
- case -1:
- /* do nothing */
- break;
- default:
- gf_log ("epoll", GF_LOG_ERROR,
- "invalid poll_out value %d", poll_out);
- break;
- }
-
- epoll_event.events = event_pool->reg[idx].events;
- ev_data->fd = fd;
- ev_data->idx = idx;
-
- ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd,
- &epoll_event);
- if (ret == -1) {
- gf_log ("epoll", GF_LOG_ERROR,
- "failed to modify fd(=%d) events to %d",
- fd, epoll_event.events);
- }
- }
+ slot = event_slot_get (event_pool, idx);
+
+ assert (slot->fd == fd);
+
+ LOCK (&slot->lock);
+ {
+ __slot_update_events (slot, poll_in, poll_out);
+
+ epoll_event.events = slot->events;
+ ev_data->idx = idx;
+ ev_data->gen = slot->gen;
+
+ if (slot->in_handler)
+ /* in_handler indicates at least one thread
+ executing event_dispatch_epoll_handler()
+ which will perform epoll_ctl(EPOLL_CTL_MOD)
+ anyways (because of EPOLLET)
+
+ This not only saves a system call, but also
+ avoids possibility of another epoll thread
+ parallely picking up the next event while the
+ ongoing handler is still in progress (and
+ resulting in unnecessary contention on
+ rpc_transport_t->mutex).
+ */
+ goto unlock;
+
+ ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd,
+ &epoll_event);
+ if (ret == -1) {
+ gf_log ("epoll", GF_LOG_ERROR,
+ "failed to modify fd(=%d) events to %d",
+ fd, epoll_event.events);
+ }
+ }
unlock:
- pthread_mutex_unlock (&event_pool->mutex);
+ UNLOCK (&slot->lock);
+
+ event_slot_unref (event_pool, slot, idx);
out:
- return ret;
+ return idx;
}
static int
event_dispatch_epoll_handler (struct event_pool *event_pool,
- struct epoll_event *events, int i)
+ struct epoll_event *event)
{
- struct event_data *event_data = NULL;
+ struct event_data *ev_data = NULL;
+ struct event_slot_epoll *slot = NULL;
event_handler_t handler = NULL;
void *data = NULL;
int idx = -1;
+ int gen = -1;
int ret = -1;
+ int fd = -1;
-
- event_data = (void *)&events[i].data;
+ ev_data = (void *)&event->data;
handler = NULL;
data = NULL;
- pthread_mutex_lock (&event_pool->mutex);
- {
- idx = __event_getindex (event_pool, event_data->fd,
- event_data->idx);
-
- if (idx == -1) {
- gf_log ("epoll", GF_LOG_ERROR,
- "index not found for fd(=%d) (idx_hint=%d)",
- event_data->fd, event_data->idx);
- goto unlock;
- }
-
- handler = event_pool->reg[idx].handler;
- data = event_pool->reg[idx].data;
- }
-unlock:
- pthread_mutex_unlock (&event_pool->mutex);
+ idx = ev_data->idx;
+ gen = ev_data->gen;
+
+ slot = event_slot_get (event_pool, idx);
+
+ LOCK (&slot->lock);
+ {
+ fd = slot->fd;
+ if (fd == -1) {
+ gf_log ("epoll", GF_LOG_ERROR,
+ "stale fd found on idx=%d, gen=%d, events=%d, "
+ "slot->gen=%d",
+ idx, gen, event->events, slot->gen);
+ /* fd got unregistered in another thread */
+ goto pre_unlock;
+ }
+
+ if (gen != slot->gen) {
+ gf_log ("epoll", GF_LOG_ERROR,
+ "generation mismatch on idx=%d, gen=%d, "
+ "slot->gen=%d, slot->fd=%d",
+ idx, gen, slot->gen, slot->fd);
+ /* slot was re-used and therefore is another fd! */
+ goto pre_unlock;
+ }
+
+ handler = slot->handler;
+ data = slot->data;
+
+ slot->in_handler++;
+ }
+pre_unlock:
+ UNLOCK (&slot->lock);
+
+ if (!handler)
+ goto out;
+
+ ret = handler (fd, idx, data,
+ (event->events & (EPOLLIN|EPOLLPRI)),
+ (event->events & (EPOLLOUT)),
+ (event->events & (EPOLLERR|EPOLLHUP)));
+
+ LOCK (&slot->lock);
+ {
+ slot->in_handler--;
+
+ if (gen != slot->gen) {
+ /* event_unregister() happened while we were
+ in handler()
+ */
+ gf_log ("epoll", GF_LOG_DEBUG,
+ "generation bumped on idx=%d from "
+ "gen=%d to slot->gen=%d, fd=%d, "
+ "slot->fd=%d",
+ idx, gen, slot->gen, fd, slot->fd);
+ goto post_unlock;
+ }
+
+ /* This call also picks up the changes made by another
+ thread calling event_select_on_epoll() while this
+ thread was busy in handler()
+ */
+ event->events = slot->events;
+ ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD,
+ fd, event);
+ }
+post_unlock:
+ UNLOCK (&slot->lock);
+out:
+ event_slot_unref (event_pool, slot, idx);
- if (handler)
- ret = handler (event_data->fd, event_data->idx, data,
- (events[i].events & (EPOLLIN|EPOLLPRI)),
- (events[i].events & (EPOLLOUT)),
- (events[i].events & (EPOLLERR|EPOLLHUP)));
return ret;
}
-static int
-event_dispatch_epoll (struct event_pool *event_pool)
+static void *
+event_dispatch_epoll_worker (void *data)
{
- struct epoll_event *events = NULL;
- int size = 0;
- int i = 0;
+ struct epoll_event event;
int ret = -1;
+ struct event_pool *event_pool = data;
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
- while (1) {
- pthread_mutex_lock (&event_pool->mutex);
- {
- while (event_pool->used == 0)
- pthread_cond_wait (&event_pool->cond,
- &event_pool->mutex);
-
- if (event_pool->used > event_pool->evcache_size) {
- GF_FREE (event_pool->evcache);
-
- event_pool->evcache = events = NULL;
-
- event_pool->evcache_size =
- event_pool->used + 256;
-
- events = GF_CALLOC (event_pool->evcache_size,
- sizeof (struct epoll_event),
- gf_common_mt_epoll_event);
- if (!events)
- break;
-
- event_pool->evcache = events;
- }
- }
- pthread_mutex_unlock (&event_pool->mutex);
-
- ret = epoll_wait (event_pool->fd, event_pool->evcache,
- event_pool->evcache_size, -1);
+ for (;;) {
+ ret = epoll_wait (event_pool->fd, &event, 1, -1);
if (ret == 0)
/* timeout */
@@ -436,28 +600,43 @@ event_dispatch_epoll (struct event_pool *event_pool)
/* sys call */
continue;
- size = ret;
+ ret = event_dispatch_epoll_handler (event_pool, &event);
+ }
+out:
+ return NULL;
+}
- for (i = 0; i < size; i++) {
- if (!events || !events[i].events)
- continue;
- ret = event_dispatch_epoll_handler (event_pool,
- events, i);
- }
- }
+#define GLUSTERFS_EPOLL_MAXTHREADS 2
-out:
- return ret;
+
+static int
+event_dispatch_epoll (struct event_pool *event_pool)
+{
+ int i = 0;
+ pthread_t pollers[GLUSTERFS_EPOLL_MAXTHREADS];
+ int ret = -1;
+
+ for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) {
+ ret = pthread_create (&pollers[i], NULL,
+ event_dispatch_epoll_worker,
+ event_pool);
+ }
+
+ for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++)
+ pthread_join (pollers[i], NULL);
+
+ return ret;
}
struct event_ops event_ops_epoll = {
- .new = event_pool_new_epoll,
- .event_register = event_register_epoll,
- .event_select_on = event_select_on_epoll,
- .event_unregister = event_unregister_epoll,
- .event_dispatch = event_dispatch_epoll
+ .new = event_pool_new_epoll,
+ .event_register = event_register_epoll,
+ .event_select_on = event_select_on_epoll,
+ .event_unregister = event_unregister_epoll,
+ .event_unregister_close = event_unregister_close_epoll,
+ .event_dispatch = event_dispatch_epoll
};
#endif
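
Note: the event-epoll.c hunk above replaces the flat fd registry with a lazily allocated, two-level slot table plus a per-slot generation counter. A stripped-down sketch of that indexing scheme follows (assumed names, not the actual GlusterFS structures): an absolute index encodes a table number and an offset, and the generation detects a slot that was recycled for a different fd between event delivery and dispatch.

    #include <stddef.h>

    #define SLOTS_PER_TABLE 1024
    #define TABLES          1024

    struct slot {
            int fd;     /* -1 when free */
            int gen;    /* bumped every time the slot is allocated or freed */
    };

    /* registry[t] is allocated lazily, SLOTS_PER_TABLE entries at a time */
    static struct slot *registry[TABLES];

    /* absolute index <-> (table, offset): idx = table * SLOTS_PER_TABLE + offset */
    static struct slot *
    slot_from_index (int idx)
    {
            int table  = idx / SLOTS_PER_TABLE;
            int offset = idx % SLOTS_PER_TABLE;

            if (table < 0 || table >= TABLES || !registry[table])
                    return NULL;

            return &registry[table][offset];
    }

    /* An event carries (idx, gen); it is stale if the slot was freed or
     * re-used for another fd since the kernel queued the event. */
    static int
    event_is_stale (int idx, int gen)
    {
            struct slot *s = slot_from_index (idx);

            return (!s || s->fd == -1 || s->gen != gen);
    }
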
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 7f7f560d086..a7e2e663103 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -26,6 +26,16 @@
#include "config.h"
#endif
+
+
+struct event_slot_poll {
+ int fd;
+ int events;
+ void *data;
+ event_handler_t handler;
+};
+
+
static int
event_register_poll (struct event_pool *event_pool, int fd,
event_handler_t handler,
@@ -63,12 +73,16 @@ __event_getindex (struct event_pool *event_pool, int fd, int idx)
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
+ /* lookup in used space based on index provided */
if (idx > -1 && idx < event_pool->used) {
- if (event_pool->reg[idx].fd == fd)
+ if (event_pool->reg[idx].fd == fd) {
ret = idx;
+ goto out;
+ }
}
- for (i=0; ret == -1 && i<event_pool->used; i++) {
+ /* search in used space, if lookup fails */
+ for (i = 0; i < event_pool->used; i++) {
if (event_pool->reg[i].fd == fd) {
ret = i;
break;
@@ -264,6 +278,20 @@ out:
static int
+event_unregister_close_poll (struct event_pool *event_pool, int fd,
+ int idx_hint)
+{
+ int ret = -1;
+
+ ret = event_unregister_poll (event_pool, fd, idx_hint);
+
+ close (fd);
+
+ return ret;
+}
+
+
+static int
event_select_on_poll (struct event_pool *event_pool, int fd, int idx_hint,
int poll_in, int poll_out)
{
@@ -443,9 +471,10 @@ out:
struct event_ops event_ops_poll = {
- .new = event_pool_new_poll,
- .event_register = event_register_poll,
- .event_select_on = event_select_on_poll,
- .event_unregister = event_unregister_poll,
- .event_dispatch = event_dispatch_poll
+ .new = event_pool_new_poll,
+ .event_register = event_register_poll,
+ .event_select_on = event_select_on_poll,
+ .event_unregister = event_unregister_poll,
+ .event_unregister_close = event_unregister_close_poll,
+ .event_dispatch = event_dispatch_poll
};
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
index 0197e7948b5..6c253df3c1a 100644
--- a/libglusterfs/src/event.c
+++ b/libglusterfs/src/event.c
@@ -32,10 +32,10 @@ struct event_pool *
event_pool_new (int count)
{
struct event_pool *event_pool = NULL;
- extern struct event_ops event_ops_poll;
+ extern struct event_ops event_ops_poll;
#ifdef HAVE_SYS_EPOLL_H
- extern struct event_ops event_ops_epoll;
+ extern struct event_ops event_ops_epoll;
event_pool = event_ops_epoll.new (count);
@@ -89,6 +89,20 @@ out:
int
+event_unregister_close (struct event_pool *event_pool, int fd, int idx)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("event", event_pool, out);
+
+ ret = event_pool->ops->event_unregister_close (event_pool, fd, idx);
+
+out:
+ return ret;
+}
+
+
+int
event_select_on (struct event_pool *event_pool, int fd, int idx_hint,
int poll_in, int poll_out)
{
diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h
index 7ed182492e2..3b3ab0e4b2f 100644
--- a/libglusterfs/src/event.h
+++ b/libglusterfs/src/event.h
@@ -20,15 +20,20 @@
struct event_pool;
struct event_ops;
+struct event_slot_poll;
+struct event_slot_epoll;
struct event_data {
- int fd;
int idx;
+ int gen;
} __attribute__ ((__packed__, __may_alias__));
typedef int (*event_handler_t) (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err);
+#define EVENT_EPOLL_TABLES 1024
+#define EVENT_EPOLL_SLOTS 1024
+
struct event_pool {
struct event_ops *ops;
@@ -36,12 +41,9 @@ struct event_pool {
int breaker[2];
int count;
- struct {
- int fd;
- int events;
- void *data;
- event_handler_t handler;
- } *reg;
+ struct event_slot_poll *reg;
+ struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
+ int slots_used[EVENT_EPOLL_TABLES];
int used;
int changed;
@@ -65,6 +67,9 @@ struct event_ops {
int (*event_unregister) (struct event_pool *event_pool, int fd, int idx);
+ int (*event_unregister_close) (struct event_pool *event_pool, int fd,
+ int idx);
+
int (*event_dispatch) (struct event_pool *event_pool);
};
@@ -75,6 +80,7 @@ int event_register (struct event_pool *event_pool, int fd,
event_handler_t handler,
void *data, int poll_in, int poll_out);
int event_unregister (struct event_pool *event_pool, int fd, int idx);
+int event_unregister_close (struct event_pool *event_pool, int fd, int idx);
int event_dispatch (struct event_pool *event_pool);
#endif /* _EVENT_H_ */
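
Note: the struct event_data change above packs a slot index and a generation counter into the 64-bit epoll user-data field, instead of the old (fd, index hint) pair. A small illustrative sketch of that overlay, with made-up names:

    #include <sys/epoll.h>

    /* Two 32-bit fields laid over epoll_event.data (an 8-byte union),
     * mirroring the packed, may_alias struct used in the patch. */
    struct ev_cookie {
            int idx;
            int gen;
    } __attribute__ ((__packed__, __may_alias__));

    static void
    cookie_set (struct epoll_event *ev, int idx, int gen)
    {
            struct ev_cookie *c = (void *) &ev->data;

            c->idx = idx;
            c->gen = gen;
    }

    static void
    cookie_get (struct epoll_event *ev, int *idx, int *gen)
    {
            struct ev_cookie *c = (void *) &ev->data;

            *idx = c->idx;
            *gen = c->gen;
    }
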
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
index 1cce6db7501..388b8dedfd9 100644
--- a/libglusterfs/src/mem-types.h
+++ b/libglusterfs/src/mem-types.h
@@ -125,6 +125,7 @@ enum gf_common_mem_types_ {
gf_common_mt_strfd_t = 109,
gf_common_mt_strfd_data_t = 110,
gf_common_mt_regex_t = 111,
+ gf_common_mt_ereg = 112,
gf_common_mt_end
};
#endif
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 107590b0273..0a3a5812a91 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -291,6 +291,22 @@ ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
case SSL_ERROR_NONE:
return r;
case SSL_ERROR_WANT_READ:
+ /* If we are attempting to connect/accept then we
+ * should wait here on the poll, for the SSL
+ * (re)negotiation to complete, else we would error out
+ * on the accept/connect.
+ * If we are here when attempting to read/write
+ * then we return r (or -1) as the socket is always
+ * primed for the read event, and it would eventually
+ * call one of the SSL routines */
+ /* NOTE: Only way to determine this is a accept/connect
+ * is to examine buf or func, which is not very
+ * clean */
+ if ((func == (SSL_trinary_func *)SSL_read)
+ || (func == (SSL_trinary_func *) SSL_write)) {
+ return r;
+ }
+
pfd.fd = priv->sock;
pfd.events = POLLIN;
if (poll(&pfd,1,-1) < 0) {
@@ -944,9 +960,8 @@ __socket_reset (rpc_transport_t *this)
memset (&priv->incoming, 0, sizeof (priv->incoming));
- event_unregister (this->ctx->event_pool, priv->sock, priv->idx);
+ event_unregister_close (this->ctx->event_pool, priv->sock, priv->idx);
- close (priv->sock);
priv->sock = -1;
priv->idx = -1;
priv->connected = -1;
@@ -2236,9 +2251,16 @@ socket_event_poll_in (rpc_transport_t *this)
rpc_transport_pollin_t *pollin = NULL;
socket_private_t *priv = this->private;
- ret = socket_proto_state_machine (this, &pollin);
+ do {
+ /* consume all we can, this is our only chance
+ (Edge Triggered polling in epoll)
+ */
+ pollin = NULL;
+ ret = socket_proto_state_machine (this, &pollin);
+
+ if (!pollin)
+ break;
- if (pollin != NULL) {
priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
@@ -2246,7 +2268,8 @@ socket_event_poll_in (rpc_transport_t *this)
priv->ot_state = OT_RUNNING;
}
rpc_transport_pollin_destroy (pollin);
- }
+
+ } while (pollin);
return ret;
}
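
Note: in the socket.c hunk above, __socket_reset no longer calls close() directly; event_unregister_close defers the close to the event layer, which performs it only when the last reference on the slot is dropped. A hedged, self-contained sketch of that deferred-close idea (illustrative names, not the GlusterFS API):

    #include <pthread.h>
    #include <unistd.h>

    /* Refcounted slot: whoever drops the last reference performs the close,
     * so a handler still running in another poller thread never has its fd
     * closed (and possibly reused) underneath it.
     * The caller is expected to initialize lock with pthread_mutex_init(). */
    struct refslot {
            pthread_mutex_t lock;
            int             fd;
            int             ref;
            int             do_close;   /* set by the unregister-and-close path */
    };

    static void
    refslot_unref (struct refslot *s)
    {
            int fd, ref, do_close;

            pthread_mutex_lock (&s->lock);
            ref      = --s->ref;
            fd       = s->fd;
            do_close = s->do_close;
            pthread_mutex_unlock (&s->lock);

            if (ref == 0 && do_close)
                    close (fd);   /* last user gone: safe to close now */
    }

    /* Unregister path: mark the fd for closing instead of closing in place. */
    static void
    refslot_unregister_close (struct refslot *s)
    {
            pthread_mutex_lock (&s->lock);
            s->do_close = 1;
            pthread_mutex_unlock (&s->lock);

            refslot_unref (s);   /* drop the registration reference */
    }
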