diff options
Diffstat (limited to 'libglusterfs')
| -rw-r--r-- | libglusterfs/src/Makefile.am | 8 | ||||
| -rw-r--r-- | libglusterfs/src/async.c | 723 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs/async.h | 209 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs/common-utils.h | 2 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs/glusterfs.h | 2 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs/libglusterfs-messages.h | 2 | ||||
| -rw-r--r-- | libglusterfs/src/libglusterfs.sym | 5 | 
7 files changed, 947 insertions, 4 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 970f4b74978..f030a70b0f0 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -37,7 +37,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \  	$(CONTRIBDIR)/timer-wheel/timer-wheel.c \  	$(CONTRIBDIR)/timer-wheel/find_last_bit.c default-args.c locking.c \  	$(CONTRIBDIR)/xxhash/xxhash.c \ -	compound-fop-utils.c throttle-tbf.c monitoring.c +	compound-fop-utils.c throttle-tbf.c monitoring.c async.c  nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c  nodist_libglusterfs_la_HEADERS = y.tab.h protocol-common.h @@ -69,7 +69,7 @@ libglusterfs_la_HEADERS = glusterfs/common-utils.h glusterfs/defaults.h \  	glusterfs/quota-common-utils.h glusterfs/rot-buffs.h \  	glusterfs/compat-uuid.h glusterfs/upcall-utils.h glusterfs/throttle-tbf.h \  	glusterfs/events.h glusterfs/compound-fop-utils.h glusterfs/atomic.h \ -	glusterfs/monitoring.h +	glusterfs/monitoring.h glusterfs/async.h  libglusterfs_ladir = $(includedir)/glusterfs @@ -79,6 +79,10 @@ noinst_HEADERS = unittest/unittest.h \  	$(CONTRIBDIR)/libexecinfo/execinfo_compat.h \  	$(CONTRIBDIR)/timer-wheel/timer-wheel.h \  	$(CONTRIBDIR)/xxhash/xxhash.h \ +	$(CONTRIBDIR)/userspace-rcu/wfcqueue.h \ +	$(CONTRIBDIR)/userspace-rcu/wfstack.h \ +	$(CONTRIBDIR)/userspace-rcu/static-wfcqueue.h \ +	$(CONTRIBDIR)/userspace-rcu/static-wfstack.h \  	tier-ctr-interface.h  eventtypes.h: $(top_srcdir)/events/eventskeygen.py diff --git a/libglusterfs/src/async.c b/libglusterfs/src/async.c new file mode 100644 index 00000000000..ae7152ff7fa --- /dev/null +++ b/libglusterfs/src/async.c @@ -0,0 +1,723 @@ +/* +  Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +/* To implement an efficient thread pool with minimum contention we have used + * the following ideas: + * + *    - The queue of jobs has been implemented using a Wait-Free queue provided + *      by the userspace-rcu library. This queue requires a mutex when multiple + *      consumers can be extracting items from it concurrently, but the locked + *      region is very small, which minimizes the chances of contention. To + *      further minimize contention, the number of active worker threads that + *      are accessing the queue is dynamically adjusted so that we always have + *      the minimum required amount of workers contending for the queue. Adding + *      new items can be done with a single atomic operation, without locks. + * + *    - All queue management operations, like creating more threads, enabling + *      sleeping ones, etc. are done by a single thread. This makes it possible + *      to manage all scaling related information and workers lists without + *      locks. This functionality is implemented as a role that can be assigned + *      to any of the worker threads, which avoids that some lengthy operations + *      could interfere with this task. + * + *    - Management is based on signals. We used signals for management tasks to + *      avoid multiple system calls for each request (with signals we can wait + *      for multiple events and get some additional data for each request in a + *      single call, instead of first polling and then reading). + * + * TODO: There are some other changes that can take advantage of this new + *       thread pool. + * + *          - Use this thread pool as the core threading model for synctasks. I + *            think this would improve synctask performance because I think we + *            currently have some contention there for some workloads. + * + *          - Implement a per thread timer that will allow adding and removing + *            timers without using mutexes. + * + *          - Integrate with userspace-rcu library in QSBR mode, allowing + *            other portions of code to be implemented using RCU-based + *            structures with a extremely fast read side without contention. + * + *          - Integrate I/O into the thread pool so that the thread pool is + *            able to efficiently manage all loads and scale dynamically. This + *            could make it possible to minimize context switching when serving + *            requests from fuse or network. + * + *          - Dynamically scale the number of workers based on system load. + *            This will make it possible to reduce contention when system is + *            heavily loaded, improving performance under these circumstances + *            (or minimizing performance loss). This will also make it possible + *            that gluster can coexist with other processes that also consume + *            CPU, with minimal interference from each other. + */ + +#include <unistd.h> +#include <pthread.h> +#include <errno.h> + +//#include <urcu/uatomic.h> + +#include "glusterfs/logging.h" +#include "glusterfs/list.h" +#include "glusterfs/mem-types.h" +#include "glusterfs/async.h" + +/* These macros wrap a simple system/library call to check the returned error + * and log a message in case of failure. */ +#define GF_ASYNC_CHECK(_func, _args...)                                        \ +    ({                                                                         \ +        int32_t __async_error = -_func(_args);                                 \ +        if (caa_unlikely(__async_error != 0)) {                                \ +            gf_async_error(__async_error, #_func "() failed.");                \ +        }                                                                      \ +        __async_error;                                                         \ +    }) + +#define GF_ASYNC_CHECK_ERRNO(_func, _args...)                                  \ +    ({                                                                         \ +        int32_t __async_error = _func(_args);                                  \ +        if (caa_unlikely(__async_error < 0)) {                                 \ +            __async_error = -errno;                                            \ +            gf_async_error(__async_error, #_func "() failed.");                \ +        }                                                                      \ +        __async_error;                                                         \ +    }) + +/* These macros are used when, based on POSIX documentation, the function + * should never fail under the conditions we are using it. So any unexpected + * error will be handled as a fatal event. It probably means a critical bug + * or memory corruption. In both cases we consider that stopping the process + * is safer (otherwise it could cause more corruption with unknown effects + * that could be worse). */ +#define GF_ASYNC_CANTFAIL(_func, _args...)                                     \ +    do {                                                                       \ +        int32_t __async_error = -_func(_args);                                 \ +        if (caa_unlikely(__async_error != 0)) {                                \ +            gf_async_fatal(__async_error, #_func "() failed");                 \ +        }                                                                      \ +    } while (0) + +#define GF_ASYNC_CANTFAIL_ERRNO(_func, _args...)                               \ +    ({                                                                         \ +        int32_t __async_error = _func(_args);                                  \ +        if (caa_unlikely(__async_error < 0)) {                                 \ +            __async_error = -errno;                                            \ +            gf_async_fatal(__async_error, #_func "() failed");                 \ +        }                                                                      \ +        __async_error;                                                         \ +    }) + +/* TODO: for now we allocate a static array of workers. There's an issue if we + *       try to use dynamic memory since these workers are initialized very + *       early in the process startup and it seems that sometimes not all is + *       ready to use dynamic memory. */ +static gf_async_worker_t gf_async_workers[GF_ASYNC_MAX_THREADS]; + +/* This is the only global variable needed to manage the entire framework. */ +gf_async_control_t gf_async_ctrl = {}; + +static __thread gf_async_worker_t *gf_async_current_worker = NULL; + +/* The main function of the worker threads. */ +static void * +gf_async_worker(void *arg); + +static void +gf_async_sync_init(void) +{ +    GF_ASYNC_CANTFAIL(pthread_barrier_init, &gf_async_ctrl.sync, NULL, 2); +} + +static void +gf_async_sync_now(void) +{ +    int32_t ret; + +    ret = pthread_barrier_wait(&gf_async_ctrl.sync); +    if (ret == PTHREAD_BARRIER_SERIAL_THREAD) { +        GF_ASYNC_CANTFAIL(pthread_barrier_destroy, &gf_async_ctrl.sync); +        ret = 0; +    } +    if (caa_unlikely(ret != 0)) { +        gf_async_fatal(-ret, "pthread_barrier_wait() failed"); +    } +} + +static void +gf_async_sigmask_empty(sigset_t *mask) +{ +    GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, mask); +} + +static void +gf_async_sigmask_add(sigset_t *mask, int32_t signal) +{ +    GF_ASYNC_CANTFAIL_ERRNO(sigaddset, mask, signal); +} + +static void +gf_async_sigmask_set(int32_t mode, sigset_t *mask, sigset_t *old) +{ +    GF_ASYNC_CANTFAIL(pthread_sigmask, mode, mask, old); +} + +static void +gf_async_sigaction(int32_t signum, const struct sigaction *action, +                   struct sigaction *old) +{ +    GF_ASYNC_CANTFAIL_ERRNO(sigaction, signum, action, old); +} + +static int32_t +gf_async_sigwait(sigset_t *set) +{ +    int32_t ret, signum; + +    do { +        ret = sigwait(set, &signum); +    } while (caa_unlikely((ret < 0) && (errno == EINTR))); + +    if (caa_unlikely(ret < 0)) { +        ret = -errno; +        gf_async_fatal(ret, "sigwait() failed"); +    } + +    return signum; +} + +static int32_t +gf_async_sigtimedwait(sigset_t *set, struct timespec *timeout) +{ +    int32_t ret; + +    do { +        ret = sigtimedwait(set, NULL, timeout); +    } while (caa_unlikely((ret < 0) && (errno == EINTR))); +    if (caa_unlikely(ret < 0)) { +        ret = -errno; +        /* EAGAIN means that the timeout has expired, so we allow this error. +         * Any other error shouldn't happen. */ +        if (caa_unlikely(ret != -EAGAIN)) { +            gf_async_fatal(ret, "sigtimedwait() failed"); +        } +        ret = 0; +    } + +    return ret; +} + +static void +gf_async_sigbroadcast(int32_t signum) +{ +    GF_ASYNC_CANTFAIL_ERRNO(kill, gf_async_ctrl.pid, signum); +} + +static void +gf_async_signal_handler(int32_t signum) +{ +    /* We should never handle a signal in this function. */ +    gf_async_fatal(-EBUSY, +                   "Unexpected processing of signal %d through a handler.", +                   signum); +} + +static void +gf_async_signal_setup(void) +{ +    struct sigaction action; + +    /* We configure all related signals so that we can detect threads using an +     * invalid signal mask that doesn't block our critical signal. */ +    memset(&action, 0, sizeof(action)); +    action.sa_handler = gf_async_signal_handler; + +    gf_async_sigaction(GF_ASYNC_SIGCTRL, &action, &gf_async_ctrl.handler_ctrl); + +    gf_async_sigaction(GF_ASYNC_SIGQUEUE, &action, +                       &gf_async_ctrl.handler_queue); +} + +static void +gf_async_signal_restore(void) +{ +    /* Handlers we have previously changed are restored back to their original +     * value. */ + +    if (gf_async_ctrl.handler_ctrl.sa_handler != gf_async_signal_handler) { +        gf_async_sigaction(GF_ASYNC_SIGCTRL, &gf_async_ctrl.handler_ctrl, NULL); +    } + +    if (gf_async_ctrl.handler_queue.sa_handler != gf_async_signal_handler) { +        gf_async_sigaction(GF_ASYNC_SIGQUEUE, &gf_async_ctrl.handler_queue, +                           NULL); +    } +} + +static void +gf_async_signal_flush(void) +{ +    struct timespec delay; + +    delay.tv_sec = 0; +    delay.tv_nsec = 0; + +    /* We read all pending signals so that they don't trigger once the signal +     * mask of some thread is changed. */ +    while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_ctrl, &delay) > 0) { +    } +    while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_queue, &delay) > 0) { +    } +} + +static int32_t +gf_async_thread_create(pthread_t *thread, int32_t id, void *data) +{ +    int32_t ret; + +    ret = gf_thread_create(thread, NULL, gf_async_worker, data, +                           GF_ASYNC_THREAD_NAME "%u", id); +    if (caa_unlikely(ret < 0)) { +        /* TODO: gf_thread_create() should return a more specific error +         *       code. */ +        return -ENOMEM; +    } + +    return 0; +} + +static void +gf_async_thread_wait(pthread_t thread) +{ +    /* TODO: this is a blocking call executed inside one of the workers of the +     *       thread pool. This is bad, but this is only executed once we have +     *       received a notification from the thread that it's terminating, so +     *       this should return almost immediately. However, to be more robust +     *       it would be better to use pthread_timedjoin_np() (or even a call +     *       to pthread_tryjoin_np() followed by a delayed recheck if it +     *       fails), but they are not portable. We should see how to do this +     *       in other platforms. */ +    GF_ASYNC_CANTFAIL(pthread_join, thread, NULL); +} + +static int32_t +gf_async_worker_create(void) +{ +    struct cds_wfs_node *node; +    gf_async_worker_t *worker; +    uint32_t counts, running, max; +    int32_t ret; + +    node = __cds_wfs_pop_blocking(&gf_async_ctrl.available); +    if (caa_unlikely(node == NULL)) { +        /* There are no more available workers. We have all threads running. */ +        return 1; +    } +    cds_wfs_node_init(node); + +    ret = 1; + +    counts = uatomic_read(&gf_async_ctrl.counts); +    max = uatomic_read(&gf_async_ctrl.max_threads); +    running = GF_ASYNC_COUNT_RUNNING(counts); +    if (running < max) { +        uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(1, 0)); + +        worker = caa_container_of(node, gf_async_worker_t, stack); + +        ret = gf_async_thread_create(&worker->thread, worker->id, worker); +        if (caa_likely(ret >= 0)) { +            return 0; +        } + +        uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(-1, 0)); +    } + +    cds_wfs_push(&gf_async_ctrl.available, node); + +    return ret; +} + +static void +gf_async_worker_enable(void) +{ +    /* This will wake one of the spare workers. If all workers are busy now, +     * the signal will be queued so that the first one that completes its +     * work will become the leader. */ +    gf_async_sigbroadcast(GF_ASYNC_SIGCTRL); + +    /* We have consumed a spare worker. We create another one for future +     * needs. */ +    gf_async_worker_create(); +} + +static void +gf_async_worker_wait(void) +{ +    int32_t signum; + +    signum = gf_async_sigwait(&gf_async_ctrl.sigmask_ctrl); +    if (caa_unlikely(signum != GF_ASYNC_SIGCTRL)) { +        gf_async_fatal(-EINVAL, "Worker received an unexpected signal (%d)", +                       signum); +    } +} + +static void +gf_async_leader_wait(void) +{ +    int32_t signum; + +    signum = gf_async_sigwait(&gf_async_ctrl.sigmask_queue); +    if (caa_unlikely(signum != GF_ASYNC_SIGQUEUE)) { +        gf_async_fatal(-EINVAL, "Leader received an unexpected signal (%d)", +                       signum); +    } +} + +static void +gf_async_run(struct cds_wfcq_node *node) +{ +    gf_async_t *async; + +    /* We've just got work from the queue. Process it. */ +    async = caa_container_of(node, gf_async_t, queue); +    /* TODO: remove dependency from THIS and xl. */ +    THIS = async->xl; +    async->cbk(async->xl, async); +} + +static void +gf_async_worker_run(void) +{ +    struct cds_wfcq_node *node; + +    do { +        /* We keep executing jobs from the queue while it's not empty. Note +         * that while we do this, we are ignoring any stop request. That's +         * fine, since we need to process our own 'join' messages to fully +         * terminate all threads. Note that normal jobs should have already +         * completed once a stop request is received. */ +        node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, +                                         &gf_async_ctrl.queue.tail); +        if (node != NULL) { +            gf_async_run(node); +        } +    } while (node != NULL); + +    /* TODO: I've tried to keep the worker looking at the queue for some small +     *       amount of time in a busy loop to see if more jobs come soon. With +     *       this I attempted to avoid the overhead of signal management if +     *       jobs come fast enough. However experimental results seem to +     *       indicate that doing this, CPU utilization grows and performance +     *       is actually reduced. We need to see if that's because I used bad +     *       parameters or it's really better to do it as it's done now. */ +} + +static void +gf_async_leader_run(void) +{ +    struct cds_wfcq_node *node; + +    node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, +                                     &gf_async_ctrl.queue.tail); +    while (caa_unlikely(node == NULL)) { +        gf_async_leader_wait(); + +        node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, +                                         &gf_async_ctrl.queue.tail); +    } + +    /* Activate the next available worker thread. It will become the new +     * leader. */ +    gf_async_worker_enable(); + +    gf_async_run(node); +} + +static uint32_t +gf_async_stop_check(gf_async_worker_t *worker) +{ +    uint32_t counts, old, running, max; + +    /* First we check if we should stop without doing any costly atomic +     * operation. */ +    old = uatomic_read(&gf_async_ctrl.counts); +    max = uatomic_read(&gf_async_ctrl.max_threads); +    running = GF_ASYNC_COUNT_RUNNING(old); +    while (running > max) { +        /* There are too many threads. We try to stop the current worker. */ +        counts = uatomic_cmpxchg(&gf_async_ctrl.counts, old, +                                 old + GF_ASYNC_COUNTS(-1, 1)); +        if (old != counts) { +            /* Another thread has just updated the counts. We need to retry. */ +            old = counts; +            running = GF_ASYNC_COUNT_RUNNING(old); + +            continue; +        } + +        running--; +        worker->running = false; +    } + +    return running; +} + +static void +gf_async_stop_all(xlator_t *xl, gf_async_t *async) +{ +    if (gf_async_stop_check(gf_async_current_worker) > 0) { +        /* There are more workers running. We propagate the stop request to +         * them. */ +        gf_async(async, xl, gf_async_stop_all); +    } +} + +static void +gf_async_join(xlator_t *xl, gf_async_t *async) +{ +    gf_async_worker_t *worker; + +    worker = caa_container_of(async, gf_async_worker_t, async); + +    gf_async_thread_wait(worker->thread); + +    cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +} + +static void +gf_async_terminate(gf_async_worker_t *worker) +{ +    uint32_t counts; + +    counts = uatomic_add_return(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(0, -1)); +    if (counts == 0) { +        /* This is the termination of the last worker thread. We need to +         * synchronize the main thread that is waiting for all workers to +         * finish. */ +        gf_async_ctrl.sync_thread = worker->thread; + +        gf_async_sync_now(); +    } else { +        /* Force someone else to join this thread to release resources. */ +        gf_async(&worker->async, THIS, gf_async_join); +    } +} + +static void * +gf_async_worker(void *arg) +{ +    gf_async_worker_t *worker; + +    worker = (gf_async_worker_t *)arg; +    gf_async_current_worker = worker; + +    worker->running = true; +    do { +        /* This thread does nothing until someone enables it to become a +         * leader. */ +        gf_async_worker_wait(); + +        /* This thread is now a leader. It will process jobs from the queue +         * and, if necessary, enable another worker and transfer leadership +         * to it. */ +        gf_async_leader_run(); + +        /* This thread is not a leader anymore. It will continue processing +         * queued jobs until it becomes empty. */ +        gf_async_worker_run(); + +        /* Stop the current thread if there are too many threads running. */ +        gf_async_stop_check(worker); +    } while (worker->running); + +    gf_async_terminate(worker); + +    return NULL; +} + +static void +gf_async_cleanup(void) +{ +    /* We do some basic initialization of the global variable 'gf_async_ctrl' +     * so that it's put into a relatively consistent state. */ + +    gf_async_ctrl.enabled = false; + +    gf_async_ctrl.pid = 0; +    gf_async_sigmask_empty(&gf_async_ctrl.sigmask_ctrl); +    gf_async_sigmask_empty(&gf_async_ctrl.sigmask_queue); + +    /* This is used to later detect if the handler of these signals have been +     * changed or not. */ +    gf_async_ctrl.handler_ctrl.sa_handler = gf_async_signal_handler; +    gf_async_ctrl.handler_queue.sa_handler = gf_async_signal_handler; + +    gf_async_ctrl.table = NULL; +    gf_async_ctrl.max_threads = 0; +    gf_async_ctrl.counts = 0; +} + +void +gf_async_fini(void) +{ +    gf_async_t async; + +    if (uatomic_read(&gf_async_ctrl.counts) != 0) { +        /* We ensure that all threads will quit on the next check. */ +        gf_async_ctrl.max_threads = 0; + +        /* Send the stop request to the thread pool. This will cause the +         * execution of gf_async_stop_all() by one of the worker threads which, +         * eventually, will terminate all worker threads. */ +        gf_async(&async, THIS, gf_async_stop_all); + +        /* We synchronize here with the last thread. */ +        gf_async_sync_now(); + +        /* We have just synchronized with the latest thread. Now just wait for +         * it to terminate. */ +        gf_async_thread_wait(gf_async_ctrl.sync_thread); + +        gf_async_signal_flush(); +    } + +    gf_async_signal_restore(); + +    gf_async_cleanup(); +} + +void +gf_async_adjust_threads(int32_t threads) +{ +    if (threads == 0) { +        /* By default we allow a maximum of 2 * #cores worker threads. This +         * value is to try to accommodate threads that will do some I/O. Having +         * more threads than cores we can keep CPU busy even if some threads +         * are blocked for I/O. In the most efficient case, we can have #cores +         * computing threads and #cores blocked threads on I/O. However this is +         * hard to achieve because we can end with more than #cores computing +         * threads, which won't provide a real benefit and will increase +         * contention. +         * +         * TODO: implement a more intelligent dynamic maximum based on CPU +         *       usage and/or system load. */ +        threads = sysconf(_SC_NPROCESSORS_ONLN) * 2; +        if (threads < 0) { +            /* If we can't get the current number of processors, we pick a +             * random number. */ +            threads = 16; +        } +    } +    if (threads > GF_ASYNC_MAX_THREADS) { +        threads = GF_ASYNC_MAX_THREADS; +    } +    uatomic_set(&gf_async_ctrl.max_threads, threads); +} + +int32_t +gf_async_init(glusterfs_ctx_t *ctx) +{ +    sigset_t set; +    gf_async_worker_t *worker; +    uint32_t i; +    int32_t ret; +    bool running; + +    gf_async_cleanup(); + +    if (!ctx->cmd_args.global_threading || +        (ctx->process_mode == GF_GLUSTERD_PROCESS)) { +        return 0; +    } + +    /* At the init time, the maximum number of threads has not yet been +     * configured. We use a small starting value that will be layer dynamically +     * adjusted when ctx->config.max_threads is updated. */ +    gf_async_adjust_threads(GF_ASYNC_SPARE_THREADS + 1); + +    gf_async_ctrl.pid = getpid(); + +    __cds_wfs_init(&gf_async_ctrl.available); +    cds_wfcq_init(&gf_async_ctrl.queue.head, &gf_async_ctrl.queue.tail); + +    gf_async_sync_init(); + +    /* TODO: it would be cleaner to use dynamic memory, but at this point some +     *       memory management resources are not yet initialized. */ +    gf_async_ctrl.table = gf_async_workers; + +    /* We keep all workers in a stack. It will be used when a new thread needs +     * to be created. */ +    for (i = GF_ASYNC_MAX_THREADS; i > 0; i--) { +        worker = &gf_async_ctrl.table[i - 1]; + +        worker->id = i - 1; +        cds_wfs_node_init(&worker->stack); +        cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +    } + +    /* Prepare the signal mask for regular workers and the leader. */ +    gf_async_sigmask_add(&gf_async_ctrl.sigmask_ctrl, GF_ASYNC_SIGCTRL); +    gf_async_sigmask_add(&gf_async_ctrl.sigmask_queue, GF_ASYNC_SIGQUEUE); + +    /* TODO: this is needed to block our special signals in the current thread +     *       and all children that it starts. It would be cleaner to do it when +     *       signals are initialized, but there doesn't seem to be a unique +     *       place to do that, so for now we do it here. */ +    gf_async_sigmask_empty(&set); +    gf_async_sigmask_add(&set, GF_ASYNC_SIGCTRL); +    gf_async_sigmask_add(&set, GF_ASYNC_SIGQUEUE); +    gf_async_sigmask_set(SIG_BLOCK, &set, NULL); + +    /* Configure the signal handlers. This is mostly for safety, not really +     * needed, but it doesn't hurt. Note that the caller must ensure that the +     * signals we need to run are already blocked in any thread already +     * started. Otherwise this won't work. */ +    gf_async_signal_setup(); + +    running = false; + +    /* We start the spare workers + 1 for the leader. */ +    for (i = 0; i < GF_ASYNC_SPARE_THREADS; i++) { +        ret = gf_async_worker_create(); +        if (caa_unlikely(ret < 0)) { +            /* This is the initial start up so we enforce that the spare +             * threads are created. If this fails at the beginning, it's very +             * unlikely that the async workers could do its job, so we abort +             * the initialization. */ +            goto out; +        } + +        /* Once the first thread is started, we can enable it to become the +         * initial leader. */ +        if ((ret == 0) && !running) { +            running = true; +            gf_async_worker_enable(); +        } +    } + +    if (caa_unlikely(!running)) { +        gf_async_fatal(-ENOMEM, "No worker thread has started"); +    } + +    gf_async_ctrl.enabled = true; + +    ret = 0; + +out: +    if (ret < 0) { +        gf_async_error(ret, "Unable to initialize the thread pool."); +        gf_async_fini(); +    } + +    return ret; +} diff --git a/libglusterfs/src/glusterfs/async.h b/libglusterfs/src/glusterfs/async.h new file mode 100644 index 00000000000..d1d70ae0bc7 --- /dev/null +++ b/libglusterfs/src/glusterfs/async.h @@ -0,0 +1,209 @@ +/* +  Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#ifndef __GLUSTERFS_ASYNC_H__ +#define __GLUSTERFS_ASYNC_H__ + +#define _LGPL_SOURCE + +#include <sys/types.h> +#include <signal.h> +#include <errno.h> + +#ifdef URCU_OLD + +/* TODO: Fix the include paths. Since this is a .h included from many places + *       it makes no sense to append a '-I$(CONTRIBDIR)/userspace-rcu/' to each + *       Makefile.am. I've also seen some problems with CI builders (they + *       failed to find the include files, but the same source on another setup + *       is working fine). */ +#include "wfcqueue.h" +#include "wfstack.h" + +#else /* !URCU_OLD */ + +#include <urcu/wfcqueue.h> +#include <urcu/wfstack.h> + +#endif /* URCU_OLD */ + +#include "glusterfs/xlator.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/list.h" +#include "glusterfs/libglusterfs-messages.h" + +/* This is the name prefix that all worker threads will have. A number will + * be added to differentiate them. */ +#define GF_ASYNC_THREAD_NAME "tpw" + +/* This value determines the maximum number of threads that are allowed. */ +#define GF_ASYNC_MAX_THREADS 128 + +/* This value determines how many additional threads will be started but will + * remain inactive until they are explicitly activated by the leader. This is + * useful to react faster to bursts of load, but at the same time we minimize + * contention if they are not really needed to handle current load. + * + * TODO: Instead of a fixed number, it would probably be better to use a + *       prcentage of the available cores. */ +#define GF_ASYNC_SPARE_THREADS 2 + +/* This value determines the signal used to wake the leader when new work has + * been added to the queue. To do so we reuse SIGALRM, since the most logical + * candidates (SIGUSR1/SIGUSR2) are already used. This signal must not be used + * by anything else in the process. */ +#define GF_ASYNC_SIGQUEUE SIGALRM + +/* This value determines the signal that will be used to transfer leader role + * to other workers. */ +#define GF_ASYNC_SIGCTRL SIGVTALRM + +#define gf_async_warning(_err, _msg, _args...)                                 \ +    gf_msg("async", GF_LOG_WARNING, -(_err), LG_MSG_ASYNC_WARNING, _msg,       \ +           ##_args) + +#define gf_async_error(_err, _msg, _args...)                                   \ +    gf_msg("async", GF_LOG_ERROR, -(_err), LG_MSG_ASYNC_FAILURE, _msg, ##_args) + +#define gf_async_fatal(_err, _msg, _args...)                                   \ +    do {                                                                       \ +        GF_ABORT("Critical error in async module. Unable to continue. (" _msg  \ +                 "). Error %d.",                                               \ +                 ##_args, -(_err));                                            \ +    } while (0) + +struct _gf_async; +typedef struct _gf_async gf_async_t; + +struct _gf_async_worker; +typedef struct _gf_async_worker gf_async_worker_t; + +struct _gf_async_queue; +typedef struct _gf_async_queue gf_async_queue_t; + +struct _gf_async_control; +typedef struct _gf_async_control gf_async_control_t; + +typedef void (*gf_async_callback_f)(xlator_t *xl, gf_async_t *async); + +struct _gf_async { +    /* TODO: remove dependency on xl/THIS. */ +    xlator_t *xl; +    gf_async_callback_f cbk; +    struct cds_wfcq_node queue; +}; + +struct _gf_async_worker { +    /* Used to send asynchronous jobs related to the worker. */ +    gf_async_t async; + +    /* Member of the available workers stack. */ +    struct cds_wfs_node stack; + +    /* Thread object of the current worker. */ +    pthread_t thread; + +    /* Unique identifier of this worker. */ +    int32_t id; + +    /* Indicates if this worker is enabled. */ +    bool running; +}; + +struct _gf_async_queue { +    /* Structures needed to manage a wait-free queue. For better performance +     * they are placed in two different cache lines, as recommended by URCU +     * documentation, even though in our case some threads will be producers +     * and consumers at the same time. */ +    struct cds_wfcq_head head __attribute__((aligned(64))); +    struct cds_wfcq_tail tail __attribute__((aligned(64))); +}; + +#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop)) +#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16) +#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535) + +struct _gf_async_control { +    gf_async_queue_t queue; + +    /* Stack of unused workers. */ +    struct __cds_wfs_stack available; + +    /* Array of preallocated worker structures. */ +    gf_async_worker_t *table; + +    /* Used to synchronize main thread with workers on termination. */ +    pthread_barrier_t sync; + +    /* The id of the last thread that will be used for synchronization. */ +    pthread_t sync_thread; + +    /* Signal mask to wait for control signals from leader. */ +    sigset_t sigmask_ctrl; + +    /* Signal mask to wait for queued items. */ +    sigset_t sigmask_queue; + +    /* Saved signal handlers. */ +    struct sigaction handler_ctrl; +    struct sigaction handler_queue; + +    /* PID of the current process. */ +    pid_t pid; + +    /* Maximum number of allowed threads. */ +    uint32_t max_threads; + +    /* Current number of running and stopping workers. This value is split +     * into 2 16-bits fields to track both counters atomically at the same +     * time. */ +    uint32_t counts; + +    /* It's used to control whether the asynchronous infrastructure is used +     * or not. */ +    bool enabled; +}; + +extern gf_async_control_t gf_async_ctrl; + +int32_t +gf_async_init(glusterfs_ctx_t *ctx); + +void +gf_async_fini(void); + +void +gf_async_adjust_threads(int32_t threads); + +static inline void +gf_async(gf_async_t *async, xlator_t *xl, gf_async_callback_f cbk) +{ +    if (!gf_async_ctrl.enabled) { +        cbk(xl, async); +        return; +    } + +    async->xl = xl; +    async->cbk = cbk; +    cds_wfcq_node_init(&async->queue); +    if (caa_unlikely(!cds_wfcq_enqueue(&gf_async_ctrl.queue.head, +                                       &gf_async_ctrl.queue.tail, +                                       &async->queue))) { +        /* The queue was empty, so the leader could be sleeping. We need to +         * wake it so that the new item can be processed. If the queue was not +         * empty, we don't need to do anything special since the leader will +         * take care of it. */ +        if (caa_unlikely(kill(gf_async_ctrl.pid, GF_ASYNC_SIGQUEUE) < 0)) { +            gf_async_fatal(errno, "Unable to wake leader worker."); +        }; +    } +} + +#endif /* !__GLUSTERFS_ASYNC_H__ */ diff --git a/libglusterfs/src/glusterfs/common-utils.h b/libglusterfs/src/glusterfs/common-utils.h index f03d2c1049a..1418c6531c7 100644 --- a/libglusterfs/src/glusterfs/common-utils.h +++ b/libglusterfs/src/glusterfs/common-utils.h @@ -442,7 +442,7 @@ BIT_VALUE(unsigned char *array, unsigned int index)      } while (0)  #endif -#define GF_ABORT(msg)                                                          \ +#define GF_ABORT(msg...)                                                       \      do {                                                                       \          gf_msg_callingfn("", GF_LOG_CRITICAL, 0, LG_MSG_ASSERTION_FAILED,      \                           "Assertion failed: " msg);                            \ diff --git a/libglusterfs/src/glusterfs/glusterfs.h b/libglusterfs/src/glusterfs/glusterfs.h index 7c6af090fd8..9d140c18ee3 100644 --- a/libglusterfs/src/glusterfs/glusterfs.h +++ b/libglusterfs/src/glusterfs/glusterfs.h @@ -575,6 +575,8 @@ struct _cmd_args {      int fuse_flush_handle_interrupt;      int fuse_auto_inval; + +    bool global_threading;  };  typedef struct _cmd_args cmd_args_t; diff --git a/libglusterfs/src/glusterfs/libglusterfs-messages.h b/libglusterfs/src/glusterfs/libglusterfs-messages.h index 1b72f6df5be..e17e33e06fb 100644 --- a/libglusterfs/src/glusterfs/libglusterfs-messages.h +++ b/libglusterfs/src/glusterfs/libglusterfs-messages.h @@ -109,6 +109,6 @@ GLFS_MSGID(      LG_MSG_PTHREAD_ATTR_INIT_FAILED, LG_MSG_INVALID_INODE_LIST,      LG_MSG_COMPACT_FAILED, LG_MSG_COMPACT_STATUS, LG_MSG_UTIMENSAT_FAILED,      LG_MSG_PTHREAD_NAMING_FAILED, LG_MSG_SYSCALL_RETURNS_WRONG, -    LG_MSG_XXH64_TO_GFID_FAILED); +    LG_MSG_XXH64_TO_GFID_FAILED, LG_MSG_ASYNC_WARNING, LG_MSG_ASYNC_FAILURE);  #endif /* !_LG_MESSAGES_H_ */ diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 2ac87c491ae..4466a5eee9a 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -553,6 +553,11 @@ get_xlator_by_name  get_xlator_by_type  gf_array_insertionsort  gf_asprintf +gf_async +gf_async_adjust_threads +gf_async_ctrl +gf_async_init +gf_async_fini  gf_backtrace_save  gf_bits_count  gf_bits_index  | 
