Diffstat (limited to 'libglusterfs/src/event-poll.c')
-rw-r--r--   libglusterfs/src/event-poll.c   513
1 file changed, 513 insertions, 0 deletions
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
new file mode 100644
index 00000000000..2cba963f096
--- /dev/null
+++ b/libglusterfs/src/event-poll.c
@@ -0,0 +1,513 @@
/*
  Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <sys/poll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>

#include "glusterfs/logging.h"
#include "glusterfs/gf-event.h"
#include "glusterfs/mem-pool.h"
#include "glusterfs/common-utils.h"
#include "glusterfs/syscall.h"
#include "glusterfs/libglusterfs-messages.h"

struct event_slot_poll {
    int fd;
    int events;
    void *data;
    event_handler_t handler;
};

static int
event_register_poll(struct event_pool *event_pool, int fd,
                    event_handler_t handler, void *data, int poll_in,
                    int poll_out, char notify_poller_death);

static void
__flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
           int poll_err, char event_thread_died)
{
    char buf[64];
    int ret = -1;

    if (!poll_in)
        return;

    do {
        ret = sys_read(fd, buf, 64);
        if (ret == -1 && errno != EAGAIN) {
            gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_READ_FILE_FAILED,
                    "fd=%d", fd, NULL);
        }
    } while (ret == 64);

    return;
}

static int
__event_getindex(struct event_pool *event_pool, int fd, int idx)
{
    int ret = -1;
    int i = 0;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    /* lookup in used space based on index provided */
    if (idx > -1 && idx < event_pool->used) {
        if (event_pool->reg[idx].fd == fd) {
            ret = idx;
            goto out;
        }
    }

    /* search in used space, if lookup fails */
    for (i = 0; i < event_pool->used; i++) {
        if (event_pool->reg[i].fd == fd) {
            ret = i;
            break;
        }
    }

out:
    return ret;
}

static struct event_pool *
event_pool_new_poll(int count, int eventthreadcount)
{
    struct event_pool *event_pool = NULL;
    int ret = -1;

    event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool);

    if (!event_pool)
        return NULL;

    event_pool->count = count;
    event_pool->reg = GF_CALLOC(event_pool->count, sizeof(*event_pool->reg),
                                gf_common_mt_reg);

    if (!event_pool->reg) {
        GF_FREE(event_pool);
        return NULL;
    }

    pthread_mutex_init(&event_pool->mutex, NULL);

    ret = pipe(event_pool->breaker);

    if (ret == -1) {
        gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_PIPE_CREATE_FAILED, NULL);
        GF_FREE(event_pool->reg);
        GF_FREE(event_pool);
        return NULL;
    }

    ret = fcntl(event_pool->breaker[0], F_SETFL, O_NONBLOCK);
    if (ret == -1) {
        gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_SET_PIPE_FAILED, NULL);
        sys_close(event_pool->breaker[0]);
        sys_close(event_pool->breaker[1]);
        event_pool->breaker[0] = event_pool->breaker[1] = -1;

        GF_FREE(event_pool->reg);
        GF_FREE(event_pool);
        return NULL;
    }

    ret = fcntl(event_pool->breaker[1], F_SETFL, O_NONBLOCK);
    if (ret == -1) {
        gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_SET_PIPE_FAILED, NULL);

        sys_close(event_pool->breaker[0]);
        sys_close(event_pool->breaker[1]);
        event_pool->breaker[0] = event_pool->breaker[1] = -1;

        GF_FREE(event_pool->reg);
        GF_FREE(event_pool);
        return NULL;
    }

    ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
                              NULL, 1, 0, 0);
    if (ret == -1) {
        gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED, NULL);
        sys_close(event_pool->breaker[0]);
        sys_close(event_pool->breaker[1]);
        event_pool->breaker[0] = event_pool->breaker[1] = -1;

        GF_FREE(event_pool->reg);
        GF_FREE(event_pool);
        return NULL;
    }

    if (eventthreadcount > 1) {
        gf_smsg("poll", GF_LOG_INFO, 0, LG_MSG_POLL_IGNORE_MULTIPLE_THREADS,
                "count=%d", eventthreadcount, NULL);
    }

    /* Although the event thread count for the poll implementation is
     * always going to be 1, eventthreadcount needs to be set to 1
     * explicitly so that rpcsvc_request_handler() thread scaling works
     * flawlessly in both the epoll and poll models.
     */
    event_pool->eventthreadcount = 1;

    return event_pool;
}

static int
event_register_poll(struct event_pool *event_pool, int fd,
                    event_handler_t handler, void *data, int poll_in,
                    int poll_out, char notify_poller_death)
{
    int idx = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        if (event_pool->count == event_pool->used) {
            event_pool->count += 256;

            event_pool->reg = GF_REALLOC(
                event_pool->reg, event_pool->count * sizeof(*event_pool->reg));
            if (!event_pool->reg)
                goto unlock;
        }

        idx = event_pool->used++;

        event_pool->reg[idx].fd = fd;
        event_pool->reg[idx].events = POLLPRI;
        event_pool->reg[idx].handler = handler;
        event_pool->reg[idx].data = data;

        switch (poll_in) {
            case 1:
                event_pool->reg[idx].events |= POLLIN;
                break;
            case 0:
                event_pool->reg[idx].events &= ~POLLIN;
                break;
            case -1:
                /* do nothing */
                break;
            default:
                gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN,
                        "value=%d", poll_in, NULL);
                break;
        }

        switch (poll_out) {
            case 1:
                event_pool->reg[idx].events |= POLLOUT;
                break;
            case 0:
                event_pool->reg[idx].events &= ~POLLOUT;
                break;
            case -1:
                /* do nothing */
                break;
            default:
                gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT,
                        "value=%d", poll_out, NULL);
                break;
        }

        event_pool->changed = 1;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

out:
    return idx;
}

static int
event_unregister_poll(struct event_pool *event_pool, int fd, int idx_hint)
{
    int idx = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        idx = __event_getindex(event_pool, fd, idx_hint);

        if (idx == -1) {
            gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
                    fd, "idx_hint=%d", idx_hint, NULL);
            errno = ENOENT;
            goto unlock;
        }

        event_pool->reg[idx] = event_pool->reg[--event_pool->used];
        event_pool->changed = 1;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

out:
    return idx;
}

static int
event_unregister_close_poll(struct event_pool *event_pool, int fd,
                            int idx_hint)
{
    int ret = -1;

    ret = event_unregister_poll(event_pool, fd, idx_hint);

    sys_close(fd);

    return ret;
}

static int
event_select_on_poll(struct event_pool *event_pool, int fd, int idx_hint,
                     int poll_in, int poll_out)
{
    int idx = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        idx = __event_getindex(event_pool, fd, idx_hint);

        if (idx == -1) {
            gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
                    fd, "idx_hint=%d", idx_hint, NULL);
            errno = ENOENT;
            goto unlock;
        }

        switch (poll_in) {
            case 1:
                event_pool->reg[idx].events |= POLLIN;
                break;
            case 0:
                event_pool->reg[idx].events &= ~POLLIN;
                break;
            case -1:
                /* do nothing */
                break;
            default:
                /* TODO: log error */
                break;
        }

        switch (poll_out) {
            case 1:
                event_pool->reg[idx].events |= POLLOUT;
                break;
            case 0:
                event_pool->reg[idx].events &= ~POLLOUT;
                break;
            case -1:
                /* do nothing */
                break;
            default:
                /* TODO: log error */
                break;
        }

        if (poll_in + poll_out > -2)
            event_pool->changed = 1;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

out:
    return idx;
}

static int
event_dispatch_poll_handler(struct event_pool *event_pool, struct pollfd *ufds,
                            int i)
{
    event_handler_t handler = NULL;
    void *data = NULL;
    int idx = -1;
    int ret = 0;

    handler = NULL;
    data = NULL;

    pthread_mutex_lock(&event_pool->mutex);
    {
        idx = __event_getindex(event_pool, ufds[i].fd, i);

        if (idx == -1) {
            gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
                    ufds[i].fd, "idx_hint=%d", i, NULL);
            goto unlock;
        }

        handler = event_pool->reg[idx].handler;
        data = event_pool->reg[idx].data;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

    if (handler)
        handler(ufds[i].fd, idx, 0, data,
                (ufds[i].revents & (POLLIN | POLLPRI)),
                (ufds[i].revents & (POLLOUT)),
                (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);

    return ret;
}

static int
event_dispatch_poll_resize(struct event_pool *event_pool, struct pollfd *ufds,
                           int size)
{
    int i = 0;

    pthread_mutex_lock(&event_pool->mutex);
    {
        if (event_pool->changed == 0) {
            goto unlock;
        }

        if (event_pool->used > event_pool->evcache_size) {
            GF_FREE(event_pool->evcache);

            event_pool->evcache = ufds = NULL;

            event_pool->evcache_size = event_pool->used;

            ufds = GF_CALLOC(sizeof(struct pollfd), event_pool->evcache_size,
                             gf_common_mt_pollfd);
            if (!ufds)
                goto unlock;
            event_pool->evcache = ufds;
        }

        if (ufds == NULL) {
            goto unlock;
        }

        for (i = 0; i < event_pool->used; i++) {
            ufds[i].fd = event_pool->reg[i].fd;
            ufds[i].events = event_pool->reg[i].events;
            ufds[i].revents = 0;
        }

        size = i;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

    return size;
}

static int
event_dispatch_poll(struct event_pool *event_pool)
{
    struct pollfd *ufds = NULL;
    int size = 0;
    int i = 0;
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        event_pool->activethreadcount = 1;
    }
    pthread_mutex_unlock(&event_pool->mutex);

    while (1) {
        pthread_mutex_lock(&event_pool->mutex);
        {
            if (event_pool->destroy == 1) {
                event_pool->activethreadcount = 0;
                pthread_cond_broadcast(&event_pool->cond);
                pthread_mutex_unlock(&event_pool->mutex);
                return 0;
            }
        }
        pthread_mutex_unlock(&event_pool->mutex);

        size = event_dispatch_poll_resize(event_pool, ufds, size);
        ufds = event_pool->evcache;

        ret = poll(ufds, size, 1);

        if (ret == 0)
            /* timeout */
            continue;

        if (ret == -1 && errno == EINTR)
            /* sys call */
            continue;

        for (i = 0; i < size; i++) {
            if (!ufds[i].revents)
                continue;

            event_dispatch_poll_handler(event_pool, ufds, i);
        }
    }

out:
    return -1;
}

int
event_reconfigure_threads_poll(struct event_pool *event_pool, int value)
{
    /* No-op for poll */

    return 0;
}

/* This function is the destructor for the event_pool data structure.
 * It should be called only after poller_threads_destroy() has been
 * called, else it will lead to crashes.
 */
static int
event_pool_destroy_poll(struct event_pool *event_pool)
{
    int ret = 0;

    ret = sys_close(event_pool->breaker[0]);
    if (ret)
        return ret;

    ret = sys_close(event_pool->breaker[1]);
    if (ret)
        return ret;

    event_pool->breaker[0] = event_pool->breaker[1] = -1;

    GF_FREE(event_pool->reg);
    GF_FREE(event_pool);

    return ret;
}

struct event_ops event_ops_poll = {
    .new = event_pool_new_poll,
    .event_register = event_register_poll,
    .event_select_on = event_select_on_poll,
    .event_unregister = event_unregister_poll,
    .event_unregister_close = event_unregister_close_poll,
    .event_dispatch = event_dispatch_poll,
    .event_reconfigure_threads = event_reconfigure_threads_poll,
    .event_pool_destroy = event_pool_destroy_poll};
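For context: event_pool_new_poll() above wires a non-blocking self-pipe (the "breaker") into the poll set and registers __flush_fd() as its handler, which simply drains whatever has been written. The usual purpose of such a self-pipe is to let another thread wake a poller blocked in poll() by writing a byte to the pipe's write end. Below is a minimal standalone sketch of that pattern using only plain POSIX calls; it is an illustration of the technique, not part of the GlusterFS gf-event API, and all names in it are made up for the example.

/* self_pipe_sketch.c: minimal self-pipe ("breaker") wakeup pattern,
 * analogous to what event_pool_new_poll()/__flush_fd() set up above.
 * Not GlusterFS code; names and structure are illustrative only. */
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>

int
main(void)
{
    int breaker[2];
    char buf[64];
    struct pollfd pfd;

    if (pipe(breaker) == -1)
        return 1;

    /* both ends non-blocking, as in event_pool_new_poll() */
    fcntl(breaker[0], F_SETFL, O_NONBLOCK);
    fcntl(breaker[1], F_SETFL, O_NONBLOCK);

    /* another thread would do this write to wake the poller */
    write(breaker[1], "x", 1);

    pfd.fd = breaker[0];
    pfd.events = POLLIN | POLLPRI;
    pfd.revents = 0;

    if (poll(&pfd, 1, 1000) > 0 && (pfd.revents & POLLIN)) {
        /* drain the pipe, like __flush_fd() does */
        while (read(breaker[0], buf, sizeof(buf)) == (ssize_t)sizeof(buf))
            ;
        printf("poller woken up\n");
    }

    close(breaker[0]);
    close(breaker[1]);
    return 0;
}

Note that event_dispatch_poll() in the actual file additionally calls poll() with a 1 ms timeout, so the dispatch loop never blocks for long even without an explicit wakeup.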
