Diffstat (limited to 'libglusterfs')
-rw-r--r--  libglusterfs/src/Makefile.am                          8
-rw-r--r--  libglusterfs/src/async.c                            723
-rw-r--r--  libglusterfs/src/glusterfs/async.h                  209
-rw-r--r--  libglusterfs/src/glusterfs/common-utils.h             2
-rw-r--r--  libglusterfs/src/glusterfs/glusterfs.h                2
-rw-r--r--  libglusterfs/src/glusterfs/libglusterfs-messages.h    2
-rw-r--r--  libglusterfs/src/libglusterfs.sym                     5
7 files changed, 947 insertions(+), 4 deletions(-)
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am
index 970f4b74978..f030a70b0f0 100644
--- a/libglusterfs/src/Makefile.am
+++ b/libglusterfs/src/Makefile.am
@@ -37,7 +37,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \
$(CONTRIBDIR)/timer-wheel/timer-wheel.c \
$(CONTRIBDIR)/timer-wheel/find_last_bit.c default-args.c locking.c \
$(CONTRIBDIR)/xxhash/xxhash.c \
- compound-fop-utils.c throttle-tbf.c monitoring.c
+ compound-fop-utils.c throttle-tbf.c monitoring.c async.c
nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c
nodist_libglusterfs_la_HEADERS = y.tab.h protocol-common.h
@@ -69,7 +69,7 @@ libglusterfs_la_HEADERS = glusterfs/common-utils.h glusterfs/defaults.h \
glusterfs/quota-common-utils.h glusterfs/rot-buffs.h \
glusterfs/compat-uuid.h glusterfs/upcall-utils.h glusterfs/throttle-tbf.h \
glusterfs/events.h glusterfs/compound-fop-utils.h glusterfs/atomic.h \
- glusterfs/monitoring.h
+ glusterfs/monitoring.h glusterfs/async.h
libglusterfs_ladir = $(includedir)/glusterfs
@@ -79,6 +79,10 @@ noinst_HEADERS = unittest/unittest.h \
$(CONTRIBDIR)/libexecinfo/execinfo_compat.h \
$(CONTRIBDIR)/timer-wheel/timer-wheel.h \
$(CONTRIBDIR)/xxhash/xxhash.h \
+ $(CONTRIBDIR)/userspace-rcu/wfcqueue.h \
+ $(CONTRIBDIR)/userspace-rcu/wfstack.h \
+ $(CONTRIBDIR)/userspace-rcu/static-wfcqueue.h \
+ $(CONTRIBDIR)/userspace-rcu/static-wfstack.h \
tier-ctr-interface.h
eventtypes.h: $(top_srcdir)/events/eventskeygen.py
diff --git a/libglusterfs/src/async.c b/libglusterfs/src/async.c
new file mode 100644
index 00000000000..ae7152ff7fa
--- /dev/null
+++ b/libglusterfs/src/async.c
@@ -0,0 +1,723 @@
+/*
+ Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+/* To implement an efficient thread pool with minimum contention we have used
+ * the following ideas:
+ *
+ * - The queue of jobs has been implemented using a Wait-Free queue provided
+ * by the userspace-rcu library. This queue requires a mutex when multiple
+ * consumers can be extracting items from it concurrently, but the locked
+ * region is very small, which minimizes the chances of contention. To
+ * further minimize contention, the number of active worker threads that
+ * are accessing the queue is dynamically adjusted so that we always have
+ *   the minimum required number of workers contending for the queue. Adding
+ * new items can be done with a single atomic operation, without locks.
+ *
+ * - All queue management operations, like creating more threads, enabling
+ * sleeping ones, etc. are done by a single thread. This makes it possible
+ * to manage all scaling related information and workers lists without
+ * locks. This functionality is implemented as a role that can be assigned
+ *   to any of the worker threads, which prevents lengthy operations from
+ *   interfering with this task.
+ *
+ * - Management is based on signals, which avoids multiple system calls for
+ *   each request: with signals we can wait for multiple events and get some
+ *   additional data for each request in a single call, instead of first
+ *   polling and then reading.
+ *
+ * TODO: There are some other changes that can take advantage of this new
+ * thread pool.
+ *
+ * - Use this thread pool as the core threading model for synctasks. I
+ *   think this would improve synctask performance because we currently
+ *   seem to have some contention there for some workloads.
+ *
+ * - Implement a per thread timer that will allow adding and removing
+ * timers without using mutexes.
+ *
+ * - Integrate with userspace-rcu library in QSBR mode, allowing
+ * other portions of code to be implemented using RCU-based
+ *   structures with an extremely fast read side without contention.
+ *
+ * - Integrate I/O into the thread pool so that the thread pool is
+ * able to efficiently manage all loads and scale dynamically. This
+ * could make it possible to minimize context switching when serving
+ * requests from fuse or network.
+ *
+ * - Dynamically scale the number of workers based on system load.
+ *   This will make it possible to reduce contention when the system is
+ *   heavily loaded, improving performance under these circumstances
+ *   (or minimizing performance loss). It will also make it possible
+ *   for Gluster to coexist with other processes that also consume
+ *   CPU, with minimal interference between them.
+ */
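
As a standalone illustration of the enqueue/dequeue pattern described above, the following minimal sketch uses liburcu's wfcqueue API. It is a hypothetical example, not part of this patch; it assumes liburcu is installed (link with -lurcu) and uses made-up names (struct job):

#include <stdio.h>
#include <urcu/compiler.h>
#include <urcu/wfcqueue.h>

struct job {
    int id;
    struct cds_wfcq_node node; /* embedded queue linkage, as in gf_async_t */
};

int
main(void)
{
    struct cds_wfcq_head head;
    struct cds_wfcq_tail tail;
    struct cds_wfcq_node *n;
    struct job job = {.id = 1};

    cds_wfcq_init(&head, &tail);
    cds_wfcq_node_init(&job.node);

    /* Producers enqueue with a single atomic operation and no locks. The
     * call returns false when the queue was empty, i.e. when a sleeping
     * consumer may need to be woken (gf_async() sends a signal in that
     * case). */
    if (!cds_wfcq_enqueue(&head, &tail, &job.node)) {
        printf("queue was empty: wake the leader\n");
    }

    /* The non-underscore dequeue variant serializes concurrent consumers
     * with a small internal lock: the same "very small" locked region
     * mentioned above. */
    n = cds_wfcq_dequeue_blocking(&head, &tail);
    if (n != NULL) {
        printf("got job %d\n", caa_container_of(n, struct job, node)->id);
    }

    return 0;
}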
+
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+
+#include "glusterfs/logging.h"
+#include "glusterfs/list.h"
+#include "glusterfs/mem-types.h"
+#include "glusterfs/async.h"
+
+/* These macros wrap a simple system/library call to check the returned error
+ * and log a message in case of failure. */
+#define GF_ASYNC_CHECK(_func, _args...) \
+ ({ \
+ int32_t __async_error = -_func(_args); \
+ if (caa_unlikely(__async_error != 0)) { \
+ gf_async_error(__async_error, #_func "() failed."); \
+ } \
+ __async_error; \
+ })
+
+#define GF_ASYNC_CHECK_ERRNO(_func, _args...) \
+ ({ \
+ int32_t __async_error = _func(_args); \
+ if (caa_unlikely(__async_error < 0)) { \
+ __async_error = -errno; \
+ gf_async_error(__async_error, #_func "() failed."); \
+ } \
+ __async_error; \
+ })
+
+/* These macros are used when, according to POSIX documentation, the function
+ * should never fail under the conditions in which we use it, so any
+ * unexpected error is treated as fatal. Such an error probably means a
+ * critical bug or memory corruption. In both cases we consider that stopping
+ * the process is safer (otherwise it could cause more corruption with
+ * unknown, possibly worse, effects). */
+#define GF_ASYNC_CANTFAIL(_func, _args...) \
+ do { \
+ int32_t __async_error = -_func(_args); \
+ if (caa_unlikely(__async_error != 0)) { \
+ gf_async_fatal(__async_error, #_func "() failed"); \
+ } \
+ } while (0)
+
+#define GF_ASYNC_CANTFAIL_ERRNO(_func, _args...) \
+ ({ \
+ int32_t __async_error = _func(_args); \
+ if (caa_unlikely(__async_error < 0)) { \
+ __async_error = -errno; \
+ gf_async_fatal(__async_error, #_func "() failed"); \
+ } \
+ __async_error; \
+ })
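
A hypothetical usage sketch of these wrappers (assuming the definitions above; the function names are made up) shows the two calling conventions side by side:

#include <stdint.h>
#include <pthread.h>
#include <signal.h>

/* pthread_mutex_lock() returns an error number directly, so the plain
 * wrapper applies; the statement expression evaluates to the negated error
 * code, letting the caller log and branch in a single step. */
static int32_t
example_check(pthread_mutex_t *mutex)
{
    return GF_ASYNC_CHECK(pthread_mutex_lock, mutex);
}

/* sigemptyset() returns -1 and sets errno on failure, so the _ERRNO variant
 * applies; a failure that POSIX says cannot happen aborts the process. */
static void
example_cantfail(sigset_t *set)
{
    GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, set);
}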
+
+/* TODO: for now we allocate a static array of workers. Using dynamic memory
+ *       here is problematic because these workers are initialized very early
+ *       during process startup, when not everything may be ready to use
+ *       dynamic memory. */
+static gf_async_worker_t gf_async_workers[GF_ASYNC_MAX_THREADS];
+
+/* This is the only global variable needed to manage the entire framework. */
+gf_async_control_t gf_async_ctrl = {};
+
+static __thread gf_async_worker_t *gf_async_current_worker = NULL;
+
+/* The main function of the worker threads. */
+static void *
+gf_async_worker(void *arg);
+
+static void
+gf_async_sync_init(void)
+{
+ GF_ASYNC_CANTFAIL(pthread_barrier_init, &gf_async_ctrl.sync, NULL, 2);
+}
+
+static void
+gf_async_sync_now(void)
+{
+ int32_t ret;
+
+ ret = pthread_barrier_wait(&gf_async_ctrl.sync);
+ if (ret == PTHREAD_BARRIER_SERIAL_THREAD) {
+ GF_ASYNC_CANTFAIL(pthread_barrier_destroy, &gf_async_ctrl.sync);
+ ret = 0;
+ }
+ if (caa_unlikely(ret != 0)) {
+ gf_async_fatal(-ret, "pthread_barrier_wait() failed");
+ }
+}
+
+static void
+gf_async_sigmask_empty(sigset_t *mask)
+{
+ GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, mask);
+}
+
+static void
+gf_async_sigmask_add(sigset_t *mask, int32_t signal)
+{
+ GF_ASYNC_CANTFAIL_ERRNO(sigaddset, mask, signal);
+}
+
+static void
+gf_async_sigmask_set(int32_t mode, sigset_t *mask, sigset_t *old)
+{
+ GF_ASYNC_CANTFAIL(pthread_sigmask, mode, mask, old);
+}
+
+static void
+gf_async_sigaction(int32_t signum, const struct sigaction *action,
+ struct sigaction *old)
+{
+ GF_ASYNC_CANTFAIL_ERRNO(sigaction, signum, action, old);
+}
+
+static int32_t
+gf_async_sigwait(sigset_t *set)
+{
+ int32_t ret, signum;
+
+ do {
+ ret = sigwait(set, &signum);
+ } while (caa_unlikely((ret < 0) && (errno == EINTR)));
+
+ if (caa_unlikely(ret < 0)) {
+ ret = -errno;
+ gf_async_fatal(ret, "sigwait() failed");
+ }
+
+ return signum;
+}
+
+static int32_t
+gf_async_sigtimedwait(sigset_t *set, struct timespec *timeout)
+{
+ int32_t ret;
+
+ do {
+ ret = sigtimedwait(set, NULL, timeout);
+ } while (caa_unlikely((ret < 0) && (errno == EINTR)));
+ if (caa_unlikely(ret < 0)) {
+ ret = -errno;
+ /* EAGAIN means that the timeout has expired, so we allow this error.
+ * Any other error shouldn't happen. */
+ if (caa_unlikely(ret != -EAGAIN)) {
+ gf_async_fatal(ret, "sigtimedwait() failed");
+ }
+ ret = 0;
+ }
+
+ return ret;
+}
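
Both helpers rely on the signals being blocked and then consumed synchronously. A minimal, hypothetical standalone sketch of that pattern (not part of this patch; compile with -pthread):

#include <signal.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
    sigset_t set;
    int signum = 0;

    sigemptyset(&set);
    sigaddset(&set, SIGALRM);

    /* Block the signal: it stays pending instead of running a handler. */
    pthread_sigmask(SIG_BLOCK, &set, NULL);

    /* A producer queues the signal (gf_async() does this with kill())... */
    kill(getpid(), SIGALRM);

    /* ...and a consumer receives it synchronously, no handler involved. */
    if (sigwait(&set, &signum) == 0) {
        printf("woken by signal %d\n", signum);
    }

    return 0;
}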
+
+static void
+gf_async_sigbroadcast(int32_t signum)
+{
+ GF_ASYNC_CANTFAIL_ERRNO(kill, gf_async_ctrl.pid, signum);
+}
+
+static void
+gf_async_signal_handler(int32_t signum)
+{
+ /* We should never handle a signal in this function. */
+ gf_async_fatal(-EBUSY,
+ "Unexpected processing of signal %d through a handler.",
+ signum);
+}
+
+static void
+gf_async_signal_setup(void)
+{
+ struct sigaction action;
+
+ /* We configure all related signals so that we can detect threads using an
+     * invalid signal mask that doesn't block our critical signals. */
+ memset(&action, 0, sizeof(action));
+ action.sa_handler = gf_async_signal_handler;
+
+ gf_async_sigaction(GF_ASYNC_SIGCTRL, &action, &gf_async_ctrl.handler_ctrl);
+
+ gf_async_sigaction(GF_ASYNC_SIGQUEUE, &action,
+ &gf_async_ctrl.handler_queue);
+}
+
+static void
+gf_async_signal_restore(void)
+{
+    /* Handlers we have previously changed are restored to their original
+     * values. */
+
+ if (gf_async_ctrl.handler_ctrl.sa_handler != gf_async_signal_handler) {
+ gf_async_sigaction(GF_ASYNC_SIGCTRL, &gf_async_ctrl.handler_ctrl, NULL);
+ }
+
+ if (gf_async_ctrl.handler_queue.sa_handler != gf_async_signal_handler) {
+ gf_async_sigaction(GF_ASYNC_SIGQUEUE, &gf_async_ctrl.handler_queue,
+ NULL);
+ }
+}
+
+static void
+gf_async_signal_flush(void)
+{
+ struct timespec delay;
+
+ delay.tv_sec = 0;
+ delay.tv_nsec = 0;
+
+ /* We read all pending signals so that they don't trigger once the signal
+ * mask of some thread is changed. */
+ while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_ctrl, &delay) > 0) {
+ }
+ while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_queue, &delay) > 0) {
+ }
+}
+
+static int32_t
+gf_async_thread_create(pthread_t *thread, int32_t id, void *data)
+{
+ int32_t ret;
+
+ ret = gf_thread_create(thread, NULL, gf_async_worker, data,
+ GF_ASYNC_THREAD_NAME "%u", id);
+ if (caa_unlikely(ret < 0)) {
+ /* TODO: gf_thread_create() should return a more specific error
+ * code. */
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
+static void
+gf_async_thread_wait(pthread_t thread)
+{
+ /* TODO: this is a blocking call executed inside one of the workers of the
+ * thread pool. This is bad, but this is only executed once we have
+ * received a notification from the thread that it's terminating, so
+ * this should return almost immediately. However, to be more robust
+ * it would be better to use pthread_timedjoin_np() (or even a call
+ * to pthread_tryjoin_np() followed by a delayed recheck if it
+ * fails), but they are not portable. We should see how to do this
+     *       on other platforms. */
+ GF_ASYNC_CANTFAIL(pthread_join, thread, NULL);
+}
+
+static int32_t
+gf_async_worker_create(void)
+{
+ struct cds_wfs_node *node;
+ gf_async_worker_t *worker;
+ uint32_t counts, running, max;
+ int32_t ret;
+
+ node = __cds_wfs_pop_blocking(&gf_async_ctrl.available);
+ if (caa_unlikely(node == NULL)) {
+ /* There are no more available workers. We have all threads running. */
+ return 1;
+ }
+ cds_wfs_node_init(node);
+
+ ret = 1;
+
+ counts = uatomic_read(&gf_async_ctrl.counts);
+ max = uatomic_read(&gf_async_ctrl.max_threads);
+ running = GF_ASYNC_COUNT_RUNNING(counts);
+ if (running < max) {
+ uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(1, 0));
+
+ worker = caa_container_of(node, gf_async_worker_t, stack);
+
+ ret = gf_async_thread_create(&worker->thread, worker->id, worker);
+ if (caa_likely(ret >= 0)) {
+ return 0;
+ }
+
+ uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(-1, 0));
+ }
+
+ cds_wfs_push(&gf_async_ctrl.available, node);
+
+ return ret;
+}
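
The available-workers stack used above follows liburcu's wfstack API. A hypothetical standalone sketch of the same idiom (not part of this patch; struct slot is a made-up name):

#include <stdio.h>
#include <urcu/compiler.h>
#include <urcu/wfstack.h>

struct slot {
    int id;
    struct cds_wfs_node node; /* embedded, like gf_async_worker_t.stack */
};

int
main(void)
{
    struct __cds_wfs_stack stack;
    struct slot slots[2];
    struct cds_wfs_node *n;
    int i;

    __cds_wfs_init(&stack);
    for (i = 0; i < 2; i++) {
        slots[i].id = i;
        cds_wfs_node_init(&slots[i].node);
        cds_wfs_push(&stack, &slots[i].node); /* lock-free push */
    }

    /* The __-prefixed pop assumes the caller serializes poppers; gf_async
     * guarantees that because only the management role pops workers. */
    while ((n = __cds_wfs_pop_blocking(&stack)) != NULL) {
        printf("popped slot %d\n", caa_container_of(n, struct slot, node)->id);
    }

    return 0;
}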
+
+static void
+gf_async_worker_enable(void)
+{
+ /* This will wake one of the spare workers. If all workers are busy now,
+ * the signal will be queued so that the first one that completes its
+ * work will become the leader. */
+ gf_async_sigbroadcast(GF_ASYNC_SIGCTRL);
+
+ /* We have consumed a spare worker. We create another one for future
+ * needs. */
+ gf_async_worker_create();
+}
+
+static void
+gf_async_worker_wait(void)
+{
+ int32_t signum;
+
+ signum = gf_async_sigwait(&gf_async_ctrl.sigmask_ctrl);
+ if (caa_unlikely(signum != GF_ASYNC_SIGCTRL)) {
+ gf_async_fatal(-EINVAL, "Worker received an unexpected signal (%d)",
+ signum);
+ }
+}
+
+static void
+gf_async_leader_wait(void)
+{
+ int32_t signum;
+
+ signum = gf_async_sigwait(&gf_async_ctrl.sigmask_queue);
+ if (caa_unlikely(signum != GF_ASYNC_SIGQUEUE)) {
+ gf_async_fatal(-EINVAL, "Leader received an unexpected signal (%d)",
+ signum);
+ }
+}
+
+static void
+gf_async_run(struct cds_wfcq_node *node)
+{
+ gf_async_t *async;
+
+ /* We've just got work from the queue. Process it. */
+ async = caa_container_of(node, gf_async_t, queue);
+    /* TODO: remove dependency on THIS and xl. */
+ THIS = async->xl;
+ async->cbk(async->xl, async);
+}
+
+static void
+gf_async_worker_run(void)
+{
+ struct cds_wfcq_node *node;
+
+ do {
+ /* We keep executing jobs from the queue while it's not empty. Note
+ * that while we do this, we are ignoring any stop request. That's
+ * fine, since we need to process our own 'join' messages to fully
+ * terminate all threads. Note that normal jobs should have already
+ * completed once a stop request is received. */
+ node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head,
+ &gf_async_ctrl.queue.tail);
+ if (node != NULL) {
+ gf_async_run(node);
+ }
+ } while (node != NULL);
+
+    /* TODO: I've tried keeping the worker watching the queue in a short
+     *       busy loop to see if more jobs arrive soon, attempting to avoid
+     *       the overhead of signal management when jobs come fast enough.
+     *       However, experimental results indicate that doing so increases
+     *       CPU utilization and actually reduces performance. We need to
+     *       determine whether that's caused by badly chosen parameters or
+     *       whether it's really better to do it as it's done now. */
+}
+
+static void
+gf_async_leader_run(void)
+{
+ struct cds_wfcq_node *node;
+
+ node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head,
+ &gf_async_ctrl.queue.tail);
+ while (caa_unlikely(node == NULL)) {
+ gf_async_leader_wait();
+
+ node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head,
+ &gf_async_ctrl.queue.tail);
+ }
+
+ /* Activate the next available worker thread. It will become the new
+ * leader. */
+ gf_async_worker_enable();
+
+ gf_async_run(node);
+}
+
+static uint32_t
+gf_async_stop_check(gf_async_worker_t *worker)
+{
+ uint32_t counts, old, running, max;
+
+ /* First we check if we should stop without doing any costly atomic
+ * operation. */
+ old = uatomic_read(&gf_async_ctrl.counts);
+ max = uatomic_read(&gf_async_ctrl.max_threads);
+ running = GF_ASYNC_COUNT_RUNNING(old);
+ while (running > max) {
+ /* There are too many threads. We try to stop the current worker. */
+ counts = uatomic_cmpxchg(&gf_async_ctrl.counts, old,
+ old + GF_ASYNC_COUNTS(-1, 1));
+ if (old != counts) {
+ /* Another thread has just updated the counts. We need to retry. */
+ old = counts;
+ running = GF_ASYNC_COUNT_RUNNING(old);
+
+ continue;
+ }
+
+        running--;
+        worker->running = false;
+
+        break;
+    }
+
+ return running;
+}
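
The loop above is the usual compare-and-swap retry idiom. A reduced, hypothetical sketch (counts_move_to_stopping is a made-up name; it assumes <urcu/uatomic.h> and the GF_ASYNC_COUNTS() macros from async.h):

/* Atomically move one worker from "running" to "stopping", retrying while
 * other threads update the packed counters concurrently. */
static uint32_t
counts_move_to_stopping(uint32_t *counts)
{
    uint32_t old, seen;

    old = uatomic_read(counts);
    for (;;) {
        /* uatomic_cmpxchg() returns the value seen before the exchange;
         * if it equals 'old', our update was applied. */
        seen = uatomic_cmpxchg(counts, old, old + GF_ASYNC_COUNTS(-1, 1));
        if (seen == old) {
            return GF_ASYNC_COUNT_RUNNING(old) - 1;
        }
        old = seen; /* another thread changed the counters; retry */
    }
}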
+
+static void
+gf_async_stop_all(xlator_t *xl, gf_async_t *async)
+{
+ if (gf_async_stop_check(gf_async_current_worker) > 0) {
+ /* There are more workers running. We propagate the stop request to
+ * them. */
+ gf_async(async, xl, gf_async_stop_all);
+ }
+}
+
+static void
+gf_async_join(xlator_t *xl, gf_async_t *async)
+{
+ gf_async_worker_t *worker;
+
+ worker = caa_container_of(async, gf_async_worker_t, async);
+
+ gf_async_thread_wait(worker->thread);
+
+ cds_wfs_push(&gf_async_ctrl.available, &worker->stack);
+}
+
+static void
+gf_async_terminate(gf_async_worker_t *worker)
+{
+ uint32_t counts;
+
+ counts = uatomic_add_return(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(0, -1));
+ if (counts == 0) {
+        /* This is the termination of the last worker thread. We need to
+         * synchronize with the main thread, which is waiting for all workers
+         * to finish. */
+ gf_async_ctrl.sync_thread = worker->thread;
+
+ gf_async_sync_now();
+ } else {
+ /* Force someone else to join this thread to release resources. */
+ gf_async(&worker->async, THIS, gf_async_join);
+ }
+}
+
+static void *
+gf_async_worker(void *arg)
+{
+ gf_async_worker_t *worker;
+
+ worker = (gf_async_worker_t *)arg;
+ gf_async_current_worker = worker;
+
+ worker->running = true;
+ do {
+ /* This thread does nothing until someone enables it to become a
+ * leader. */
+ gf_async_worker_wait();
+
+ /* This thread is now a leader. It will process jobs from the queue
+ * and, if necessary, enable another worker and transfer leadership
+ * to it. */
+ gf_async_leader_run();
+
+        /* This thread is not a leader anymore. It will continue processing
+         * queued jobs until the queue becomes empty. */
+ gf_async_worker_run();
+
+ /* Stop the current thread if there are too many threads running. */
+ gf_async_stop_check(worker);
+ } while (worker->running);
+
+ gf_async_terminate(worker);
+
+ return NULL;
+}
+
+static void
+gf_async_cleanup(void)
+{
+ /* We do some basic initialization of the global variable 'gf_async_ctrl'
+ * so that it's put into a relatively consistent state. */
+
+ gf_async_ctrl.enabled = false;
+
+ gf_async_ctrl.pid = 0;
+ gf_async_sigmask_empty(&gf_async_ctrl.sigmask_ctrl);
+ gf_async_sigmask_empty(&gf_async_ctrl.sigmask_queue);
+
+    /* These are used later to detect whether the handlers of these signals
+     * have been changed. */
+ gf_async_ctrl.handler_ctrl.sa_handler = gf_async_signal_handler;
+ gf_async_ctrl.handler_queue.sa_handler = gf_async_signal_handler;
+
+ gf_async_ctrl.table = NULL;
+ gf_async_ctrl.max_threads = 0;
+ gf_async_ctrl.counts = 0;
+}
+
+void
+gf_async_fini(void)
+{
+ gf_async_t async;
+
+ if (uatomic_read(&gf_async_ctrl.counts) != 0) {
+ /* We ensure that all threads will quit on the next check. */
+ gf_async_ctrl.max_threads = 0;
+
+ /* Send the stop request to the thread pool. This will cause the
+ * execution of gf_async_stop_all() by one of the worker threads which,
+ * eventually, will terminate all worker threads. */
+ gf_async(&async, THIS, gf_async_stop_all);
+
+ /* We synchronize here with the last thread. */
+ gf_async_sync_now();
+
+        /* We have just synchronized with the last thread. Now we just wait
+         * for it to terminate. */
+ gf_async_thread_wait(gf_async_ctrl.sync_thread);
+
+ gf_async_signal_flush();
+ }
+
+ gf_async_signal_restore();
+
+ gf_async_cleanup();
+}
+
+void
+gf_async_adjust_threads(int32_t threads)
+{
+ if (threads == 0) {
+        /* By default we allow a maximum of 2 * #cores worker threads. This
+         * value tries to accommodate threads that will do some I/O: having
+         * more threads than cores lets us keep the CPUs busy even if some
+         * threads are blocked on I/O. In the best case, we can have #cores
+         * computing threads and #cores threads blocked on I/O. However this
+         * is hard to achieve because we can end up with more than #cores
+         * computing threads, which won't provide a real benefit and will
+         * increase contention.
+ *
+ * TODO: implement a more intelligent dynamic maximum based on CPU
+ * usage and/or system load. */
+ threads = sysconf(_SC_NPROCESSORS_ONLN) * 2;
+ if (threads < 0) {
+            /* If we can't determine the number of online processors, we
+             * fall back to an arbitrary default. */
+ threads = 16;
+ }
+ }
+ if (threads > GF_ASYNC_MAX_THREADS) {
+ threads = GF_ASYNC_MAX_THREADS;
+ }
+ uatomic_set(&gf_async_ctrl.max_threads, threads);
+}
+
+int32_t
+gf_async_init(glusterfs_ctx_t *ctx)
+{
+ sigset_t set;
+ gf_async_worker_t *worker;
+ uint32_t i;
+ int32_t ret;
+ bool running;
+
+ gf_async_cleanup();
+
+ if (!ctx->cmd_args.global_threading ||
+ (ctx->process_mode == GF_GLUSTERD_PROCESS)) {
+ return 0;
+ }
+
+    /* At init time the maximum number of threads has not yet been configured.
+     * We use a small starting value that will later be adjusted dynamically
+     * when ctx->config.max_threads is updated. */
+ gf_async_adjust_threads(GF_ASYNC_SPARE_THREADS + 1);
+
+ gf_async_ctrl.pid = getpid();
+
+ __cds_wfs_init(&gf_async_ctrl.available);
+ cds_wfcq_init(&gf_async_ctrl.queue.head, &gf_async_ctrl.queue.tail);
+
+ gf_async_sync_init();
+
+ /* TODO: it would be cleaner to use dynamic memory, but at this point some
+ * memory management resources are not yet initialized. */
+ gf_async_ctrl.table = gf_async_workers;
+
+ /* We keep all workers in a stack. It will be used when a new thread needs
+ * to be created. */
+ for (i = GF_ASYNC_MAX_THREADS; i > 0; i--) {
+ worker = &gf_async_ctrl.table[i - 1];
+
+ worker->id = i - 1;
+ cds_wfs_node_init(&worker->stack);
+ cds_wfs_push(&gf_async_ctrl.available, &worker->stack);
+ }
+
+ /* Prepare the signal mask for regular workers and the leader. */
+ gf_async_sigmask_add(&gf_async_ctrl.sigmask_ctrl, GF_ASYNC_SIGCTRL);
+ gf_async_sigmask_add(&gf_async_ctrl.sigmask_queue, GF_ASYNC_SIGQUEUE);
+
+ /* TODO: this is needed to block our special signals in the current thread
+ * and all children that it starts. It would be cleaner to do it when
+ * signals are initialized, but there doesn't seem to be a unique
+ * place to do that, so for now we do it here. */
+ gf_async_sigmask_empty(&set);
+ gf_async_sigmask_add(&set, GF_ASYNC_SIGCTRL);
+ gf_async_sigmask_add(&set, GF_ASYNC_SIGQUEUE);
+ gf_async_sigmask_set(SIG_BLOCK, &set, NULL);
+
+    /* Configure the signal handlers. This is mostly for safety, not really
+     * needed, but it doesn't hurt. Note that the caller must ensure that the
+     * signals we use are already blocked in any thread that has already been
+     * started. Otherwise this won't work. */
+ gf_async_signal_setup();
+
+ running = false;
+
+ /* We start the spare workers + 1 for the leader. */
+ for (i = 0; i < GF_ASYNC_SPARE_THREADS; i++) {
+ ret = gf_async_worker_create();
+ if (caa_unlikely(ret < 0)) {
+            /* This is the initial startup, so we require that the spare
+             * threads be created. If this fails at the very beginning, it's
+             * unlikely that the async workers could do their job later, so
+             * we abort the initialization. */
+ goto out;
+ }
+
+ /* Once the first thread is started, we can enable it to become the
+ * initial leader. */
+ if ((ret == 0) && !running) {
+ running = true;
+ gf_async_worker_enable();
+ }
+ }
+
+ if (caa_unlikely(!running)) {
+ gf_async_fatal(-ENOMEM, "No worker thread has started");
+ }
+
+ gf_async_ctrl.enabled = true;
+
+ ret = 0;
+
+out:
+ if (ret < 0) {
+ gf_async_error(ret, "Unable to initialize the thread pool.");
+ gf_async_fini();
+ }
+
+ return ret;
+}
diff --git a/libglusterfs/src/glusterfs/async.h b/libglusterfs/src/glusterfs/async.h
new file mode 100644
index 00000000000..d1d70ae0bc7
--- /dev/null
+++ b/libglusterfs/src/glusterfs/async.h
@@ -0,0 +1,209 @@
+/*
+ Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __GLUSTERFS_ASYNC_H__
+#define __GLUSTERFS_ASYNC_H__
+
+#define _LGPL_SOURCE
+
+#include <sys/types.h>
+#include <signal.h>
+#include <errno.h>
+
+#ifdef URCU_OLD
+
+/* TODO: Fix the include paths. Since this is a .h included from many places
+ * it makes no sense to append a '-I$(CONTRIBDIR)/userspace-rcu/' to each
+ * Makefile.am. I've also seen some problems with CI builders (they
+ *       failed to find the include files, but the same source works fine on
+ *       another setup). */
+#include "wfcqueue.h"
+#include "wfstack.h"
+
+#else /* !URCU_OLD */
+
+#include <urcu/wfcqueue.h>
+#include <urcu/wfstack.h>
+
+#endif /* URCU_OLD */
+
+#include "glusterfs/xlator.h"
+#include "glusterfs/common-utils.h"
+#include "glusterfs/list.h"
+#include "glusterfs/libglusterfs-messages.h"
+
+/* This is the name prefix that all worker threads will have. A number will
+ * be added to differentiate them. */
+#define GF_ASYNC_THREAD_NAME "tpw"
+
+/* This value determines the maximum number of threads that are allowed. */
+#define GF_ASYNC_MAX_THREADS 128
+
+/* This value determines how many additional threads will be started but will
+ * remain inactive until they are explicitly activated by the leader. This is
+ * useful to react faster to bursts of load, but at the same time we minimize
+ * contention if they are not really needed to handle current load.
+ *
+ * TODO: Instead of a fixed number, it would probably be better to use a
+ *       percentage of the available cores. */
+#define GF_ASYNC_SPARE_THREADS 2
+
+/* This value determines the signal used to wake the leader when new work has
+ * been added to the queue. To do so we reuse SIGALRM, since the most logical
+ * candidates (SIGUSR1/SIGUSR2) are already used. This signal must not be used
+ * by anything else in the process. */
+#define GF_ASYNC_SIGQUEUE SIGALRM
+
+/* This value determines the signal that will be used to transfer leader role
+ * to other workers. */
+#define GF_ASYNC_SIGCTRL SIGVTALRM
+
+#define gf_async_warning(_err, _msg, _args...) \
+ gf_msg("async", GF_LOG_WARNING, -(_err), LG_MSG_ASYNC_WARNING, _msg, \
+ ##_args)
+
+#define gf_async_error(_err, _msg, _args...) \
+ gf_msg("async", GF_LOG_ERROR, -(_err), LG_MSG_ASYNC_FAILURE, _msg, ##_args)
+
+#define gf_async_fatal(_err, _msg, _args...) \
+ do { \
+ GF_ABORT("Critical error in async module. Unable to continue. (" _msg \
+ "). Error %d.", \
+ ##_args, -(_err)); \
+ } while (0)
+
+struct _gf_async;
+typedef struct _gf_async gf_async_t;
+
+struct _gf_async_worker;
+typedef struct _gf_async_worker gf_async_worker_t;
+
+struct _gf_async_queue;
+typedef struct _gf_async_queue gf_async_queue_t;
+
+struct _gf_async_control;
+typedef struct _gf_async_control gf_async_control_t;
+
+typedef void (*gf_async_callback_f)(xlator_t *xl, gf_async_t *async);
+
+struct _gf_async {
+ /* TODO: remove dependency on xl/THIS. */
+ xlator_t *xl;
+ gf_async_callback_f cbk;
+ struct cds_wfcq_node queue;
+};
+
+struct _gf_async_worker {
+ /* Used to send asynchronous jobs related to the worker. */
+ gf_async_t async;
+
+ /* Member of the available workers stack. */
+ struct cds_wfs_node stack;
+
+ /* Thread object of the current worker. */
+ pthread_t thread;
+
+ /* Unique identifier of this worker. */
+ int32_t id;
+
+ /* Indicates if this worker is enabled. */
+ bool running;
+};
+
+struct _gf_async_queue {
+ /* Structures needed to manage a wait-free queue. For better performance
+ * they are placed in two different cache lines, as recommended by URCU
+ * documentation, even though in our case some threads will be producers
+ * and consumers at the same time. */
+ struct cds_wfcq_head head __attribute__((aligned(64)));
+ struct cds_wfcq_tail tail __attribute__((aligned(64)));
+};
+
+#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop))
+#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16)
+#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535)
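
A hypothetical arithmetic check of this packing (not part of the patch): a single 32-bit addition updates both halves at once, and carries never cross the field boundary as long as neither counter under- or overflows 16 bits:

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    uint32_t counts = 0;

    counts += GF_ASYNC_COUNTS(1, 0);  /* a worker starts running */
    counts += GF_ASYNC_COUNTS(-1, 1); /* it transitions to stopping */

    /* Prints "running=0 stopping=1". */
    printf("running=%u stopping=%u\n", GF_ASYNC_COUNT_RUNNING(counts),
           GF_ASYNC_COUNT_STOPPING(counts));

    counts += GF_ASYNC_COUNTS(0, -1); /* it finally terminates */

    /* Prints "running=0 stopping=0". */
    printf("running=%u stopping=%u\n", GF_ASYNC_COUNT_RUNNING(counts),
           GF_ASYNC_COUNT_STOPPING(counts));

    return 0;
}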
+
+struct _gf_async_control {
+ gf_async_queue_t queue;
+
+ /* Stack of unused workers. */
+ struct __cds_wfs_stack available;
+
+ /* Array of preallocated worker structures. */
+ gf_async_worker_t *table;
+
+    /* Used to synchronize the main thread with workers on termination. */
+ pthread_barrier_t sync;
+
+ /* The id of the last thread that will be used for synchronization. */
+ pthread_t sync_thread;
+
+ /* Signal mask to wait for control signals from leader. */
+ sigset_t sigmask_ctrl;
+
+ /* Signal mask to wait for queued items. */
+ sigset_t sigmask_queue;
+
+ /* Saved signal handlers. */
+ struct sigaction handler_ctrl;
+ struct sigaction handler_queue;
+
+ /* PID of the current process. */
+ pid_t pid;
+
+ /* Maximum number of allowed threads. */
+ uint32_t max_threads;
+
+    /* Current number of running and stopping workers. This value is split
+     * into two 16-bit fields so that both counters can be tracked atomically
+     * at the same time. */
+ uint32_t counts;
+
+    /* Controls whether the asynchronous infrastructure is used or not. */
+ bool enabled;
+};
+
+extern gf_async_control_t gf_async_ctrl;
+
+int32_t
+gf_async_init(glusterfs_ctx_t *ctx);
+
+void
+gf_async_fini(void);
+
+void
+gf_async_adjust_threads(int32_t threads);
+
+static inline void
+gf_async(gf_async_t *async, xlator_t *xl, gf_async_callback_f cbk)
+{
+ if (!gf_async_ctrl.enabled) {
+ cbk(xl, async);
+ return;
+ }
+
+ async->xl = xl;
+ async->cbk = cbk;
+ cds_wfcq_node_init(&async->queue);
+ if (caa_unlikely(!cds_wfcq_enqueue(&gf_async_ctrl.queue.head,
+ &gf_async_ctrl.queue.tail,
+ &async->queue))) {
+ /* The queue was empty, so the leader could be sleeping. We need to
+ * wake it so that the new item can be processed. If the queue was not
+ * empty, we don't need to do anything special since the leader will
+ * take care of it. */
+        if (caa_unlikely(kill(gf_async_ctrl.pid, GF_ASYNC_SIGQUEUE) < 0)) {
+            gf_async_fatal(-errno, "Unable to wake leader worker.");
+        }
+ }
+}
+
+#endif /* !__GLUSTERFS_ASYNC_H__ */
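
For reference, a hypothetical caller sketch (not part of this patch; my_request_t and its helpers are made-up names, and GF_MALLOC()/GF_FREE() with gf_common_mt_char stand in for whatever allocation the caller already uses). The pattern is to embed gf_async_t in the request object and hand the callback to the pool; when the pool is disabled the callback simply runs synchronously in the caller's thread:

typedef struct my_request {
    int op;
    gf_async_t async; /* must stay alive until the callback has run */
} my_request_t;

static void
my_request_process(xlator_t *xl, gf_async_t *async)
{
    my_request_t *req = caa_container_of(async, my_request_t, async);

    /* ... heavy work, executed on a worker thread when enabled ... */

    GF_FREE(req);
}

static int32_t
my_request_submit(xlator_t *xl, int op)
{
    my_request_t *req = GF_MALLOC(sizeof(*req), gf_common_mt_char);

    if (req == NULL) {
        return -ENOMEM;
    }

    req->op = op;
    gf_async(&req->async, xl, my_request_process);

    return 0;
}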
diff --git a/libglusterfs/src/glusterfs/common-utils.h b/libglusterfs/src/glusterfs/common-utils.h
index f03d2c1049a..1418c6531c7 100644
--- a/libglusterfs/src/glusterfs/common-utils.h
+++ b/libglusterfs/src/glusterfs/common-utils.h
@@ -442,7 +442,7 @@ BIT_VALUE(unsigned char *array, unsigned int index)
} while (0)
#endif
-#define GF_ABORT(msg) \
+#define GF_ABORT(msg...) \
do { \
gf_msg_callingfn("", GF_LOG_CRITICAL, 0, LG_MSG_ASSERTION_FAILED, \
"Assertion failed: " msg); \
diff --git a/libglusterfs/src/glusterfs/glusterfs.h b/libglusterfs/src/glusterfs/glusterfs.h
index 7c6af090fd8..9d140c18ee3 100644
--- a/libglusterfs/src/glusterfs/glusterfs.h
+++ b/libglusterfs/src/glusterfs/glusterfs.h
@@ -575,6 +575,8 @@ struct _cmd_args {
int fuse_flush_handle_interrupt;
int fuse_auto_inval;
+
+ bool global_threading;
};
typedef struct _cmd_args cmd_args_t;
diff --git a/libglusterfs/src/glusterfs/libglusterfs-messages.h b/libglusterfs/src/glusterfs/libglusterfs-messages.h
index 1b72f6df5be..e17e33e06fb 100644
--- a/libglusterfs/src/glusterfs/libglusterfs-messages.h
+++ b/libglusterfs/src/glusterfs/libglusterfs-messages.h
@@ -109,6 +109,6 @@ GLFS_MSGID(
LG_MSG_PTHREAD_ATTR_INIT_FAILED, LG_MSG_INVALID_INODE_LIST,
LG_MSG_COMPACT_FAILED, LG_MSG_COMPACT_STATUS, LG_MSG_UTIMENSAT_FAILED,
LG_MSG_PTHREAD_NAMING_FAILED, LG_MSG_SYSCALL_RETURNS_WRONG,
- LG_MSG_XXH64_TO_GFID_FAILED);
+ LG_MSG_XXH64_TO_GFID_FAILED, LG_MSG_ASYNC_WARNING, LG_MSG_ASYNC_FAILURE);
#endif /* !_LG_MESSAGES_H_ */
diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym
index 2ac87c491ae..4466a5eee9a 100644
--- a/libglusterfs/src/libglusterfs.sym
+++ b/libglusterfs/src/libglusterfs.sym
@@ -553,6 +553,11 @@ get_xlator_by_name
get_xlator_by_type
gf_array_insertionsort
gf_asprintf
+gf_async
+gf_async_adjust_threads
+gf_async_ctrl
+gf_async_init
+gf_async_fini
gf_backtrace_save
gf_bits_count
gf_bits_index
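
Taken together, the exported symbols support a simple lifecycle. A hypothetical sketch (thread_pool_lifecycle is a made-up name; it assumes a valid glusterfs_ctx_t and the gf_async() inline from async.h):

static int32_t
thread_pool_lifecycle(glusterfs_ctx_t *ctx)
{
    int32_t ret;

    /* Starts the spare workers and the initial leader (a no-op when global
     * threading is disabled or for glusterd). */
    ret = gf_async_init(ctx);
    if (ret < 0) {
        return ret;
    }

    /* 0 selects the default of 2 * #cores; other values set the cap. */
    gf_async_adjust_threads(0);

    /* ... submit work with gf_async() ... */

    /* Stops and joins all workers and restores the signal handlers. */
    gf_async_fini();

    return 0;
}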