diff options
author | Anand Avati <avati@redhat.com> | 2012-09-11 12:28:52 -0700 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2012-09-12 12:34:38 -0700 |
commit | c3d7286a67ce0ac4db9cb8fa079a48f423245000 (patch) | |
tree | 41e1f2abe959de320de3f7a5ed4ee57d56f0d2e5 /libglusterfs/src | |
parent | 35d178dbd4ddb8c407e911823c850642563baa9f (diff) |
event: peel out poll and epoll specific code into separate files
code re-org, no change in logic.
Change-Id: Ib1cb3d3f4cf8989a916df7476479c26570d07a84
BUG: 821087
Signed-off-by: Anand Avati <avati@redhat.com>
Reviewed-on: http://review.gluster.org/3932
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Diffstat (limited to 'libglusterfs/src')
-rw-r--r-- | libglusterfs/src/Makefile.am | 4 | ||||
-rw-r--r-- | libglusterfs/src/event-epoll.c | 463 | ||||
-rw-r--r-- | libglusterfs/src/event-poll.c | 451 | ||||
-rw-r--r-- | libglusterfs/src/event.c | 838 |
4 files changed, 920 insertions, 836 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 0156c974a1b..0921a290193 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -23,7 +23,9 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \ $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c syncop.c \ graph-print.c trie.c run.c options.c fd-lk.c circ-buff.c \ event-history.c gidcache.c ctx.c \ - $(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c + $(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c \ + event-poll.c event-epoll.c + nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c new file mode 100644 index 00000000000..06b3236246a --- /dev/null +++ b/libglusterfs/src/event-epoll.c @@ -0,0 +1,463 @@ +/* + Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <sys/poll.h> +#include <pthread.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> + +#include "logging.h" +#include "event.h" +#include "mem-pool.h" +#include "common-utils.h" + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + + +#ifdef HAVE_SYS_EPOLL_H +#include <sys/epoll.h> + + +static int +__event_getindex (struct event_pool *event_pool, int fd, int idx) +{ + int ret = -1; + int i = 0; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + if (idx > -1 && idx < event_pool->used) { + if (event_pool->reg[idx].fd == fd) + ret = idx; + } + + for (i=0; ret == -1 && i<event_pool->used; i++) { + if (event_pool->reg[i].fd == fd) { + ret = i; + break; + } + } + +out: + return ret; +} + + +static struct event_pool * +event_pool_new_epoll (int count) +{ + struct event_pool *event_pool = NULL; + int epfd = -1; + + event_pool = GF_CALLOC (1, sizeof (*event_pool), + gf_common_mt_event_pool); + + if (!event_pool) + goto out; + + event_pool->count = count; + event_pool->reg = GF_CALLOC (event_pool->count, + sizeof (*event_pool->reg), + gf_common_mt_reg); + + if (!event_pool->reg) { + GF_FREE (event_pool); + event_pool = NULL; + goto out; + } + + epfd = epoll_create (count); + + if (epfd == -1) { + gf_log ("epoll", GF_LOG_ERROR, "epoll fd creation failed (%s)", + strerror (errno)); + GF_FREE (event_pool->reg); + GF_FREE (event_pool); + event_pool = NULL; + goto out; + } + + event_pool->fd = epfd; + + event_pool->count = count; + + pthread_mutex_init (&event_pool->mutex, NULL); + pthread_cond_init (&event_pool->cond, NULL); + +out: + return event_pool; +} + + +int +event_register_epoll (struct event_pool *event_pool, int fd, + event_handler_t handler, + void *data, int poll_in, int poll_out) +{ + int idx = -1; + int ret = -1; + struct epoll_event epoll_event = {0, }; + struct event_data *ev_data = (void *)&epoll_event.data; + + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + pthread_mutex_lock (&event_pool->mutex); + { + if (event_pool->count == event_pool->used) { + event_pool->count *= 2; + + event_pool->reg = GF_REALLOC (event_pool->reg, + event_pool->count * + sizeof (*event_pool->reg)); + + if (!event_pool->reg) { + gf_log ("epoll", GF_LOG_ERROR, + "event registry re-allocation failed"); + goto unlock; + } + } + + idx = event_pool->used; + event_pool->used++; + + event_pool->reg[idx].fd = fd; + event_pool->reg[idx].events = EPOLLPRI; + event_pool->reg[idx].handler = handler; + event_pool->reg[idx].data = data; + + switch (poll_in) { + case 1: + event_pool->reg[idx].events |= EPOLLIN; + break; + case 0: + event_pool->reg[idx].events &= ~EPOLLIN; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("epoll", GF_LOG_ERROR, + "invalid poll_in value %d", poll_in); + break; + } + + switch (poll_out) { + case 1: + event_pool->reg[idx].events |= EPOLLOUT; + break; + case 0: + event_pool->reg[idx].events &= ~EPOLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("epoll", GF_LOG_ERROR, + "invalid poll_out value %d", poll_out); + break; + } + + event_pool->changed = 1; + + epoll_event.events = event_pool->reg[idx].events; + ev_data->fd = fd; + ev_data->idx = idx; + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, + &epoll_event); + + if (ret == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "failed to add fd(=%d) to epoll fd(=%d) (%s)", + fd, event_pool->fd, strerror (errno)); + goto unlock; + } + + pthread_cond_broadcast (&event_pool->cond); + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + +out: + return ret; +} + + +static int +event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint) +{ + int idx = -1; + int ret = -1; + + struct epoll_event epoll_event = {0, }; + struct event_data *ev_data = (void *)&epoll_event.data; + int lastidx = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_getindex (event_pool, fd, idx_hint); + + if (idx == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "index not found for fd=%d (idx_hint=%d)", + fd, idx_hint); + errno = ENOENT; + goto unlock; + } + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); + + /* if ret is -1, this array member should never be accessed */ + /* if it is 0, the array member might be used by idx_cache + * in which case the member should not be accessed till + * it is reallocated + */ + + event_pool->reg[idx].fd = -1; + + if (ret == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "fail to del fd(=%d) from epoll fd(=%d) (%s)", + fd, event_pool->fd, strerror (errno)); + goto unlock; + } + + lastidx = event_pool->used - 1; + if (lastidx == idx) { + event_pool->used--; + goto unlock; + } + + epoll_event.events = event_pool->reg[lastidx].events; + ev_data->fd = event_pool->reg[lastidx].fd; + ev_data->idx = idx; + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd, + &epoll_event); + if (ret == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "fail to modify fd(=%d) index %d to %d (%s)", + ev_data->fd, event_pool->used, idx, + strerror (errno)); + goto unlock; + } + + /* just replace the unregistered idx by last one */ + event_pool->reg[idx] = event_pool->reg[lastidx]; + event_pool->used--; + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + +out: + return ret; +} + + +static int +event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint, + int poll_in, int poll_out) +{ + int idx = -1; + int ret = -1; + + struct epoll_event epoll_event = {0, }; + struct event_data *ev_data = (void *)&epoll_event.data; + + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_getindex (event_pool, fd, idx_hint); + + if (idx == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "index not found for fd=%d (idx_hint=%d)", + fd, idx_hint); + errno = ENOENT; + goto unlock; + } + + switch (poll_in) { + case 1: + event_pool->reg[idx].events |= EPOLLIN; + break; + case 0: + event_pool->reg[idx].events &= ~EPOLLIN; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("epoll", GF_LOG_ERROR, + "invalid poll_in value %d", poll_in); + break; + } + + switch (poll_out) { + case 1: + event_pool->reg[idx].events |= EPOLLOUT; + break; + case 0: + event_pool->reg[idx].events &= ~EPOLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("epoll", GF_LOG_ERROR, + "invalid poll_out value %d", poll_out); + break; + } + + epoll_event.events = event_pool->reg[idx].events; + ev_data->fd = fd; + ev_data->idx = idx; + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, + &epoll_event); + if (ret == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "failed to modify fd(=%d) events to %d", + fd, epoll_event.events); + } + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + +out: + return ret; +} + + +static int +event_dispatch_epoll_handler (struct event_pool *event_pool, + struct epoll_event *events, int i) +{ + struct event_data *event_data = NULL; + event_handler_t handler = NULL; + void *data = NULL; + int idx = -1; + int ret = -1; + + + event_data = (void *)&events[i].data; + handler = NULL; + data = NULL; + + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_getindex (event_pool, event_data->fd, + event_data->idx); + + if (idx == -1) { + gf_log ("epoll", GF_LOG_ERROR, + "index not found for fd(=%d) (idx_hint=%d)", + event_data->fd, event_data->idx); + goto unlock; + } + + handler = event_pool->reg[idx].handler; + data = event_pool->reg[idx].data; + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + + if (handler) + ret = handler (event_data->fd, event_data->idx, data, + (events[i].events & (EPOLLIN|EPOLLPRI)), + (events[i].events & (EPOLLOUT)), + (events[i].events & (EPOLLERR|EPOLLHUP))); + return ret; +} + + +static int +event_dispatch_epoll (struct event_pool *event_pool) +{ + struct epoll_event *events = NULL; + int size = 0; + int i = 0; + int ret = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + while (1) { + pthread_mutex_lock (&event_pool->mutex); + { + while (event_pool->used == 0) + pthread_cond_wait (&event_pool->cond, + &event_pool->mutex); + + if (event_pool->used > event_pool->evcache_size) { + GF_FREE (event_pool->evcache); + + event_pool->evcache = events = NULL; + + event_pool->evcache_size = + event_pool->used + 256; + + events = GF_CALLOC (event_pool->evcache_size, + sizeof (struct epoll_event), + gf_common_mt_epoll_event); + if (!events) + break; + + event_pool->evcache = events; + } + } + pthread_mutex_unlock (&event_pool->mutex); + + ret = epoll_wait (event_pool->fd, event_pool->evcache, + event_pool->evcache_size, -1); + + if (ret == 0) + /* timeout */ + continue; + + if (ret == -1 && errno == EINTR) + /* sys call */ + continue; + + size = ret; + + for (i = 0; i < size; i++) { + if (!events || !events[i].events) + continue; + + ret = event_dispatch_epoll_handler (event_pool, + events, i); + } + } + +out: + return ret; +} + + +struct event_ops event_ops_epoll = { + .new = event_pool_new_epoll, + .event_register = event_register_epoll, + .event_select_on = event_select_on_epoll, + .event_unregister = event_unregister_epoll, + .event_dispatch = event_dispatch_epoll +}; + +#endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c new file mode 100644 index 00000000000..7f7f560d086 --- /dev/null +++ b/libglusterfs/src/event-poll.c @@ -0,0 +1,451 @@ +/* + Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <sys/poll.h> +#include <pthread.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> + +#include "logging.h" +#include "event.h" +#include "mem-pool.h" +#include "common-utils.h" + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +static int +event_register_poll (struct event_pool *event_pool, int fd, + event_handler_t handler, + void *data, int poll_in, int poll_out); + + +static int +__flush_fd (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + char buf[64]; + int ret = -1; + + if (!poll_in) + return ret; + + do { + ret = read (fd, buf, 64); + if (ret == -1 && errno != EAGAIN) { + gf_log ("poll", GF_LOG_ERROR, + "read on %d returned error (%s)", + fd, strerror (errno)); + } + } while (ret == 64); + + return ret; +} + + +static int +__event_getindex (struct event_pool *event_pool, int fd, int idx) +{ + int ret = -1; + int i = 0; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + if (idx > -1 && idx < event_pool->used) { + if (event_pool->reg[idx].fd == fd) + ret = idx; + } + + for (i=0; ret == -1 && i<event_pool->used; i++) { + if (event_pool->reg[i].fd == fd) { + ret = i; + break; + } + } + +out: + return ret; +} + + +static struct event_pool * +event_pool_new_poll (int count) +{ + struct event_pool *event_pool = NULL; + int ret = -1; + + event_pool = GF_CALLOC (1, sizeof (*event_pool), + gf_common_mt_event_pool); + + if (!event_pool) + return NULL; + + event_pool->count = count; + event_pool->reg = GF_CALLOC (event_pool->count, + sizeof (*event_pool->reg), + gf_common_mt_reg); + + if (!event_pool->reg) { + GF_FREE (event_pool); + return NULL; + } + + pthread_mutex_init (&event_pool->mutex, NULL); + + ret = pipe (event_pool->breaker); + + if (ret == -1) { + gf_log ("poll", GF_LOG_ERROR, + "pipe creation failed (%s)", strerror (errno)); + GF_FREE (event_pool->reg); + GF_FREE (event_pool); + return NULL; + } + + ret = fcntl (event_pool->breaker[0], F_SETFL, O_NONBLOCK); + if (ret == -1) { + gf_log ("poll", GF_LOG_ERROR, + "could not set pipe to non blocking mode (%s)", + strerror (errno)); + close (event_pool->breaker[0]); + close (event_pool->breaker[1]); + event_pool->breaker[0] = event_pool->breaker[1] = -1; + + GF_FREE (event_pool->reg); + GF_FREE (event_pool); + return NULL; + } + + ret = fcntl (event_pool->breaker[1], F_SETFL, O_NONBLOCK); + if (ret == -1) { + gf_log ("poll", GF_LOG_ERROR, + "could not set pipe to non blocking mode (%s)", + strerror (errno)); + + close (event_pool->breaker[0]); + close (event_pool->breaker[1]); + event_pool->breaker[0] = event_pool->breaker[1] = -1; + + GF_FREE (event_pool->reg); + GF_FREE (event_pool); + return NULL; + } + + ret = event_register_poll (event_pool, event_pool->breaker[0], + __flush_fd, NULL, 1, 0); + if (ret == -1) { + gf_log ("poll", GF_LOG_ERROR, + "could not register pipe fd with poll event loop"); + close (event_pool->breaker[0]); + close (event_pool->breaker[1]); + event_pool->breaker[0] = event_pool->breaker[1] = -1; + + GF_FREE (event_pool->reg); + GF_FREE (event_pool); + return NULL; + } + + return event_pool; +} + + +static int +event_register_poll (struct event_pool *event_pool, int fd, + event_handler_t handler, + void *data, int poll_in, int poll_out) +{ + int idx = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + pthread_mutex_lock (&event_pool->mutex); + { + if (event_pool->count == event_pool->used) + { + event_pool->count += 256; + + event_pool->reg = GF_REALLOC (event_pool->reg, + event_pool->count * + sizeof (*event_pool->reg)); + if (!event_pool->reg) + goto unlock; + } + + idx = event_pool->used++; + + event_pool->reg[idx].fd = fd; + event_pool->reg[idx].events = POLLPRI; + event_pool->reg[idx].handler = handler; + event_pool->reg[idx].data = data; + + switch (poll_in) { + case 1: + event_pool->reg[idx].events |= POLLIN; + break; + case 0: + event_pool->reg[idx].events &= ~POLLIN; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("poll", GF_LOG_ERROR, + "invalid poll_in value %d", poll_in); + break; + } + + switch (poll_out) { + case 1: + event_pool->reg[idx].events |= POLLOUT; + break; + case 0: + event_pool->reg[idx].events &= ~POLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + gf_log ("poll", GF_LOG_ERROR, + "invalid poll_out value %d", poll_out); + break; + } + + event_pool->changed = 1; + + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + +out: + return idx; +} + + +static int +event_unregister_poll (struct event_pool *event_pool, int fd, int idx_hint) +{ + int idx = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_getindex (event_pool, fd, idx_hint); + + if (idx == -1) { + gf_log ("poll", GF_LOG_ERROR, + "index not found for fd=%d (idx_hint=%d)", + fd, idx_hint); + errno = ENOENT; + goto unlock; + } + + event_pool->reg[idx] = event_pool->reg[--event_pool->used]; + event_pool->changed = 1; + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + +out: + return idx; +} + + +static int +event_select_on_poll (struct event_pool *event_pool, int fd, int idx_hint, + int poll_in, int poll_out) +{ + int idx = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_getindex (event_pool, fd, idx_hint); + + if (idx == -1) { + gf_log ("poll", GF_LOG_ERROR, + "index not found for fd=%d (idx_hint=%d)", + fd, idx_hint); + errno = ENOENT; + goto unlock; + } + + switch (poll_in) { + case 1: + event_pool->reg[idx].events |= POLLIN; + break; + case 0: + event_pool->reg[idx].events &= ~POLLIN; + break; + case -1: + /* do nothing */ + break; + default: + /* TODO: log error */ + break; + } + + switch (poll_out) { + case 1: + event_pool->reg[idx].events |= POLLOUT; + break; + case 0: + event_pool->reg[idx].events &= ~POLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + /* TODO: log error */ + break; + } + + if (poll_in + poll_out > -2) + event_pool->changed = 1; + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + +out: + return idx; +} + + +static int +event_dispatch_poll_handler (struct event_pool *event_pool, + struct pollfd *ufds, int i) +{ + event_handler_t handler = NULL; + void *data = NULL; + int idx = -1; + int ret = 0; + + handler = NULL; + data = NULL; + + pthread_mutex_lock (&event_pool->mutex); + { + idx = __event_getindex (event_pool, ufds[i].fd, i); + + if (idx == -1) { + gf_log ("poll", GF_LOG_ERROR, + "index not found for fd=%d (idx_hint=%d)", + ufds[i].fd, i); + goto unlock; + } + + handler = event_pool->reg[idx].handler; + data = event_pool->reg[idx].data; + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + + if (handler) + ret = handler (ufds[i].fd, idx, data, + (ufds[i].revents & (POLLIN|POLLPRI)), + (ufds[i].revents & (POLLOUT)), + (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL))); + + return ret; +} + + +static int +event_dispatch_poll_resize (struct event_pool *event_pool, + struct pollfd *ufds, int size) +{ + int i = 0; + + pthread_mutex_lock (&event_pool->mutex); + { + if (event_pool->changed == 0) { + goto unlock; + } + + if (event_pool->used > event_pool->evcache_size) { + GF_FREE (event_pool->evcache); + + event_pool->evcache = ufds = NULL; + + event_pool->evcache_size = event_pool->used; + + ufds = GF_CALLOC (sizeof (struct pollfd), + event_pool->evcache_size, + gf_common_mt_pollfd); + if (!ufds) + goto unlock; + event_pool->evcache = ufds; + } + + for (i = 0; i < event_pool->used; i++) { + ufds[i].fd = event_pool->reg[i].fd; + ufds[i].events = event_pool->reg[i].events; + ufds[i].revents = 0; + } + + size = i; + } +unlock: + pthread_mutex_unlock (&event_pool->mutex); + + return size; +} + + +static int +event_dispatch_poll (struct event_pool *event_pool) +{ + struct pollfd *ufds = NULL; + int size = 0; + int i = 0; + int ret = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + while (1) { + size = event_dispatch_poll_resize (event_pool, ufds, size); + ufds = event_pool->evcache; + + ret = poll (ufds, size, 1); + + if (ret == 0) + /* timeout */ + continue; + + if (ret == -1 && errno == EINTR) + /* sys call */ + continue; + + for (i = 0; i < size; i++) { + if (!ufds[i].revents) + continue; + + event_dispatch_poll_handler (event_pool, ufds, i); + } + } + +out: + return -1; +} + + +struct event_ops event_ops_poll = { + .new = event_pool_new_poll, + .event_register = event_register_poll, + .event_select_on = event_select_on_poll, + .event_unregister = event_unregister_poll, + .event_dispatch = event_dispatch_poll +}; diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 00175bda49e..0197e7948b5 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -26,849 +26,17 @@ #include "config.h" #endif -static int -event_register_poll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out); - - -static int -__flush_fd (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - char buf[64]; - int ret = -1; - - if (!poll_in) - return ret; - - do { - ret = read (fd, buf, 64); - if (ret == -1 && errno != EAGAIN) { - gf_log ("poll", GF_LOG_ERROR, - "read on %d returned error (%s)", - fd, strerror (errno)); - } - } while (ret == 64); - - return ret; -} - - -static int -__event_getindex (struct event_pool *event_pool, int fd, int idx) -{ - int ret = -1; - int i = 0; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - if (idx > -1 && idx < event_pool->used) { - if (event_pool->reg[idx].fd == fd) - ret = idx; - } - - for (i=0; ret == -1 && i<event_pool->used; i++) { - if (event_pool->reg[i].fd == fd) { - ret = i; - break; - } - } - -out: - return ret; -} - - -static struct event_pool * -event_pool_new_poll (int count) -{ - struct event_pool *event_pool = NULL; - int ret = -1; - - event_pool = GF_CALLOC (1, sizeof (*event_pool), - gf_common_mt_event_pool); - - if (!event_pool) - return NULL; - - event_pool->count = count; - event_pool->reg = GF_CALLOC (event_pool->count, - sizeof (*event_pool->reg), - gf_common_mt_reg); - - if (!event_pool->reg) { - GF_FREE (event_pool); - return NULL; - } - - pthread_mutex_init (&event_pool->mutex, NULL); - - ret = pipe (event_pool->breaker); - - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "pipe creation failed (%s)", strerror (errno)); - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - ret = fcntl (event_pool->breaker[0], F_SETFL, O_NONBLOCK); - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "could not set pipe to non blocking mode (%s)", - strerror (errno)); - close (event_pool->breaker[0]); - close (event_pool->breaker[1]); - event_pool->breaker[0] = event_pool->breaker[1] = -1; - - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - ret = fcntl (event_pool->breaker[1], F_SETFL, O_NONBLOCK); - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "could not set pipe to non blocking mode (%s)", - strerror (errno)); - - close (event_pool->breaker[0]); - close (event_pool->breaker[1]); - event_pool->breaker[0] = event_pool->breaker[1] = -1; - - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - ret = event_register_poll (event_pool, event_pool->breaker[0], - __flush_fd, NULL, 1, 0); - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "could not register pipe fd with poll event loop"); - close (event_pool->breaker[0]); - close (event_pool->breaker[1]); - event_pool->breaker[0] = event_pool->breaker[1] = -1; - - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - return event_pool; -} - - -static int -event_register_poll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out) -{ - int idx = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->count == event_pool->used) - { - event_pool->count += 256; - - event_pool->reg = GF_REALLOC (event_pool->reg, - event_pool->count * - sizeof (*event_pool->reg)); - if (!event_pool->reg) - goto unlock; - } - - idx = event_pool->used++; - - event_pool->reg[idx].fd = fd; - event_pool->reg[idx].events = POLLPRI; - event_pool->reg[idx].handler = handler; - event_pool->reg[idx].data = data; - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= POLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~POLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("poll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= POLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~POLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("poll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - event_pool->changed = 1; - - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - -out: - return idx; -} - - -static int -event_unregister_poll (struct event_pool *event_pool, int fd, int idx_hint) -{ - int idx = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("poll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - event_pool->reg[idx] = event_pool->reg[--event_pool->used]; - event_pool->changed = 1; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - -out: - return idx; -} - - -static int -event_select_on_poll (struct event_pool *event_pool, int fd, int idx_hint, - int poll_in, int poll_out) -{ - int idx = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("poll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= POLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~POLLIN; - break; - case -1: - /* do nothing */ - break; - default: - /* TODO: log error */ - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= POLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~POLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - /* TODO: log error */ - break; - } - - if (poll_in + poll_out > -2) - event_pool->changed = 1; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - -out: - return idx; -} - - -static int -event_dispatch_poll_handler (struct event_pool *event_pool, - struct pollfd *ufds, int i) -{ - event_handler_t handler = NULL; - void *data = NULL; - int idx = -1; - int ret = 0; - - handler = NULL; - data = NULL; - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, ufds[i].fd, i); - - if (idx == -1) { - gf_log ("poll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - ufds[i].fd, i); - goto unlock; - } - - handler = event_pool->reg[idx].handler; - data = event_pool->reg[idx].data; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - if (handler) - ret = handler (ufds[i].fd, idx, data, - (ufds[i].revents & (POLLIN|POLLPRI)), - (ufds[i].revents & (POLLOUT)), - (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL))); - - return ret; -} - - -static int -event_dispatch_poll_resize (struct event_pool *event_pool, - struct pollfd *ufds, int size) -{ - int i = 0; - - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->changed == 0) { - goto unlock; - } - - if (event_pool->used > event_pool->evcache_size) { - GF_FREE (event_pool->evcache); - - event_pool->evcache = ufds = NULL; - - event_pool->evcache_size = event_pool->used; - - ufds = GF_CALLOC (sizeof (struct pollfd), - event_pool->evcache_size, - gf_common_mt_pollfd); - if (!ufds) - goto unlock; - event_pool->evcache = ufds; - } - - for (i = 0; i < event_pool->used; i++) { - ufds[i].fd = event_pool->reg[i].fd; - ufds[i].events = event_pool->reg[i].events; - ufds[i].revents = 0; - } - - size = i; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return size; -} - - -static int -event_dispatch_poll (struct event_pool *event_pool) -{ - struct pollfd *ufds = NULL; - int size = 0; - int i = 0; - int ret = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - while (1) { - size = event_dispatch_poll_resize (event_pool, ufds, size); - ufds = event_pool->evcache; - - ret = poll (ufds, size, 1); - - if (ret == 0) - /* timeout */ - continue; - - if (ret == -1 && errno == EINTR) - /* sys call */ - continue; - - for (i = 0; i < size; i++) { - if (!ufds[i].revents) - continue; - - event_dispatch_poll_handler (event_pool, ufds, i); - } - } - -out: - return -1; -} - - -static struct event_ops event_ops_poll = { - .new = event_pool_new_poll, - .event_register = event_register_poll, - .event_select_on = event_select_on_poll, - .event_unregister = event_unregister_poll, - .event_dispatch = event_dispatch_poll -}; - - - -#ifdef HAVE_SYS_EPOLL_H -#include <sys/epoll.h> - - -static struct event_pool * -event_pool_new_epoll (int count) -{ - struct event_pool *event_pool = NULL; - int epfd = -1; - - event_pool = GF_CALLOC (1, sizeof (*event_pool), - gf_common_mt_event_pool); - - if (!event_pool) - goto out; - - event_pool->count = count; - event_pool->reg = GF_CALLOC (event_pool->count, - sizeof (*event_pool->reg), - gf_common_mt_reg); - - if (!event_pool->reg) { - GF_FREE (event_pool); - event_pool = NULL; - goto out; - } - - epfd = epoll_create (count); - - if (epfd == -1) { - gf_log ("epoll", GF_LOG_ERROR, "epoll fd creation failed (%s)", - strerror (errno)); - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - event_pool = NULL; - goto out; - } - - event_pool->fd = epfd; - - event_pool->count = count; - - pthread_mutex_init (&event_pool->mutex, NULL); - pthread_cond_init (&event_pool->cond, NULL); - -out: - return event_pool; -} - - -int -event_register_epoll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out) -{ - int idx = -1; - int ret = -1; - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->count == event_pool->used) { - event_pool->count *= 2; - - event_pool->reg = GF_REALLOC (event_pool->reg, - event_pool->count * - sizeof (*event_pool->reg)); - - if (!event_pool->reg) { - gf_log ("epoll", GF_LOG_ERROR, - "event registry re-allocation failed"); - goto unlock; - } - } - - idx = event_pool->used; - event_pool->used++; - - event_pool->reg[idx].fd = fd; - event_pool->reg[idx].events = EPOLLPRI; - event_pool->reg[idx].handler = handler; - event_pool->reg[idx].data = data; - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - event_pool->changed = 1; - - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, - &epoll_event); - - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to add fd(=%d) to epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } - - pthread_cond_broadcast (&event_pool->cond); - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - -out: - return ret; -} - - -static int -event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint) -{ - int idx = -1; - int ret = -1; - - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - int lastidx = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); - - /* if ret is -1, this array member should never be accessed */ - /* if it is 0, the array member might be used by idx_cache - * in which case the member should not be accessed till - * it is reallocated - */ - - event_pool->reg[idx].fd = -1; - - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to del fd(=%d) from epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } - - lastidx = event_pool->used - 1; - if (lastidx == idx) { - event_pool->used--; - goto unlock; - } - - epoll_event.events = event_pool->reg[lastidx].events; - ev_data->fd = event_pool->reg[lastidx].fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to modify fd(=%d) index %d to %d (%s)", - ev_data->fd, event_pool->used, idx, - strerror (errno)); - goto unlock; - } - - /* just replace the unregistered idx by last one */ - event_pool->reg[idx] = event_pool->reg[lastidx]; - event_pool->used--; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - -out: - return ret; -} - - -static int -event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint, - int poll_in, int poll_out) -{ - int idx = -1; - int ret = -1; - - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to modify fd(=%d) events to %d", - fd, epoll_event.events); - } - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - -out: - return ret; -} - - -static int -event_dispatch_epoll_handler (struct event_pool *event_pool, - struct epoll_event *events, int i) -{ - struct event_data *event_data = NULL; - event_handler_t handler = NULL; - void *data = NULL; - int idx = -1; - int ret = -1; - - - event_data = (void *)&events[i].data; - handler = NULL; - data = NULL; - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, event_data->fd, - event_data->idx); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd(=%d) (idx_hint=%d)", - event_data->fd, event_data->idx); - goto unlock; - } - - handler = event_pool->reg[idx].handler; - data = event_pool->reg[idx].data; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - if (handler) - ret = handler (event_data->fd, event_data->idx, data, - (events[i].events & (EPOLLIN|EPOLLPRI)), - (events[i].events & (EPOLLOUT)), - (events[i].events & (EPOLLERR|EPOLLHUP))); - return ret; -} - - -static int -event_dispatch_epoll (struct event_pool *event_pool) -{ - struct epoll_event *events = NULL; - int size = 0; - int i = 0; - int ret = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - while (1) { - pthread_mutex_lock (&event_pool->mutex); - { - while (event_pool->used == 0) - pthread_cond_wait (&event_pool->cond, - &event_pool->mutex); - - if (event_pool->used > event_pool->evcache_size) { - GF_FREE (event_pool->evcache); - - event_pool->evcache = events = NULL; - - event_pool->evcache_size = - event_pool->used + 256; - - events = GF_CALLOC (event_pool->evcache_size, - sizeof (struct epoll_event), - gf_common_mt_epoll_event); - if (!events) - break; - - event_pool->evcache = events; - } - } - pthread_mutex_unlock (&event_pool->mutex); - - ret = epoll_wait (event_pool->fd, event_pool->evcache, - event_pool->evcache_size, -1); - - if (ret == 0) - /* timeout */ - continue; - - if (ret == -1 && errno == EINTR) - /* sys call */ - continue; - - size = ret; - - for (i = 0; i < size; i++) { - if (!events || !events[i].events) - continue; - - ret = event_dispatch_epoll_handler (event_pool, - events, i); - } - } - -out: - return ret; -} - - -static struct event_ops event_ops_epoll = { - .new = event_pool_new_epoll, - .event_register = event_register_epoll, - .event_select_on = event_select_on_epoll, - .event_unregister = event_unregister_epoll, - .event_dispatch = event_dispatch_epoll -}; - -#endif struct event_pool * event_pool_new (int count) { struct event_pool *event_pool = NULL; + extern struct event_ops event_ops_poll; #ifdef HAVE_SYS_EPOLL_H + extern struct event_ops event_ops_epoll; + event_pool = event_ops_epoll.new (count); if (event_pool) { |