diff options
Diffstat (limited to 'libglusterfs')
-rw-r--r-- | libglusterfs/src/Makefile.am | 8 | ||||
-rw-r--r-- | libglusterfs/src/async.c | 723 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs/async.h | 209 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs/common-utils.h | 2 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs/glusterfs.h | 2 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs/libglusterfs-messages.h | 2 | ||||
-rw-r--r-- | libglusterfs/src/libglusterfs.sym | 5 |
7 files changed, 947 insertions, 4 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 970f4b74978..f030a70b0f0 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -37,7 +37,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \ $(CONTRIBDIR)/timer-wheel/timer-wheel.c \ $(CONTRIBDIR)/timer-wheel/find_last_bit.c default-args.c locking.c \ $(CONTRIBDIR)/xxhash/xxhash.c \ - compound-fop-utils.c throttle-tbf.c monitoring.c + compound-fop-utils.c throttle-tbf.c monitoring.c async.c nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c nodist_libglusterfs_la_HEADERS = y.tab.h protocol-common.h @@ -69,7 +69,7 @@ libglusterfs_la_HEADERS = glusterfs/common-utils.h glusterfs/defaults.h \ glusterfs/quota-common-utils.h glusterfs/rot-buffs.h \ glusterfs/compat-uuid.h glusterfs/upcall-utils.h glusterfs/throttle-tbf.h \ glusterfs/events.h glusterfs/compound-fop-utils.h glusterfs/atomic.h \ - glusterfs/monitoring.h + glusterfs/monitoring.h glusterfs/async.h libglusterfs_ladir = $(includedir)/glusterfs @@ -79,6 +79,10 @@ noinst_HEADERS = unittest/unittest.h \ $(CONTRIBDIR)/libexecinfo/execinfo_compat.h \ $(CONTRIBDIR)/timer-wheel/timer-wheel.h \ $(CONTRIBDIR)/xxhash/xxhash.h \ + $(CONTRIBDIR)/userspace-rcu/wfcqueue.h \ + $(CONTRIBDIR)/userspace-rcu/wfstack.h \ + $(CONTRIBDIR)/userspace-rcu/static-wfcqueue.h \ + $(CONTRIBDIR)/userspace-rcu/static-wfstack.h \ tier-ctr-interface.h eventtypes.h: $(top_srcdir)/events/eventskeygen.py diff --git a/libglusterfs/src/async.c b/libglusterfs/src/async.c new file mode 100644 index 00000000000..ae7152ff7fa --- /dev/null +++ b/libglusterfs/src/async.c @@ -0,0 +1,723 @@ +/* + Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* To implement an efficient thread pool with minimum contention we have used + * the following ideas: + * + * - The queue of jobs has been implemented using a Wait-Free queue provided + * by the userspace-rcu library. This queue requires a mutex when multiple + * consumers can be extracting items from it concurrently, but the locked + * region is very small, which minimizes the chances of contention. To + * further minimize contention, the number of active worker threads that + * are accessing the queue is dynamically adjusted so that we always have + * the minimum required amount of workers contending for the queue. Adding + * new items can be done with a single atomic operation, without locks. + * + * - All queue management operations, like creating more threads, enabling + * sleeping ones, etc. are done by a single thread. This makes it possible + * to manage all scaling related information and workers lists without + * locks. This functionality is implemented as a role that can be assigned + * to any of the worker threads, which avoids that some lengthy operations + * could interfere with this task. + * + * - Management is based on signals. We used signals for management tasks to + * avoid multiple system calls for each request (with signals we can wait + * for multiple events and get some additional data for each request in a + * single call, instead of first polling and then reading). + * + * TODO: There are some other changes that can take advantage of this new + * thread pool. + * + * - Use this thread pool as the core threading model for synctasks. I + * think this would improve synctask performance because I think we + * currently have some contention there for some workloads. + * + * - Implement a per thread timer that will allow adding and removing + * timers without using mutexes. + * + * - Integrate with userspace-rcu library in QSBR mode, allowing + * other portions of code to be implemented using RCU-based + * structures with a extremely fast read side without contention. + * + * - Integrate I/O into the thread pool so that the thread pool is + * able to efficiently manage all loads and scale dynamically. This + * could make it possible to minimize context switching when serving + * requests from fuse or network. + * + * - Dynamically scale the number of workers based on system load. + * This will make it possible to reduce contention when system is + * heavily loaded, improving performance under these circumstances + * (or minimizing performance loss). This will also make it possible + * that gluster can coexist with other processes that also consume + * CPU, with minimal interference from each other. + */ + +#include <unistd.h> +#include <pthread.h> +#include <errno.h> + +//#include <urcu/uatomic.h> + +#include "glusterfs/logging.h" +#include "glusterfs/list.h" +#include "glusterfs/mem-types.h" +#include "glusterfs/async.h" + +/* These macros wrap a simple system/library call to check the returned error + * and log a message in case of failure. */ +#define GF_ASYNC_CHECK(_func, _args...) \ + ({ \ + int32_t __async_error = -_func(_args); \ + if (caa_unlikely(__async_error != 0)) { \ + gf_async_error(__async_error, #_func "() failed."); \ + } \ + __async_error; \ + }) + +#define GF_ASYNC_CHECK_ERRNO(_func, _args...) \ + ({ \ + int32_t __async_error = _func(_args); \ + if (caa_unlikely(__async_error < 0)) { \ + __async_error = -errno; \ + gf_async_error(__async_error, #_func "() failed."); \ + } \ + __async_error; \ + }) + +/* These macros are used when, based on POSIX documentation, the function + * should never fail under the conditions we are using it. So any unexpected + * error will be handled as a fatal event. It probably means a critical bug + * or memory corruption. In both cases we consider that stopping the process + * is safer (otherwise it could cause more corruption with unknown effects + * that could be worse). */ +#define GF_ASYNC_CANTFAIL(_func, _args...) \ + do { \ + int32_t __async_error = -_func(_args); \ + if (caa_unlikely(__async_error != 0)) { \ + gf_async_fatal(__async_error, #_func "() failed"); \ + } \ + } while (0) + +#define GF_ASYNC_CANTFAIL_ERRNO(_func, _args...) \ + ({ \ + int32_t __async_error = _func(_args); \ + if (caa_unlikely(__async_error < 0)) { \ + __async_error = -errno; \ + gf_async_fatal(__async_error, #_func "() failed"); \ + } \ + __async_error; \ + }) + +/* TODO: for now we allocate a static array of workers. There's an issue if we + * try to use dynamic memory since these workers are initialized very + * early in the process startup and it seems that sometimes not all is + * ready to use dynamic memory. */ +static gf_async_worker_t gf_async_workers[GF_ASYNC_MAX_THREADS]; + +/* This is the only global variable needed to manage the entire framework. */ +gf_async_control_t gf_async_ctrl = {}; + +static __thread gf_async_worker_t *gf_async_current_worker = NULL; + +/* The main function of the worker threads. */ +static void * +gf_async_worker(void *arg); + +static void +gf_async_sync_init(void) +{ + GF_ASYNC_CANTFAIL(pthread_barrier_init, &gf_async_ctrl.sync, NULL, 2); +} + +static void +gf_async_sync_now(void) +{ + int32_t ret; + + ret = pthread_barrier_wait(&gf_async_ctrl.sync); + if (ret == PTHREAD_BARRIER_SERIAL_THREAD) { + GF_ASYNC_CANTFAIL(pthread_barrier_destroy, &gf_async_ctrl.sync); + ret = 0; + } + if (caa_unlikely(ret != 0)) { + gf_async_fatal(-ret, "pthread_barrier_wait() failed"); + } +} + +static void +gf_async_sigmask_empty(sigset_t *mask) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, mask); +} + +static void +gf_async_sigmask_add(sigset_t *mask, int32_t signal) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigaddset, mask, signal); +} + +static void +gf_async_sigmask_set(int32_t mode, sigset_t *mask, sigset_t *old) +{ + GF_ASYNC_CANTFAIL(pthread_sigmask, mode, mask, old); +} + +static void +gf_async_sigaction(int32_t signum, const struct sigaction *action, + struct sigaction *old) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigaction, signum, action, old); +} + +static int32_t +gf_async_sigwait(sigset_t *set) +{ + int32_t ret, signum; + + do { + ret = sigwait(set, &signum); + } while (caa_unlikely((ret < 0) && (errno == EINTR))); + + if (caa_unlikely(ret < 0)) { + ret = -errno; + gf_async_fatal(ret, "sigwait() failed"); + } + + return signum; +} + +static int32_t +gf_async_sigtimedwait(sigset_t *set, struct timespec *timeout) +{ + int32_t ret; + + do { + ret = sigtimedwait(set, NULL, timeout); + } while (caa_unlikely((ret < 0) && (errno == EINTR))); + if (caa_unlikely(ret < 0)) { + ret = -errno; + /* EAGAIN means that the timeout has expired, so we allow this error. + * Any other error shouldn't happen. */ + if (caa_unlikely(ret != -EAGAIN)) { + gf_async_fatal(ret, "sigtimedwait() failed"); + } + ret = 0; + } + + return ret; +} + +static void +gf_async_sigbroadcast(int32_t signum) +{ + GF_ASYNC_CANTFAIL_ERRNO(kill, gf_async_ctrl.pid, signum); +} + +static void +gf_async_signal_handler(int32_t signum) +{ + /* We should never handle a signal in this function. */ + gf_async_fatal(-EBUSY, + "Unexpected processing of signal %d through a handler.", + signum); +} + +static void +gf_async_signal_setup(void) +{ + struct sigaction action; + + /* We configure all related signals so that we can detect threads using an + * invalid signal mask that doesn't block our critical signal. */ + memset(&action, 0, sizeof(action)); + action.sa_handler = gf_async_signal_handler; + + gf_async_sigaction(GF_ASYNC_SIGCTRL, &action, &gf_async_ctrl.handler_ctrl); + + gf_async_sigaction(GF_ASYNC_SIGQUEUE, &action, + &gf_async_ctrl.handler_queue); +} + +static void +gf_async_signal_restore(void) +{ + /* Handlers we have previously changed are restored back to their original + * value. */ + + if (gf_async_ctrl.handler_ctrl.sa_handler != gf_async_signal_handler) { + gf_async_sigaction(GF_ASYNC_SIGCTRL, &gf_async_ctrl.handler_ctrl, NULL); + } + + if (gf_async_ctrl.handler_queue.sa_handler != gf_async_signal_handler) { + gf_async_sigaction(GF_ASYNC_SIGQUEUE, &gf_async_ctrl.handler_queue, + NULL); + } +} + +static void +gf_async_signal_flush(void) +{ + struct timespec delay; + + delay.tv_sec = 0; + delay.tv_nsec = 0; + + /* We read all pending signals so that they don't trigger once the signal + * mask of some thread is changed. */ + while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_ctrl, &delay) > 0) { + } + while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_queue, &delay) > 0) { + } +} + +static int32_t +gf_async_thread_create(pthread_t *thread, int32_t id, void *data) +{ + int32_t ret; + + ret = gf_thread_create(thread, NULL, gf_async_worker, data, + GF_ASYNC_THREAD_NAME "%u", id); + if (caa_unlikely(ret < 0)) { + /* TODO: gf_thread_create() should return a more specific error + * code. */ + return -ENOMEM; + } + + return 0; +} + +static void +gf_async_thread_wait(pthread_t thread) +{ + /* TODO: this is a blocking call executed inside one of the workers of the + * thread pool. This is bad, but this is only executed once we have + * received a notification from the thread that it's terminating, so + * this should return almost immediately. However, to be more robust + * it would be better to use pthread_timedjoin_np() (or even a call + * to pthread_tryjoin_np() followed by a delayed recheck if it + * fails), but they are not portable. We should see how to do this + * in other platforms. */ + GF_ASYNC_CANTFAIL(pthread_join, thread, NULL); +} + +static int32_t +gf_async_worker_create(void) +{ + struct cds_wfs_node *node; + gf_async_worker_t *worker; + uint32_t counts, running, max; + int32_t ret; + + node = __cds_wfs_pop_blocking(&gf_async_ctrl.available); + if (caa_unlikely(node == NULL)) { + /* There are no more available workers. We have all threads running. */ + return 1; + } + cds_wfs_node_init(node); + + ret = 1; + + counts = uatomic_read(&gf_async_ctrl.counts); + max = uatomic_read(&gf_async_ctrl.max_threads); + running = GF_ASYNC_COUNT_RUNNING(counts); + if (running < max) { + uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(1, 0)); + + worker = caa_container_of(node, gf_async_worker_t, stack); + + ret = gf_async_thread_create(&worker->thread, worker->id, worker); + if (caa_likely(ret >= 0)) { + return 0; + } + + uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(-1, 0)); + } + + cds_wfs_push(&gf_async_ctrl.available, node); + + return ret; +} + +static void +gf_async_worker_enable(void) +{ + /* This will wake one of the spare workers. If all workers are busy now, + * the signal will be queued so that the first one that completes its + * work will become the leader. */ + gf_async_sigbroadcast(GF_ASYNC_SIGCTRL); + + /* We have consumed a spare worker. We create another one for future + * needs. */ + gf_async_worker_create(); +} + +static void +gf_async_worker_wait(void) +{ + int32_t signum; + + signum = gf_async_sigwait(&gf_async_ctrl.sigmask_ctrl); + if (caa_unlikely(signum != GF_ASYNC_SIGCTRL)) { + gf_async_fatal(-EINVAL, "Worker received an unexpected signal (%d)", + signum); + } +} + +static void +gf_async_leader_wait(void) +{ + int32_t signum; + + signum = gf_async_sigwait(&gf_async_ctrl.sigmask_queue); + if (caa_unlikely(signum != GF_ASYNC_SIGQUEUE)) { + gf_async_fatal(-EINVAL, "Leader received an unexpected signal (%d)", + signum); + } +} + +static void +gf_async_run(struct cds_wfcq_node *node) +{ + gf_async_t *async; + + /* We've just got work from the queue. Process it. */ + async = caa_container_of(node, gf_async_t, queue); + /* TODO: remove dependency from THIS and xl. */ + THIS = async->xl; + async->cbk(async->xl, async); +} + +static void +gf_async_worker_run(void) +{ + struct cds_wfcq_node *node; + + do { + /* We keep executing jobs from the queue while it's not empty. Note + * that while we do this, we are ignoring any stop request. That's + * fine, since we need to process our own 'join' messages to fully + * terminate all threads. Note that normal jobs should have already + * completed once a stop request is received. */ + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + if (node != NULL) { + gf_async_run(node); + } + } while (node != NULL); + + /* TODO: I've tried to keep the worker looking at the queue for some small + * amount of time in a busy loop to see if more jobs come soon. With + * this I attempted to avoid the overhead of signal management if + * jobs come fast enough. However experimental results seem to + * indicate that doing this, CPU utilization grows and performance + * is actually reduced. We need to see if that's because I used bad + * parameters or it's really better to do it as it's done now. */ +} + +static void +gf_async_leader_run(void) +{ + struct cds_wfcq_node *node; + + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + while (caa_unlikely(node == NULL)) { + gf_async_leader_wait(); + + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + } + + /* Activate the next available worker thread. It will become the new + * leader. */ + gf_async_worker_enable(); + + gf_async_run(node); +} + +static uint32_t +gf_async_stop_check(gf_async_worker_t *worker) +{ + uint32_t counts, old, running, max; + + /* First we check if we should stop without doing any costly atomic + * operation. */ + old = uatomic_read(&gf_async_ctrl.counts); + max = uatomic_read(&gf_async_ctrl.max_threads); + running = GF_ASYNC_COUNT_RUNNING(old); + while (running > max) { + /* There are too many threads. We try to stop the current worker. */ + counts = uatomic_cmpxchg(&gf_async_ctrl.counts, old, + old + GF_ASYNC_COUNTS(-1, 1)); + if (old != counts) { + /* Another thread has just updated the counts. We need to retry. */ + old = counts; + running = GF_ASYNC_COUNT_RUNNING(old); + + continue; + } + + running--; + worker->running = false; + } + + return running; +} + +static void +gf_async_stop_all(xlator_t *xl, gf_async_t *async) +{ + if (gf_async_stop_check(gf_async_current_worker) > 0) { + /* There are more workers running. We propagate the stop request to + * them. */ + gf_async(async, xl, gf_async_stop_all); + } +} + +static void +gf_async_join(xlator_t *xl, gf_async_t *async) +{ + gf_async_worker_t *worker; + + worker = caa_container_of(async, gf_async_worker_t, async); + + gf_async_thread_wait(worker->thread); + + cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +} + +static void +gf_async_terminate(gf_async_worker_t *worker) +{ + uint32_t counts; + + counts = uatomic_add_return(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(0, -1)); + if (counts == 0) { + /* This is the termination of the last worker thread. We need to + * synchronize the main thread that is waiting for all workers to + * finish. */ + gf_async_ctrl.sync_thread = worker->thread; + + gf_async_sync_now(); + } else { + /* Force someone else to join this thread to release resources. */ + gf_async(&worker->async, THIS, gf_async_join); + } +} + +static void * +gf_async_worker(void *arg) +{ + gf_async_worker_t *worker; + + worker = (gf_async_worker_t *)arg; + gf_async_current_worker = worker; + + worker->running = true; + do { + /* This thread does nothing until someone enables it to become a + * leader. */ + gf_async_worker_wait(); + + /* This thread is now a leader. It will process jobs from the queue + * and, if necessary, enable another worker and transfer leadership + * to it. */ + gf_async_leader_run(); + + /* This thread is not a leader anymore. It will continue processing + * queued jobs until it becomes empty. */ + gf_async_worker_run(); + + /* Stop the current thread if there are too many threads running. */ + gf_async_stop_check(worker); + } while (worker->running); + + gf_async_terminate(worker); + + return NULL; +} + +static void +gf_async_cleanup(void) +{ + /* We do some basic initialization of the global variable 'gf_async_ctrl' + * so that it's put into a relatively consistent state. */ + + gf_async_ctrl.enabled = false; + + gf_async_ctrl.pid = 0; + gf_async_sigmask_empty(&gf_async_ctrl.sigmask_ctrl); + gf_async_sigmask_empty(&gf_async_ctrl.sigmask_queue); + + /* This is used to later detect if the handler of these signals have been + * changed or not. */ + gf_async_ctrl.handler_ctrl.sa_handler = gf_async_signal_handler; + gf_async_ctrl.handler_queue.sa_handler = gf_async_signal_handler; + + gf_async_ctrl.table = NULL; + gf_async_ctrl.max_threads = 0; + gf_async_ctrl.counts = 0; +} + +void +gf_async_fini(void) +{ + gf_async_t async; + + if (uatomic_read(&gf_async_ctrl.counts) != 0) { + /* We ensure that all threads will quit on the next check. */ + gf_async_ctrl.max_threads = 0; + + /* Send the stop request to the thread pool. This will cause the + * execution of gf_async_stop_all() by one of the worker threads which, + * eventually, will terminate all worker threads. */ + gf_async(&async, THIS, gf_async_stop_all); + + /* We synchronize here with the last thread. */ + gf_async_sync_now(); + + /* We have just synchronized with the latest thread. Now just wait for + * it to terminate. */ + gf_async_thread_wait(gf_async_ctrl.sync_thread); + + gf_async_signal_flush(); + } + + gf_async_signal_restore(); + + gf_async_cleanup(); +} + +void +gf_async_adjust_threads(int32_t threads) +{ + if (threads == 0) { + /* By default we allow a maximum of 2 * #cores worker threads. This + * value is to try to accommodate threads that will do some I/O. Having + * more threads than cores we can keep CPU busy even if some threads + * are blocked for I/O. In the most efficient case, we can have #cores + * computing threads and #cores blocked threads on I/O. However this is + * hard to achieve because we can end with more than #cores computing + * threads, which won't provide a real benefit and will increase + * contention. + * + * TODO: implement a more intelligent dynamic maximum based on CPU + * usage and/or system load. */ + threads = sysconf(_SC_NPROCESSORS_ONLN) * 2; + if (threads < 0) { + /* If we can't get the current number of processors, we pick a + * random number. */ + threads = 16; + } + } + if (threads > GF_ASYNC_MAX_THREADS) { + threads = GF_ASYNC_MAX_THREADS; + } + uatomic_set(&gf_async_ctrl.max_threads, threads); +} + +int32_t +gf_async_init(glusterfs_ctx_t *ctx) +{ + sigset_t set; + gf_async_worker_t *worker; + uint32_t i; + int32_t ret; + bool running; + + gf_async_cleanup(); + + if (!ctx->cmd_args.global_threading || + (ctx->process_mode == GF_GLUSTERD_PROCESS)) { + return 0; + } + + /* At the init time, the maximum number of threads has not yet been + * configured. We use a small starting value that will be layer dynamically + * adjusted when ctx->config.max_threads is updated. */ + gf_async_adjust_threads(GF_ASYNC_SPARE_THREADS + 1); + + gf_async_ctrl.pid = getpid(); + + __cds_wfs_init(&gf_async_ctrl.available); + cds_wfcq_init(&gf_async_ctrl.queue.head, &gf_async_ctrl.queue.tail); + + gf_async_sync_init(); + + /* TODO: it would be cleaner to use dynamic memory, but at this point some + * memory management resources are not yet initialized. */ + gf_async_ctrl.table = gf_async_workers; + + /* We keep all workers in a stack. It will be used when a new thread needs + * to be created. */ + for (i = GF_ASYNC_MAX_THREADS; i > 0; i--) { + worker = &gf_async_ctrl.table[i - 1]; + + worker->id = i - 1; + cds_wfs_node_init(&worker->stack); + cds_wfs_push(&gf_async_ctrl.available, &worker->stack); + } + + /* Prepare the signal mask for regular workers and the leader. */ + gf_async_sigmask_add(&gf_async_ctrl.sigmask_ctrl, GF_ASYNC_SIGCTRL); + gf_async_sigmask_add(&gf_async_ctrl.sigmask_queue, GF_ASYNC_SIGQUEUE); + + /* TODO: this is needed to block our special signals in the current thread + * and all children that it starts. It would be cleaner to do it when + * signals are initialized, but there doesn't seem to be a unique + * place to do that, so for now we do it here. */ + gf_async_sigmask_empty(&set); + gf_async_sigmask_add(&set, GF_ASYNC_SIGCTRL); + gf_async_sigmask_add(&set, GF_ASYNC_SIGQUEUE); + gf_async_sigmask_set(SIG_BLOCK, &set, NULL); + + /* Configure the signal handlers. This is mostly for safety, not really + * needed, but it doesn't hurt. Note that the caller must ensure that the + * signals we need to run are already blocked in any thread already + * started. Otherwise this won't work. */ + gf_async_signal_setup(); + + running = false; + + /* We start the spare workers + 1 for the leader. */ + for (i = 0; i < GF_ASYNC_SPARE_THREADS; i++) { + ret = gf_async_worker_create(); + if (caa_unlikely(ret < 0)) { + /* This is the initial start up so we enforce that the spare + * threads are created. If this fails at the beginning, it's very + * unlikely that the async workers could do its job, so we abort + * the initialization. */ + goto out; + } + + /* Once the first thread is started, we can enable it to become the + * initial leader. */ + if ((ret == 0) && !running) { + running = true; + gf_async_worker_enable(); + } + } + + if (caa_unlikely(!running)) { + gf_async_fatal(-ENOMEM, "No worker thread has started"); + } + + gf_async_ctrl.enabled = true; + + ret = 0; + +out: + if (ret < 0) { + gf_async_error(ret, "Unable to initialize the thread pool."); + gf_async_fini(); + } + + return ret; +} diff --git a/libglusterfs/src/glusterfs/async.h b/libglusterfs/src/glusterfs/async.h new file mode 100644 index 00000000000..d1d70ae0bc7 --- /dev/null +++ b/libglusterfs/src/glusterfs/async.h @@ -0,0 +1,209 @@ +/* + Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __GLUSTERFS_ASYNC_H__ +#define __GLUSTERFS_ASYNC_H__ + +#define _LGPL_SOURCE + +#include <sys/types.h> +#include <signal.h> +#include <errno.h> + +#ifdef URCU_OLD + +/* TODO: Fix the include paths. Since this is a .h included from many places + * it makes no sense to append a '-I$(CONTRIBDIR)/userspace-rcu/' to each + * Makefile.am. I've also seen some problems with CI builders (they + * failed to find the include files, but the same source on another setup + * is working fine). */ +#include "wfcqueue.h" +#include "wfstack.h" + +#else /* !URCU_OLD */ + +#include <urcu/wfcqueue.h> +#include <urcu/wfstack.h> + +#endif /* URCU_OLD */ + +#include "glusterfs/xlator.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/list.h" +#include "glusterfs/libglusterfs-messages.h" + +/* This is the name prefix that all worker threads will have. A number will + * be added to differentiate them. */ +#define GF_ASYNC_THREAD_NAME "tpw" + +/* This value determines the maximum number of threads that are allowed. */ +#define GF_ASYNC_MAX_THREADS 128 + +/* This value determines how many additional threads will be started but will + * remain inactive until they are explicitly activated by the leader. This is + * useful to react faster to bursts of load, but at the same time we minimize + * contention if they are not really needed to handle current load. + * + * TODO: Instead of a fixed number, it would probably be better to use a + * prcentage of the available cores. */ +#define GF_ASYNC_SPARE_THREADS 2 + +/* This value determines the signal used to wake the leader when new work has + * been added to the queue. To do so we reuse SIGALRM, since the most logical + * candidates (SIGUSR1/SIGUSR2) are already used. This signal must not be used + * by anything else in the process. */ +#define GF_ASYNC_SIGQUEUE SIGALRM + +/* This value determines the signal that will be used to transfer leader role + * to other workers. */ +#define GF_ASYNC_SIGCTRL SIGVTALRM + +#define gf_async_warning(_err, _msg, _args...) \ + gf_msg("async", GF_LOG_WARNING, -(_err), LG_MSG_ASYNC_WARNING, _msg, \ + ##_args) + +#define gf_async_error(_err, _msg, _args...) \ + gf_msg("async", GF_LOG_ERROR, -(_err), LG_MSG_ASYNC_FAILURE, _msg, ##_args) + +#define gf_async_fatal(_err, _msg, _args...) \ + do { \ + GF_ABORT("Critical error in async module. Unable to continue. (" _msg \ + "). Error %d.", \ + ##_args, -(_err)); \ + } while (0) + +struct _gf_async; +typedef struct _gf_async gf_async_t; + +struct _gf_async_worker; +typedef struct _gf_async_worker gf_async_worker_t; + +struct _gf_async_queue; +typedef struct _gf_async_queue gf_async_queue_t; + +struct _gf_async_control; +typedef struct _gf_async_control gf_async_control_t; + +typedef void (*gf_async_callback_f)(xlator_t *xl, gf_async_t *async); + +struct _gf_async { + /* TODO: remove dependency on xl/THIS. */ + xlator_t *xl; + gf_async_callback_f cbk; + struct cds_wfcq_node queue; +}; + +struct _gf_async_worker { + /* Used to send asynchronous jobs related to the worker. */ + gf_async_t async; + + /* Member of the available workers stack. */ + struct cds_wfs_node stack; + + /* Thread object of the current worker. */ + pthread_t thread; + + /* Unique identifier of this worker. */ + int32_t id; + + /* Indicates if this worker is enabled. */ + bool running; +}; + +struct _gf_async_queue { + /* Structures needed to manage a wait-free queue. For better performance + * they are placed in two different cache lines, as recommended by URCU + * documentation, even though in our case some threads will be producers + * and consumers at the same time. */ + struct cds_wfcq_head head __attribute__((aligned(64))); + struct cds_wfcq_tail tail __attribute__((aligned(64))); +}; + +#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop)) +#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16) +#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535) + +struct _gf_async_control { + gf_async_queue_t queue; + + /* Stack of unused workers. */ + struct __cds_wfs_stack available; + + /* Array of preallocated worker structures. */ + gf_async_worker_t *table; + + /* Used to synchronize main thread with workers on termination. */ + pthread_barrier_t sync; + + /* The id of the last thread that will be used for synchronization. */ + pthread_t sync_thread; + + /* Signal mask to wait for control signals from leader. */ + sigset_t sigmask_ctrl; + + /* Signal mask to wait for queued items. */ + sigset_t sigmask_queue; + + /* Saved signal handlers. */ + struct sigaction handler_ctrl; + struct sigaction handler_queue; + + /* PID of the current process. */ + pid_t pid; + + /* Maximum number of allowed threads. */ + uint32_t max_threads; + + /* Current number of running and stopping workers. This value is split + * into 2 16-bits fields to track both counters atomically at the same + * time. */ + uint32_t counts; + + /* It's used to control whether the asynchronous infrastructure is used + * or not. */ + bool enabled; +}; + +extern gf_async_control_t gf_async_ctrl; + +int32_t +gf_async_init(glusterfs_ctx_t *ctx); + +void +gf_async_fini(void); + +void +gf_async_adjust_threads(int32_t threads); + +static inline void +gf_async(gf_async_t *async, xlator_t *xl, gf_async_callback_f cbk) +{ + if (!gf_async_ctrl.enabled) { + cbk(xl, async); + return; + } + + async->xl = xl; + async->cbk = cbk; + cds_wfcq_node_init(&async->queue); + if (caa_unlikely(!cds_wfcq_enqueue(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail, + &async->queue))) { + /* The queue was empty, so the leader could be sleeping. We need to + * wake it so that the new item can be processed. If the queue was not + * empty, we don't need to do anything special since the leader will + * take care of it. */ + if (caa_unlikely(kill(gf_async_ctrl.pid, GF_ASYNC_SIGQUEUE) < 0)) { + gf_async_fatal(errno, "Unable to wake leader worker."); + }; + } +} + +#endif /* !__GLUSTERFS_ASYNC_H__ */ diff --git a/libglusterfs/src/glusterfs/common-utils.h b/libglusterfs/src/glusterfs/common-utils.h index f03d2c1049a..1418c6531c7 100644 --- a/libglusterfs/src/glusterfs/common-utils.h +++ b/libglusterfs/src/glusterfs/common-utils.h @@ -442,7 +442,7 @@ BIT_VALUE(unsigned char *array, unsigned int index) } while (0) #endif -#define GF_ABORT(msg) \ +#define GF_ABORT(msg...) \ do { \ gf_msg_callingfn("", GF_LOG_CRITICAL, 0, LG_MSG_ASSERTION_FAILED, \ "Assertion failed: " msg); \ diff --git a/libglusterfs/src/glusterfs/glusterfs.h b/libglusterfs/src/glusterfs/glusterfs.h index 7c6af090fd8..9d140c18ee3 100644 --- a/libglusterfs/src/glusterfs/glusterfs.h +++ b/libglusterfs/src/glusterfs/glusterfs.h @@ -575,6 +575,8 @@ struct _cmd_args { int fuse_flush_handle_interrupt; int fuse_auto_inval; + + bool global_threading; }; typedef struct _cmd_args cmd_args_t; diff --git a/libglusterfs/src/glusterfs/libglusterfs-messages.h b/libglusterfs/src/glusterfs/libglusterfs-messages.h index 1b72f6df5be..e17e33e06fb 100644 --- a/libglusterfs/src/glusterfs/libglusterfs-messages.h +++ b/libglusterfs/src/glusterfs/libglusterfs-messages.h @@ -109,6 +109,6 @@ GLFS_MSGID( LG_MSG_PTHREAD_ATTR_INIT_FAILED, LG_MSG_INVALID_INODE_LIST, LG_MSG_COMPACT_FAILED, LG_MSG_COMPACT_STATUS, LG_MSG_UTIMENSAT_FAILED, LG_MSG_PTHREAD_NAMING_FAILED, LG_MSG_SYSCALL_RETURNS_WRONG, - LG_MSG_XXH64_TO_GFID_FAILED); + LG_MSG_XXH64_TO_GFID_FAILED, LG_MSG_ASYNC_WARNING, LG_MSG_ASYNC_FAILURE); #endif /* !_LG_MESSAGES_H_ */ diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 2ac87c491ae..4466a5eee9a 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -553,6 +553,11 @@ get_xlator_by_name get_xlator_by_type gf_array_insertionsort gf_asprintf +gf_async +gf_async_adjust_threads +gf_async_ctrl +gf_async_init +gf_async_fini gf_backtrace_save gf_bits_count gf_bits_index |