diff options
author | Xavi Hernandez <xhernandez@redhat.com> | 2019-01-24 18:44:06 +0100 |
---|---|---|
committer | Amar Tumballi <amarts@redhat.com> | 2019-02-18 02:58:24 +0000 |
commit | dddcf52020004d98f688ebef968de51d76cbf9a6 (patch) | |
tree | 01ee4c39a7859a76562e15aa7045c5bd86417a60 | |
parent | ec273a46820ba17f46488c082c65cd1aa6739be3 (diff) |
core: implement a global thread pool
This patch implements a thread pool that is wait-free for adding jobs to
the queue and uses a very small locked region to get jobs. This makes it
possible to decrease contention drastically. It's based on wfcqueue
structure provided by urcu library.
It automatically enables more threads when load demands it, and stops
them when not needed. There's a maximum number of threads that can be
used. This value can be configured.
Depending on the workload, the maximum number of threads plays an
important role. So it needs to be configured for optimal performance.
Currently the thread pool doesn't self adjust the maximum for the
workload, so this configuration needs to be changed manually.
For this reason, the global thread pool has been made optional, so that
volumes can still use the thread pool provided by io-threads.
To enable it for bricks, the following option needs to be set:
config.global-threading = on
This option has no effect if bricks are already running. A restart is
required to activate it. It's recommended to also enable the following
option when running bricks with the global thread pool:
performance.iot-pass-through = on
To enable it for a FUSE mount point, the option '--global-threading'
must be added to the mount command. To change it, an umount and remount
is needed. It's recommended to disable the following option when using
global threading on a mount point:
performance.client-io-threads = off
To enable it for services managed by glusterd, glusterd needs to be
started with option '--global-threading'. In this case all daemons, like
self-heal, will be using the global thread pool.
Currently it can only be enabled for bricks, FUSE mounts and glusterd
services.
The maximum number of threads for clients and bricks can be configured
using the following options:
config.client-threads
config.brick-threads
These options can be applied online and its effect is immediate most of
the times. If one of them is set to 0, the maximum number of threads
will be calcutated as #cores * 2.
Some distributions use a very old userspace-rcu library (version 0.7)
for this reason, some header files from version 0.10 have been copied
into contrib/userspace-rcu and are used if the detected version is 0.7
or older.
An additional change has been made to io-threads to prevent that threads
are started when iot-pass-through is set.
Change-Id: I09d19e246b9e6d53c6247b29dfca6af6ee00a24b
updates: #532
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
29 files changed, 2858 insertions, 56 deletions
diff --git a/configure.ac b/configure.ac index 8752ca043bf..3aa7f4e77c1 100644 --- a/configure.ac +++ b/configure.ac @@ -1359,10 +1359,12 @@ PKG_CHECK_MODULES([URCU], [liburcu-bp], [], AC_MSG_ERROR([liburcu-bp not found]))]) PKG_CHECK_MODULES([URCU_CDS], [liburcu-cds >= 0.8], [], [PKG_CHECK_MODULES([URCU_CDS], [liburcu-cds >= 0.7], - [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found])], + [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found]) + USE_CONTRIB_URCU='yes'], [AC_CHECK_HEADERS([urcu/cds.h], [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found]) - URCU_CDS_LIBS='-lurcu-cds'], + URCU_CDS_LIBS='-lurcu-cds' + USE_CONTRIB_URCU='yes'], [AC_MSG_ERROR([liburcu-cds not found])])])]) BUILD_UNITTEST="no" @@ -1526,6 +1528,9 @@ AC_SUBST(CONTRIBDIR) GF_CPPDEFINES='-D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS)' GF_CPPINCLUDES='-include $(top_builddir)/config.h -include $(top_builddir)/site.h -I$(top_srcdir)/libglusterfs/src -I$(top_builddir)/libglusterfs/src' +if test "x${USE_CONTRIB_URCU}" = "xyes"; then + GF_CPPINCLUDES="${GF_CPPINCLUDES} -I\$(CONTRIBDIR)/userspace-rcu" +fi GF_CPPFLAGS="$GF_CPPFLAGS $GF_CPPDEFINES $GF_CPPINCLUDES" AC_SUBST([GF_CPPFLAGS]) diff --git a/contrib/userspace-rcu/static-wfcqueue.h b/contrib/userspace-rcu/static-wfcqueue.h new file mode 100644 index 00000000000..37d14ad674b --- /dev/null +++ b/contrib/userspace-rcu/static-wfcqueue.h @@ -0,0 +1,685 @@ +#ifndef _URCU_WFCQUEUE_STATIC_H +#define _URCU_WFCQUEUE_STATIC_H + +/* + * urcu/static/wfcqueue.h + * + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue + * + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfcqueue.h for + * linking dynamically with the userspace rcu library. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Copied from userspace-rcu 0.10 because version 0.7 doesn't contain it. */ + +#include <pthread.h> +#include <assert.h> +#include <poll.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/uatomic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Concurrent queue with wait-free enqueue/blocking dequeue. + * + * This queue has been designed and implemented collaboratively by + * Mathieu Desnoyers and Lai Jiangshan. Inspired from + * half-wait-free/half-blocking queue implementation done by Paul E. + * McKenney. + * + * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + * Legend: + * [1] cds_wfcq_enqueue + * [2] __cds_wfcq_splice (destination queue) + * [3] __cds_wfcq_dequeue + * [4] __cds_wfcq_splice (source queue) + * [5] __cds_wfcq_first + * [6] __cds_wfcq_next + * + * [1] [2] [3] [4] [5] [6] + * [1] - - - - - - + * [2] - - - - - - + * [3] - - X X X X + * [4] - - X - X X + * [5] - - X X - - + * [6] - - X X - - + * + * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock(). + * + * For convenience, cds_wfcq_dequeue_blocking() and + * cds_wfcq_splice_blocking() hold the dequeue lock. + * + * Besides locking, mutual exclusion of dequeue, splice and iteration + * can be ensured by performing all of those operations from a single + * thread, without requiring any lock. + */ + +#define WFCQ_ADAPT_ATTEMPTS 10 /* Retry if being set */ +#define WFCQ_WAIT 10 /* Wait 10 ms if being set */ + +/* + * cds_wfcq_node_init: initialize wait-free queue node. + */ +static inline void _cds_wfcq_node_init(struct cds_wfcq_node *node) +{ + node->next = NULL; +} + +/* + * cds_wfcq_init: initialize wait-free queue (with lock). Pair with + * cds_wfcq_destroy(). + */ +static inline void _cds_wfcq_init(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret; + + /* Set queue head and tail */ + _cds_wfcq_node_init(&head->node); + tail->p = &head->node; + ret = pthread_mutex_init(&head->lock, NULL); + assert(!ret); +} + +/* + * cds_wfcq_destroy: destroy wait-free queue (with lock). Pair with + * cds_wfcq_init(). + */ +static inline void _cds_wfcq_destroy(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret = pthread_mutex_destroy(&head->lock); + assert(!ret); +} + +/* + * __cds_wfcq_init: initialize wait-free queue (without lock). Don't + * pair with any destroy function. + */ +static inline void ___cds_wfcq_init(struct __cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + /* Set queue head and tail */ + _cds_wfcq_node_init(&head->node); + tail->p = &head->node; +} + +/* + * cds_wfcq_empty: return whether wait-free queue is empty. + * + * No memory barrier is issued. No mutual exclusion is required. + * + * We perform the test on head->node.next to check if the queue is + * possibly empty, but we confirm this by checking if the tail pointer + * points to the head node because the tail pointer is the linearisation + * point of the enqueuers. Just checking the head next pointer could + * make a queue appear empty if an enqueuer is preempted for a long time + * between xchg() and setting the previous node's next pointer. + */ +static inline bool _cds_wfcq_empty(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail) +{ + struct __cds_wfcq_head *head = u_head._h; + /* + * Queue is empty if no node is pointed by head->node.next nor + * tail->p. Even though the tail->p check is sufficient to find + * out of the queue is empty, we first check head->node.next as a + * common case to ensure that dequeuers do not frequently access + * enqueuer's tail->p cache line. + */ + return CMM_LOAD_SHARED(head->node.next) == NULL + && CMM_LOAD_SHARED(tail->p) == &head->node; +} + +static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret; + + ret = pthread_mutex_lock(&head->lock); + assert(!ret); +} + +static inline void _cds_wfcq_dequeue_unlock(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret; + + ret = pthread_mutex_unlock(&head->lock); + assert(!ret); +} + +static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *new_head, + struct cds_wfcq_node *new_tail) +{ + struct __cds_wfcq_head *head = u_head._h; + struct cds_wfcq_node *old_tail; + + /* + * Implicit memory barrier before uatomic_xchg() orders earlier + * stores to data structure containing node and setting + * node->next to NULL before publication. + */ + old_tail = uatomic_xchg(&tail->p, new_tail); + + /* + * Implicit memory barrier after uatomic_xchg() orders store to + * q->tail before store to old_tail->next. + * + * At this point, dequeuers see a NULL tail->p->next, which + * indicates that the queue is being appended to. The following + * store will append "node" to the queue from a dequeuer + * perspective. + */ + CMM_STORE_SHARED(old_tail->next, new_head); + /* + * Return false if queue was empty prior to adding the node, + * else return true. + */ + return old_tail != &head->node; +} + +/* + * cds_wfcq_enqueue: enqueue a node into a wait-free queue. + * + * Issues a full memory barrier before enqueue. No mutual exclusion is + * required. + * + * Returns false if the queue was empty prior to adding the node. + * Returns true otherwise. + */ +static inline bool _cds_wfcq_enqueue(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *new_tail) +{ + return ___cds_wfcq_append(head, tail, new_tail, new_tail); +} + +/* + * CDS_WFCQ_WAIT_SLEEP: + * + * By default, this sleeps for the given @msec milliseconds. + * This is a macro which LGPL users may #define themselves before + * including wfcqueue.h to override the default behavior (e.g. + * to log a warning or perform other background work). + */ +#ifndef CDS_WFCQ_WAIT_SLEEP +#define CDS_WFCQ_WAIT_SLEEP(msec) ___cds_wfcq_wait_sleep(msec) +#endif + +static inline void ___cds_wfcq_wait_sleep(int msec) +{ + (void) poll(NULL, 0, msec); +} + +/* + * ___cds_wfcq_busy_wait: adaptative busy-wait. + * + * Returns 1 if nonblocking and needs to block, 0 otherwise. + */ +static inline bool +___cds_wfcq_busy_wait(int *attempt, int blocking) +{ + if (!blocking) + return 1; + if (++(*attempt) >= WFCQ_ADAPT_ATTEMPTS) { + CDS_WFCQ_WAIT_SLEEP(WFCQ_WAIT); /* Wait for 10ms */ + *attempt = 0; + } else { + caa_cpu_relax(); + } + return 0; +} + +/* + * Waiting for enqueuer to complete enqueue and return the next node. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_node_sync_next(struct cds_wfcq_node *node, int blocking) +{ + struct cds_wfcq_node *next; + int attempt = 0; + + /* + * Adaptative busy-looping waiting for enqueuer to complete enqueue. + */ + while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + if (___cds_wfcq_busy_wait(&attempt, blocking)) + return CDS_WFCQ_WOULDBLOCK; + } + + return next; +} + +static inline struct cds_wfcq_node * +___cds_wfcq_first(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail, + int blocking) +{ + struct __cds_wfcq_head *head = u_head._h; + struct cds_wfcq_node *node; + + if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) + return NULL; + node = ___cds_wfcq_node_sync_next(&head->node, blocking); + /* Load head->node.next before loading node's content */ + cmm_smp_read_barrier_depends(); + return node; +} + +/* + * __cds_wfcq_first_blocking: get first node of a queue, without dequeuing. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + * + * Used by for-like iteration macros in urcu/wfqueue.h: + * __cds_wfcq_for_each_blocking() + * __cds_wfcq_for_each_blocking_safe() + * + * Returns NULL if queue is empty, first node otherwise. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_first_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_first(head, tail, 1); +} + + +/* + * __cds_wfcq_first_nonblocking: get first node of a queue, without dequeuing. + * + * Same as __cds_wfcq_first_blocking, but returns CDS_WFCQ_WOULDBLOCK if + * it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_first_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_first(head, tail, 0); +} + +static inline struct cds_wfcq_node * +___cds_wfcq_next(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *node, + int blocking) +{ + struct cds_wfcq_node *next; + + /* + * Even though the following tail->p check is sufficient to find + * out if we reached the end of the queue, we first check + * node->next as a common case to ensure that iteration on nodes + * do not frequently access enqueuer's tail->p cache line. + */ + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + /* Load node->next before tail->p */ + cmm_smp_rmb(); + if (CMM_LOAD_SHARED(tail->p) == node) + return NULL; + next = ___cds_wfcq_node_sync_next(node, blocking); + } + /* Load node->next before loading next's content */ + cmm_smp_read_barrier_depends(); + return next; +} + +/* + * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + * + * Used by for-like iteration macros in urcu/wfqueue.h: + * __cds_wfcq_for_each_blocking() + * __cds_wfcq_for_each_blocking_safe() + * + * Returns NULL if reached end of queue, non-NULL next queue node + * otherwise. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_next_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *node) +{ + return ___cds_wfcq_next(head, tail, node, 1); +} + +/* + * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing. + * + * Same as __cds_wfcq_next_blocking, but returns CDS_WFCQ_WOULDBLOCK if + * it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_next_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *node) +{ + return ___cds_wfcq_next(head, tail, node, 0); +} + +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail, + int *state, + int blocking) +{ + struct __cds_wfcq_head *head = u_head._h; + struct cds_wfcq_node *node, *next; + + if (state) + *state = 0; + + if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) { + return NULL; + } + + node = ___cds_wfcq_node_sync_next(&head->node, blocking); + if (!blocking && node == CDS_WFCQ_WOULDBLOCK) { + return CDS_WFCQ_WOULDBLOCK; + } + + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + /* + * @node is probably the only node in the queue. + * Try to move the tail to &q->head. + * q->head.next is set to NULL here, and stays + * NULL if the cmpxchg succeeds. Should the + * cmpxchg fail due to a concurrent enqueue, the + * q->head.next will be set to the next node. + * The implicit memory barrier before + * uatomic_cmpxchg() orders load node->next + * before loading q->tail. + * The implicit memory barrier before uatomic_cmpxchg + * orders load q->head.next before loading node's + * content. + */ + _cds_wfcq_node_init(&head->node); + if (uatomic_cmpxchg(&tail->p, node, &head->node) == node) { + if (state) + *state |= CDS_WFCQ_STATE_LAST; + return node; + } + next = ___cds_wfcq_node_sync_next(node, blocking); + /* + * In nonblocking mode, if we would need to block to + * get node's next, set the head next node pointer + * (currently NULL) back to its original value. + */ + if (!blocking && next == CDS_WFCQ_WOULDBLOCK) { + head->node.next = node; + return CDS_WFCQ_WOULDBLOCK; + } + } + + /* + * Move queue head forward. + */ + head->node.next = next; + + /* Load q->head.next before loading node's content */ + cmm_smp_read_barrier_depends(); + return node; +} + +/* + * __cds_wfcq_dequeue_with_state_blocking: dequeue node from queue, with state. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * It is valid to reuse and free a dequeued node immediately. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, int *state) +{ + return ___cds_wfcq_dequeue_with_state(head, tail, state, 1); +} + +/* + * ___cds_wfcq_dequeue_blocking: dequeue node from queue. + * + * Same as __cds_wfcq_dequeue_with_state_blocking, but without saving + * state. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_dequeue_with_state_blocking(head, tail, NULL); +} + +/* + * __cds_wfcq_dequeue_with_state_nonblocking: dequeue node, with state. + * + * Same as __cds_wfcq_dequeue_blocking, but returns CDS_WFCQ_WOULDBLOCK + * if it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, int *state) +{ + return ___cds_wfcq_dequeue_with_state(head, tail, state, 0); +} + +/* + * ___cds_wfcq_dequeue_nonblocking: dequeue node from queue. + * + * Same as __cds_wfcq_dequeue_with_state_nonblocking, but without saving + * state. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_dequeue_with_state_nonblocking(head, tail, NULL); +} + +/* + * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Mutual exclusion for src_q should be ensured by the caller as + * specified in the "Synchronisation table". + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice( + cds_wfcq_head_ptr_t u_dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + cds_wfcq_head_ptr_t u_src_q_head, + struct cds_wfcq_tail *src_q_tail, + int blocking) +{ + struct __cds_wfcq_head *dest_q_head = u_dest_q_head._h; + struct __cds_wfcq_head *src_q_head = u_src_q_head._h; + struct cds_wfcq_node *head, *tail; + int attempt = 0; + + /* + * Initial emptiness check to speed up cases where queue is + * empty: only require loads to check if queue is empty. + */ + if (_cds_wfcq_empty(__cds_wfcq_head_cast(src_q_head), src_q_tail)) + return CDS_WFCQ_RET_SRC_EMPTY; + + for (;;) { + /* + * Open-coded _cds_wfcq_empty() by testing result of + * uatomic_xchg, as well as tail pointer vs head node + * address. + */ + head = uatomic_xchg(&src_q_head->node.next, NULL); + if (head) + break; /* non-empty */ + if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node) + return CDS_WFCQ_RET_SRC_EMPTY; + if (___cds_wfcq_busy_wait(&attempt, blocking)) + return CDS_WFCQ_RET_WOULDBLOCK; + } + + /* + * Memory barrier implied before uatomic_xchg() orders store to + * src_q->head before store to src_q->tail. This is required by + * concurrent enqueue on src_q, which exchanges the tail before + * updating the previous tail's next pointer. + */ + tail = uatomic_xchg(&src_q_tail->p, &src_q_head->node); + + /* + * Append the spliced content of src_q into dest_q. Does not + * require mutual exclusion on dest_q (wait-free). + */ + if (___cds_wfcq_append(__cds_wfcq_head_cast(dest_q_head), dest_q_tail, + head, tail)) + return CDS_WFCQ_RET_DEST_NON_EMPTY; + else + return CDS_WFCQ_RET_DEST_EMPTY; +} + +/* + * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Mutual exclusion for src_q should be ensured by the caller as + * specified in the "Synchronisation table". + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice_blocking( + cds_wfcq_head_ptr_t dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + cds_wfcq_head_ptr_t src_q_head, + struct cds_wfcq_tail *src_q_tail) +{ + return ___cds_wfcq_splice(dest_q_head, dest_q_tail, + src_q_head, src_q_tail, 1); +} + +/* + * __cds_wfcq_splice_nonblocking: enqueue all src_q nodes at the end of dest_q. + * + * Same as __cds_wfcq_splice_blocking, but returns + * CDS_WFCQ_RET_WOULDBLOCK if it needs to block. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice_nonblocking( + cds_wfcq_head_ptr_t dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + cds_wfcq_head_ptr_t src_q_head, + struct cds_wfcq_tail *src_q_tail) +{ + return ___cds_wfcq_splice(dest_q_head, dest_q_tail, + src_q_head, src_q_tail, 0); +} + +/* + * cds_wfcq_dequeue_with_state_blocking: dequeue a node from a wait-free queue. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Mutual exclusion with cds_wfcq_splice_blocking and dequeue lock is + * ensured. + * It is valid to reuse and free a dequeued node immediately. + */ +static inline struct cds_wfcq_node * +_cds_wfcq_dequeue_with_state_blocking(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail, int *state) +{ + struct cds_wfcq_node *retval; + + _cds_wfcq_dequeue_lock(head, tail); + retval = ___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_cast(head), + tail, state); + _cds_wfcq_dequeue_unlock(head, tail); + return retval; +} + +/* + * cds_wfcq_dequeue_blocking: dequeue node from queue. + * + * Same as cds_wfcq_dequeue_blocking, but without saving state. + */ +static inline struct cds_wfcq_node * +_cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + return _cds_wfcq_dequeue_with_state_blocking(head, tail, NULL); +} + +/* + * cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Mutual exclusion with cds_wfcq_dequeue_blocking and dequeue lock is + * ensured. + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK. + */ +static inline enum cds_wfcq_ret +_cds_wfcq_splice_blocking( + struct cds_wfcq_head *dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + struct cds_wfcq_head *src_q_head, + struct cds_wfcq_tail *src_q_tail) +{ + enum cds_wfcq_ret ret; + + _cds_wfcq_dequeue_lock(src_q_head, src_q_tail); + ret = ___cds_wfcq_splice_blocking(cds_wfcq_head_cast(dest_q_head), dest_q_tail, + cds_wfcq_head_cast(src_q_head), src_q_tail); + _cds_wfcq_dequeue_unlock(src_q_head, src_q_tail); + return ret; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_WFCQUEUE_STATIC_H */ diff --git a/contrib/userspace-rcu/static-wfstack.h b/contrib/userspace-rcu/static-wfstack.h new file mode 100644 index 00000000000..29b81c3aac3 --- /dev/null +++ b/contrib/userspace-rcu/static-wfstack.h @@ -0,0 +1,455 @@ +#ifndef _URCU_STATIC_WFSTACK_H +#define _URCU_STATIC_WFSTACK_H + +/* + * urcu/static/wfstack.h + * + * Userspace RCU library - Stack with with wait-free push, blocking traversal. + * + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfstack.h for + * linking dynamically with the userspace rcu library. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't support a stack + * without mutex. */ + +#include <pthread.h> +#include <assert.h> +#include <poll.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/uatomic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define CDS_WFS_END ((void *) 0x1UL) +#define CDS_WFS_ADAPT_ATTEMPTS 10 /* Retry if being set */ +#define CDS_WFS_WAIT 10 /* Wait 10 ms if being set */ + +/* + * Stack with wait-free push, blocking traversal. + * + * Stack implementing push, pop, pop_all operations, as well as iterator + * on the stack head returned by pop_all. + * + * Wait-free operations: cds_wfs_push, __cds_wfs_pop_all, cds_wfs_empty, + * cds_wfs_first. + * Blocking operations: cds_wfs_pop, cds_wfs_pop_all, cds_wfs_next, + * iteration on stack head returned by pop_all. + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + * cds_wfs_push __cds_wfs_pop __cds_wfs_pop_all + * cds_wfs_push - - - + * __cds_wfs_pop - X X + * __cds_wfs_pop_all - X - + * + * cds_wfs_pop and cds_wfs_pop_all use an internal mutex to provide + * synchronization. + */ + +/* + * cds_wfs_node_init: initialize wait-free stack node. + */ +static inline +void _cds_wfs_node_init(struct cds_wfs_node *node) +{ + node->next = NULL; +} + +/* + * __cds_wfs_init: initialize wait-free stack. Don't pair with + * any destroy function. + */ +static inline void ___cds_wfs_init(struct __cds_wfs_stack *s) +{ + s->head = CDS_WFS_END; +} + +/* + * cds_wfs_init: initialize wait-free stack. Pair with + * cds_wfs_destroy(). + */ +static inline +void _cds_wfs_init(struct cds_wfs_stack *s) +{ + int ret; + + s->head = CDS_WFS_END; + ret = pthread_mutex_init(&s->lock, NULL); + assert(!ret); +} + +/* + * cds_wfs_destroy: destroy wait-free stack. Pair with + * cds_wfs_init(). + */ +static inline +void _cds_wfs_destroy(struct cds_wfs_stack *s) +{ + int ret = pthread_mutex_destroy(&s->lock); + assert(!ret); +} + +static inline bool ___cds_wfs_end(void *node) +{ + return node == CDS_WFS_END; +} + +/* + * cds_wfs_empty: return whether wait-free stack is empty. + * + * No memory barrier is issued. No mutual exclusion is required. + */ +static inline bool _cds_wfs_empty(cds_wfs_stack_ptr_t u_stack) +{ + struct __cds_wfs_stack *s = u_stack._s; + + return ___cds_wfs_end(CMM_LOAD_SHARED(s->head)); +} + +/* + * cds_wfs_push: push a node into the stack. + * + * Issues a full memory barrier before push. No mutual exclusion is + * required. + * + * Returns 0 if the stack was empty prior to adding the node. + * Returns non-zero otherwise. + */ +static inline +int _cds_wfs_push(cds_wfs_stack_ptr_t u_stack, struct cds_wfs_node *node) +{ + struct __cds_wfs_stack *s = u_stack._s; + struct cds_wfs_head *old_head, *new_head; + + assert(node->next == NULL); + new_head = caa_container_of(node, struct cds_wfs_head, node); + /* + * uatomic_xchg() implicit memory barrier orders earlier stores + * to node (setting it to NULL) before publication. + */ + old_head = uatomic_xchg(&s->head, new_head); + /* + * At this point, dequeuers see a NULL node->next, they should + * busy-wait until node->next is set to old_head. + */ + CMM_STORE_SHARED(node->next, &old_head->node); + return !___cds_wfs_end(old_head); +} + +/* + * Waiting for push to complete enqueue and return the next node. + */ +static inline struct cds_wfs_node * +___cds_wfs_node_sync_next(struct cds_wfs_node *node, int blocking) +{ + struct cds_wfs_node *next; + int attempt = 0; + + /* + * Adaptative busy-looping waiting for push to complete. + */ + while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + if (!blocking) + return CDS_WFS_WOULDBLOCK; + if (++attempt >= CDS_WFS_ADAPT_ATTEMPTS) { + (void) poll(NULL, 0, CDS_WFS_WAIT); /* Wait for 10ms */ + attempt = 0; + } else { + caa_cpu_relax(); + } + } + + return next; +} + +static inline +struct cds_wfs_node * +___cds_wfs_pop(cds_wfs_stack_ptr_t u_stack, int *state, int blocking) +{ + struct cds_wfs_head *head, *new_head; + struct cds_wfs_node *next; + struct __cds_wfs_stack *s = u_stack._s; + + if (state) + *state = 0; + for (;;) { + head = CMM_LOAD_SHARED(s->head); + if (___cds_wfs_end(head)) { + return NULL; + } + next = ___cds_wfs_node_sync_next(&head->node, blocking); + if (!blocking && next == CDS_WFS_WOULDBLOCK) { + return CDS_WFS_WOULDBLOCK; + } + new_head = caa_container_of(next, struct cds_wfs_head, node); + if (uatomic_cmpxchg(&s->head, head, new_head) == head) { + if (state && ___cds_wfs_end(new_head)) + *state |= CDS_WFS_STATE_LAST; + return &head->node; + } + if (!blocking) { + return CDS_WFS_WOULDBLOCK; + } + /* busy-loop if head changed under us */ + } +} + +/* + * __cds_wfs_pop_with_state_blocking: pop a node from the stack, with state. + * + * Returns NULL if stack is empty. + * + * __cds_wfs_pop_blocking needs to be synchronized using one of the + * following techniques: + * + * 1) Calling __cds_wfs_pop_blocking under rcu read lock critical + * section. The caller must wait for a grace period to pass before + * freeing the returned node or modifying the cds_wfs_node structure. + * 2) Using mutual exclusion (e.g. mutexes) to protect + * __cds_wfs_pop_blocking and __cds_wfs_pop_all callers. + * 3) Ensuring that only ONE thread can call __cds_wfs_pop_blocking() + * and __cds_wfs_pop_all(). (multi-provider/single-consumer scheme). + * + * "state" saves state flags atomically sampled with pop operation. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_with_state_blocking(cds_wfs_stack_ptr_t u_stack, int *state) +{ + return ___cds_wfs_pop(u_stack, state, 1); +} + +static inline +struct cds_wfs_node * +___cds_wfs_pop_blocking(cds_wfs_stack_ptr_t u_stack) +{ + return ___cds_wfs_pop_with_state_blocking(u_stack, NULL); +} + +/* + * __cds_wfs_pop_with_state_nonblocking: pop a node from the stack. + * + * Same as __cds_wfs_pop_with_state_blocking, but returns + * CDS_WFS_WOULDBLOCK if it needs to block. + * + * "state" saves state flags atomically sampled with pop operation. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_with_state_nonblocking(cds_wfs_stack_ptr_t u_stack, int *state) +{ + return ___cds_wfs_pop(u_stack, state, 0); +} + +/* + * __cds_wfs_pop_nonblocking: pop a node from the stack. + * + * Same as __cds_wfs_pop_blocking, but returns CDS_WFS_WOULDBLOCK if + * it needs to block. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_nonblocking(cds_wfs_stack_ptr_t u_stack) +{ + return ___cds_wfs_pop_with_state_nonblocking(u_stack, NULL); +} + +/* + * __cds_wfs_pop_all: pop all nodes from a stack. + * + * __cds_wfs_pop_all does not require any synchronization with other + * push, nor with other __cds_wfs_pop_all, but requires synchronization + * matching the technique used to synchronize __cds_wfs_pop_blocking: + * + * 1) If __cds_wfs_pop_blocking is called under rcu read lock critical + * section, both __cds_wfs_pop_blocking and cds_wfs_pop_all callers + * must wait for a grace period to pass before freeing the returned + * node or modifying the cds_wfs_node structure. However, no RCU + * read-side critical section is needed around __cds_wfs_pop_all. + * 2) Using mutual exclusion (e.g. mutexes) to protect + * __cds_wfs_pop_blocking and __cds_wfs_pop_all callers. + * 3) Ensuring that only ONE thread can call __cds_wfs_pop_blocking() + * and __cds_wfs_pop_all(). (multi-provider/single-consumer scheme). + */ +static inline +struct cds_wfs_head * +___cds_wfs_pop_all(cds_wfs_stack_ptr_t u_stack) +{ + struct __cds_wfs_stack *s = u_stack._s; + struct cds_wfs_head *head; + + /* + * Implicit memory barrier after uatomic_xchg() matches implicit + * memory barrier before uatomic_xchg() in cds_wfs_push. It + * ensures that all nodes of the returned list are consistent. + * There is no need to issue memory barriers when iterating on + * the returned list, because the full memory barrier issued + * prior to each uatomic_cmpxchg, which each write to head, are + * taking care to order writes to each node prior to the full + * memory barrier after this uatomic_xchg(). + */ + head = uatomic_xchg(&s->head, CDS_WFS_END); + if (___cds_wfs_end(head)) + return NULL; + return head; +} + +/* + * cds_wfs_pop_lock: lock stack pop-protection mutex. + */ +static inline void _cds_wfs_pop_lock(struct cds_wfs_stack *s) +{ + int ret; + + ret = pthread_mutex_lock(&s->lock); + assert(!ret); +} + +/* + * cds_wfs_pop_unlock: unlock stack pop-protection mutex. + */ +static inline void _cds_wfs_pop_unlock(struct cds_wfs_stack *s) +{ + int ret; + + ret = pthread_mutex_unlock(&s->lock); + assert(!ret); +} + +/* + * Call __cds_wfs_pop_with_state_blocking with an internal pop mutex held. + */ +static inline +struct cds_wfs_node * +_cds_wfs_pop_with_state_blocking(struct cds_wfs_stack *s, int *state) +{ + struct cds_wfs_node *retnode; + + _cds_wfs_pop_lock(s); + retnode = ___cds_wfs_pop_with_state_blocking(s, state); + _cds_wfs_pop_unlock(s); + return retnode; +} + +/* + * Call _cds_wfs_pop_with_state_blocking without saving any state. + */ +static inline +struct cds_wfs_node * +_cds_wfs_pop_blocking(struct cds_wfs_stack *s) +{ + return _cds_wfs_pop_with_state_blocking(s, NULL); +} + +/* + * Call __cds_wfs_pop_all with an internal pop mutex held. + */ +static inline +struct cds_wfs_head * +_cds_wfs_pop_all_blocking(struct cds_wfs_stack *s) +{ + struct cds_wfs_head *rethead; + + _cds_wfs_pop_lock(s); + rethead = ___cds_wfs_pop_all(s); + _cds_wfs_pop_unlock(s); + return rethead; +} + +/* + * cds_wfs_first: get first node of a popped stack. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * + * Used by for-like iteration macros in urcu/wfstack.h: + * cds_wfs_for_each_blocking() + * cds_wfs_for_each_blocking_safe() + * + * Returns NULL if popped stack is empty, top stack node otherwise. + */ +static inline struct cds_wfs_node * +_cds_wfs_first(struct cds_wfs_head *head) +{ + if (___cds_wfs_end(head)) + return NULL; + return &head->node; +} + +static inline struct cds_wfs_node * +___cds_wfs_next(struct cds_wfs_node *node, int blocking) +{ + struct cds_wfs_node *next; + + next = ___cds_wfs_node_sync_next(node, blocking); + /* + * CDS_WFS_WOULDBLOCK != CSD_WFS_END, so we can check for end + * even if ___cds_wfs_node_sync_next returns CDS_WFS_WOULDBLOCK, + * and still return CDS_WFS_WOULDBLOCK. + */ + if (___cds_wfs_end(next)) + return NULL; + return next; +} + +/* + * cds_wfs_next_blocking: get next node of a popped stack. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * + * Used by for-like iteration macros in urcu/wfstack.h: + * cds_wfs_for_each_blocking() + * cds_wfs_for_each_blocking_safe() + * + * Returns NULL if reached end of popped stack, non-NULL next stack + * node otherwise. + */ +static inline struct cds_wfs_node * +_cds_wfs_next_blocking(struct cds_wfs_node *node) +{ + return ___cds_wfs_next(node, 1); +} + + +/* + * cds_wfs_next_nonblocking: get next node of a popped stack. + * + * Same as cds_wfs_next_blocking, but returns CDS_WFS_WOULDBLOCK if it + * needs to block. + */ +static inline struct cds_wfs_node * +_cds_wfs_next_nonblocking(struct cds_wfs_node *node) +{ + return ___cds_wfs_next(node, 0); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_STATIC_WFSTACK_H */ diff --git a/contrib/userspace-rcu/wfcqueue.h b/contrib/userspace-rcu/wfcqueue.h new file mode 100644 index 00000000000..0292585ac79 --- /dev/null +++ b/contrib/userspace-rcu/wfcqueue.h @@ -0,0 +1,216 @@ +#ifndef _URCU_WFCQUEUE_H +#define _URCU_WFCQUEUE_H + +/* + * urcu/wfcqueue.h + * + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't contain it. + * The non-LGPL section has been removed. */ + +#include <pthread.h> +#include <assert.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/arch.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Concurrent queue with wait-free enqueue/blocking dequeue. + * + * This queue has been designed and implemented collaboratively by + * Mathieu Desnoyers and Lai Jiangshan. Inspired from + * half-wait-free/half-blocking queue implementation done by Paul E. + * McKenney. + */ + +#define CDS_WFCQ_WOULDBLOCK ((struct cds_wfcq_node *) -1UL) + +enum cds_wfcq_ret { + CDS_WFCQ_RET_WOULDBLOCK = -1, + CDS_WFCQ_RET_DEST_EMPTY = 0, + CDS_WFCQ_RET_DEST_NON_EMPTY = 1, + CDS_WFCQ_RET_SRC_EMPTY = 2, +}; + +enum cds_wfcq_state { + CDS_WFCQ_STATE_LAST = (1U << 0), +}; + +struct cds_wfcq_node { + struct cds_wfcq_node *next; +}; + +/* + * Do not put head and tail on the same cache-line if concurrent + * enqueue/dequeue are expected from many CPUs. This eliminates + * false-sharing between enqueue and dequeue. + */ +struct __cds_wfcq_head { + struct cds_wfcq_node node; +}; + +struct cds_wfcq_head { + struct cds_wfcq_node node; + pthread_mutex_t lock; +}; + +#ifndef __cplusplus +/* + * The transparent union allows calling functions that work on both + * struct cds_wfcq_head and struct __cds_wfcq_head on any of those two + * types. + */ +typedef union { + struct __cds_wfcq_head *_h; + struct cds_wfcq_head *h; +} __attribute__((__transparent_union__)) cds_wfcq_head_ptr_t; + +/* + * This static inline is only present for compatibility with C++. It is + * effect-less in C. + */ +static inline struct __cds_wfcq_head *__cds_wfcq_head_cast(struct __cds_wfcq_head *head) +{ + return head; +} + +/* + * This static inline is only present for compatibility with C++. It is + * effect-less in C. + */ +static inline struct cds_wfcq_head *cds_wfcq_head_cast(struct cds_wfcq_head *head) +{ + return head; +} +#else /* #ifndef __cplusplus */ + +/* C++ ignores transparent union. */ +typedef union { + struct __cds_wfcq_head *_h; + struct cds_wfcq_head *h; +} cds_wfcq_head_ptr_t; + +/* C++ ignores transparent union. Requires an explicit conversion. */ +static inline cds_wfcq_head_ptr_t __cds_wfcq_head_cast(struct __cds_wfcq_head *head) +{ + cds_wfcq_head_ptr_t ret = { ._h = head }; + return ret; +} +/* C++ ignores transparent union. Requires an explicit conversion. */ +static inline cds_wfcq_head_ptr_t cds_wfcq_head_cast(struct cds_wfcq_head *head) +{ + cds_wfcq_head_ptr_t ret = { .h = head }; + return ret; +} +#endif /* #else #ifndef __cplusplus */ + +struct cds_wfcq_tail { + struct cds_wfcq_node *p; +}; + +#include "static-wfcqueue.h" + +#define cds_wfcq_node_init _cds_wfcq_node_init +#define cds_wfcq_init _cds_wfcq_init +#define __cds_wfcq_init ___cds_wfcq_init +#define cds_wfcq_destroy _cds_wfcq_destroy +#define cds_wfcq_empty _cds_wfcq_empty +#define cds_wfcq_enqueue _cds_wfcq_enqueue + +/* Dequeue locking */ +#define cds_wfcq_dequeue_lock _cds_wfcq_dequeue_lock +#define cds_wfcq_dequeue_unlock _cds_wfcq_dequeue_unlock + +/* Locking performed within cds_wfcq calls. */ +#define cds_wfcq_dequeue_blocking _cds_wfcq_dequeue_blocking +#define cds_wfcq_dequeue_with_state_blocking \ + _cds_wfcq_dequeue_with_state_blocking +#define cds_wfcq_splice_blocking _cds_wfcq_splice_blocking +#define cds_wfcq_first_blocking _cds_wfcq_first_blocking +#define cds_wfcq_next_blocking _cds_wfcq_next_blocking + +/* Locking ensured by caller by holding cds_wfcq_dequeue_lock() */ +#define __cds_wfcq_dequeue_blocking ___cds_wfcq_dequeue_blocking +#define __cds_wfcq_dequeue_with_state_blocking \ + ___cds_wfcq_dequeue_with_state_blocking +#define __cds_wfcq_splice_blocking ___cds_wfcq_splice_blocking +#define __cds_wfcq_first_blocking ___cds_wfcq_first_blocking +#define __cds_wfcq_next_blocking ___cds_wfcq_next_blocking + +/* + * Locking ensured by caller by holding cds_wfcq_dequeue_lock(). + * Non-blocking: deque, first, next return CDS_WFCQ_WOULDBLOCK if they + * need to block. splice returns nonzero if it needs to block. + */ +#define __cds_wfcq_dequeue_nonblocking ___cds_wfcq_dequeue_nonblocking +#define __cds_wfcq_dequeue_with_state_nonblocking \ + ___cds_wfcq_dequeue_with_state_nonblocking +#define __cds_wfcq_splice_nonblocking ___cds_wfcq_splice_nonblocking +#define __cds_wfcq_first_nonblocking ___cds_wfcq_first_nonblocking +#define __cds_wfcq_next_nonblocking ___cds_wfcq_next_nonblocking + +/* + * __cds_wfcq_for_each_blocking: Iterate over all nodes in a queue, + * without dequeuing them. + * @head: head of the queue (struct cds_wfcq_head or __cds_wfcq_head pointer). + * @tail: tail of the queue (struct cds_wfcq_tail pointer). + * @node: iterator on the queue (struct cds_wfcq_node pointer). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +#define __cds_wfcq_for_each_blocking(head, tail, node) \ + for (node = __cds_wfcq_first_blocking(head, tail); \ + node != NULL; \ + node = __cds_wfcq_next_blocking(head, tail, node)) + +/* + * __cds_wfcq_for_each_blocking_safe: Iterate over all nodes in a queue, + * without dequeuing them. Safe against deletion. + * @head: head of the queue (struct cds_wfcq_head or __cds_wfcq_head pointer). + * @tail: tail of the queue (struct cds_wfcq_tail pointer). + * @node: iterator on the queue (struct cds_wfcq_node pointer). + * @n: struct cds_wfcq_node pointer holding the next pointer (used + * internally). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +#define __cds_wfcq_for_each_blocking_safe(head, tail, node, n) \ + for (node = __cds_wfcq_first_blocking(head, tail), \ + n = (node ? __cds_wfcq_next_blocking(head, tail, node) : NULL); \ + node != NULL; \ + node = n, n = (node ? __cds_wfcq_next_blocking(head, tail, node) : NULL)) + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_WFCQUEUE_H */ diff --git a/contrib/userspace-rcu/wfstack.h b/contrib/userspace-rcu/wfstack.h new file mode 100644 index 00000000000..738fd1cfd33 --- /dev/null +++ b/contrib/userspace-rcu/wfstack.h @@ -0,0 +1,178 @@ +#ifndef _URCU_WFSTACK_H +#define _URCU_WFSTACK_H + +/* + * urcu/wfstack.h + * + * Userspace RCU library - Stack with wait-free push, blocking traversal. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't support a stack + * without mutex. The non-LGPL section has been removed. */ + +#include <pthread.h> +#include <assert.h> +#include <stdbool.h> +#include <urcu/compiler.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Stack with wait-free push, blocking traversal. + * + * Stack implementing push, pop, pop_all operations, as well as iterator + * on the stack head returned by pop_all. + * + * Wait-free operations: cds_wfs_push, __cds_wfs_pop_all, cds_wfs_empty, + * cds_wfs_first. + * Blocking operations: cds_wfs_pop, cds_wfs_pop_all, cds_wfs_next, + * iteration on stack head returned by pop_all. + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + * cds_wfs_push __cds_wfs_pop __cds_wfs_pop_all + * cds_wfs_push - - - + * __cds_wfs_pop - X X + * __cds_wfs_pop_all - X - + * + * cds_wfs_pop and cds_wfs_pop_all use an internal mutex to provide + * synchronization. + */ + +#define CDS_WFS_WOULDBLOCK ((void *) -1UL) + +enum cds_wfs_state { + CDS_WFS_STATE_LAST = (1U << 0), +}; + +/* + * struct cds_wfs_node is returned by __cds_wfs_pop, and also used as + * iterator on stack. It is not safe to dereference the node next + * pointer when returned by __cds_wfs_pop_blocking. + */ +struct cds_wfs_node { + struct cds_wfs_node *next; +}; + +/* + * struct cds_wfs_head is returned by __cds_wfs_pop_all, and can be used + * to begin iteration on the stack. "node" needs to be the first field of + * cds_wfs_head, so the end-of-stack pointer value can be used for both + * types. + */ +struct cds_wfs_head { + struct cds_wfs_node node; +}; + +struct __cds_wfs_stack { + struct cds_wfs_head *head; +}; + +struct cds_wfs_stack { + struct cds_wfs_head *head; + pthread_mutex_t lock; +}; + +/* + * The transparent union allows calling functions that work on both + * struct cds_wfs_stack and struct __cds_wfs_stack on any of those two + * types. + */ +typedef union { + struct __cds_wfs_stack *_s; + struct cds_wfs_stack *s; +} __attribute__((__transparent_union__)) cds_wfs_stack_ptr_t; + +#include "static-wfstack.h" + +#define cds_wfs_node_init _cds_wfs_node_init +#define cds_wfs_init _cds_wfs_init +#define cds_wfs_destroy _cds_wfs_destroy +#define __cds_wfs_init ___cds_wfs_init +#define cds_wfs_empty _cds_wfs_empty +#define cds_wfs_push _cds_wfs_push + +/* Locking performed internally */ +#define cds_wfs_pop_blocking _cds_wfs_pop_blocking +#define cds_wfs_pop_with_state_blocking _cds_wfs_pop_with_state_blocking +#define cds_wfs_pop_all_blocking _cds_wfs_pop_all_blocking + +/* + * For iteration on cds_wfs_head returned by __cds_wfs_pop_all or + * cds_wfs_pop_all_blocking. + */ +#define cds_wfs_first _cds_wfs_first +#define cds_wfs_next_blocking _cds_wfs_next_blocking +#define cds_wfs_next_nonblocking _cds_wfs_next_nonblocking + +/* Pop locking with internal mutex */ +#define cds_wfs_pop_lock _cds_wfs_pop_lock +#define cds_wfs_pop_unlock _cds_wfs_pop_unlock + +/* Synchronization ensured by the caller. See synchronization table. */ +#define __cds_wfs_pop_blocking ___cds_wfs_pop_blocking +#define __cds_wfs_pop_with_state_blocking \ + ___cds_wfs_pop_with_state_blocking +#define __cds_wfs_pop_nonblocking ___cds_wfs_pop_nonblocking +#define __cds_wfs_pop_with_state_nonblocking \ + ___cds_wfs_pop_with_state_nonblocking +#define __cds_wfs_pop_all ___cds_wfs_pop_all + +#ifdef __cplusplus +} +#endif + +/* + * cds_wfs_for_each_blocking: Iterate over all nodes returned by + * __cds_wfs_pop_all(). + * @head: head of the queue (struct cds_wfs_head pointer). + * @node: iterator (struct cds_wfs_node pointer). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + */ +#define cds_wfs_for_each_blocking(head, node) \ + for (node = cds_wfs_first(head); \ + node != NULL; \ + node = cds_wfs_next_blocking(node)) + +/* + * cds_wfs_for_each_blocking_safe: Iterate over all nodes returned by + * __cds_wfs_pop_all(). Safe against deletion. + * @head: head of the queue (struct cds_wfs_head pointer). + * @node: iterator (struct cds_wfs_node pointer). + * @n: struct cds_wfs_node pointer holding the next pointer (used + * internally). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + */ +#define cds_wfs_for_each_blocking_safe(head, node, n) \ + for (node = cds_wfs_first(head), \ + n = (node ? cds_wfs_next_blocking(node) : NULL); \ + node != NULL; \ + node = n, n = (node ? cds_wfs_next_blocking(node) : NULL)) + +#endif /* _URCU_WFSTACK_H */ diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index ae74de73f29..9ee88b6d57b 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -203,6 +203,8 @@ static struct argp_option gf_options[] = { "Instantiate process global timer-wheel"}, {"thin-client", ARGP_THIN_CLIENT_KEY, 0, 0, "Enables thin mount and connects via gfproxyd daemon"}, + {"global-threading", ARGP_GLOBAL_THREADING_KEY, "BOOL", OPTION_ARG_OPTIONAL, + "Use the global thread pool instead of io-threads"}, {0, 0, 0, 0, "Fuse options:"}, {"direct-io-mode", ARGP_DIRECT_IO_MODE_KEY, "BOOL|auto", @@ -697,6 +699,14 @@ set_fuse_mount_options(glusterfs_ctx_t *ctx, dict_t *options) cmd_args->fuse_flush_handle_interrupt); break; } + if (cmd_args->global_threading) { + ret = dict_set_static_ptr(options, "global-threading", "on"); + if (ret < 0) { + gf_msg("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4, + "failed to set dict value for key global-threading"); + goto err; + } + } ret = 0; err: @@ -1497,6 +1507,7 @@ parse_opts(int key, char *arg, struct argp_state *state) "unknown fuse flush handle interrupt setting \"%s\"", arg); break; + case ARGP_FUSE_AUTO_INVAL_KEY: if (!arg) arg = "yes"; @@ -1507,6 +1518,20 @@ parse_opts(int key, char *arg, struct argp_state *state) } break; + + case ARGP_GLOBAL_THREADING_KEY: + if (!arg || (*arg == 0)) { + arg = "yes"; + } + + if (gf_string2boolean(arg, &b) == 0) { + cmd_args->global_threading = b; + break; + } + + argp_failure(state, -1, 0, + "Invalid value for global threading \"%s\"", arg); + break; } return 0; } @@ -2435,6 +2460,10 @@ glusterfs_signals_setup(glusterfs_ctx_t *ctx) sigaddset(&set, SIGUSR1); /* gf_proc_dump_info */ sigaddset(&set, SIGUSR2); + /* Signals needed for asynchronous framework. */ + sigaddset(&set, GF_ASYNC_SIGQUEUE); + sigaddset(&set, GF_ASYNC_SIGCTRL); + ret = pthread_sigmask(SIG_BLOCK, &set, NULL); if (ret) { gf_msg("glusterfsd", GF_LOG_WARNING, errno, glusterfsd_msg_22, @@ -2845,6 +2874,11 @@ main(int argc, char *argv[]) */ mem_pools_init_late(); + ret = gf_async_init(ctx); + if (ret < 0) { + goto out; + } + #ifdef GF_LINUX_HOST_OS ret = set_oom_score_adj(ctx); if (ret) @@ -2871,6 +2905,7 @@ main(int argc, char *argv[]) ret = event_dispatch(ctx->event_pool); out: - // glusterfs_ctx_destroy (ctx); + // glusterfs_ctx_destroy (ctx); + gf_async_fini(); return ret; } diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index 35cf6d88b7a..c91daa0fb54 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -111,6 +111,7 @@ enum argp_option_keys { ARGP_FUSE_FLUSH_HANDLE_INTERRUPT_KEY = 189, ARGP_FUSE_LRU_LIMIT_KEY = 190, ARGP_FUSE_AUTO_INVAL_KEY = 191, + ARGP_GLOBAL_THREADING_KEY = 192 }; struct _gfd_vol_top_priv { diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 970f4b74978..f030a70b0f0 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -37,7 +37,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \ $(CONTRIBDIR)/timer-wheel/timer-wheel.c \ $(CONTRIBDIR)/timer-wheel/find_last_bit.c default-args.c locking.c \ $(CONTRIBDIR)/xxhash/xxhash.c \ - compound-fop-utils.c throttle-tbf.c monitoring.c + compound-fop-utils.c throttle-tbf.c monitoring.c async.c nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c nodist_libglusterfs_la_HEADERS = y.tab.h protocol-common.h @@ -69,7 +69,7 @@ libglusterfs_la_HEADERS = glusterfs/common-utils.h glusterfs/defaults.h \ glusterfs/quota-common-utils.h glusterfs/rot-buffs.h \ glusterfs/compat-uuid.h glusterfs/upcall-utils.h glusterfs/throttle-tbf.h \ glusterfs/events.h glusterfs/compound-fop-utils.h glusterfs/atomic.h \ - glusterfs/monitoring.h + glusterfs/monitoring.h glusterfs/async.h libglusterfs_ladir = $(includedir)/glusterfs @@ -79,6 +79,10 @@ noinst_HEADERS = unittest/unittest.h \ $(CONTRIBDIR)/libexecinfo/execinfo_compat.h \ $(CONTRIBDIR)/timer-wheel/timer-wheel.h \ $(CONTRIBDIR)/xxhash/xxhash.h \ + $(CONTRIBDIR)/userspace-rcu/wfcqueue.h \ + $(CONTRIBDIR)/userspace-rcu/wfstack.h \ + $(CONTRIBDIR)/userspace-rcu/static-wfcqueue.h \ + $(CONTRIBDIR)/userspace-rcu/static-wfstack.h \ tier-ctr-interface.h eventtypes.h: $(top_srcdir)/events/eventskeygen.py diff --git a/libglusterfs/src/async.c b/libglusterfs/src/async.c new file mode 100644 index 00000000000..ae7152ff7fa --- /dev/null +++ b/libglusterfs/src/async.c @@ -0,0 +1,723 @@ +/* + Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* To implement an efficient thread pool with minimum contention we have used + * the following ideas: + * + * - The queue of jobs has been implemented using a Wait-Free queue provided + * by the userspace-rcu library. This queue requires a mutex when multiple + * consumers can be extracting items from it concurrently, but the locked + * region is very small, which minimizes the chances of contention. To + * further minimize contention, the number of active worker threads that + * are accessing the queue is dynamically adjusted so that we always have + * the minimum required amount of workers contending for the queue. Adding + * new items can be done with a single atomic operation, without locks. + * + * - All queue management operations, like creating more threads, enabling + * sleeping ones, etc. are done by a single thread. This makes it possible + * to manage all scaling related information and workers lists without + * locks. This functionality is implemented as a role that can be assigned + * to any of the worker threads, which avoids that some lengthy operations + * could interfere with this task. + * + * - Management is based on signals. We used signals for management tasks to + * avoid multiple system calls for each request (with signals we can wait + * for multiple events and get some additional data for each request in a + * single call, instead of first polling and then reading). + * + * TODO: There are some other changes that can take advantage of this new + * thread pool. + * + * - Use this thread pool as the core threading model for synctasks. I + * think this would improve synctask performance because I think we + * currently have some contention there for some workloads. + * + * - Implement a per thread timer that will allow adding and removing + * timers without using mutexes. + * + * - Integrate with userspace-rcu library in QSBR mode, allowing + * other portions of code to be implemented using RCU-based + * structures with a extremely fast read side without contention. + * + * - Integrate I/O into the thread pool so that the thread pool is + * able to efficiently manage all loads and scale dynamically. This + * could make it possible to minimize context switching when serving + * requests from fuse or network. + * + * - Dynamically scale the number of workers based on system load. + * This will make it possible to reduce contention when system is + * heavily loaded, improving performance under these circumstances + * (or minimizing performance loss). This will also make it possible + * that gluster can coexist with other processes that also consume + * CPU, with minimal interference from each other. + */ + +#include <unistd.h> +#include <pthread.h> +#include <errno.h> + +//#include <urcu/uatomic.h> + +#include "glusterfs/logging.h" +#include "glusterfs/list.h" +#include "glusterfs/mem-types.h" +#include "glusterfs/async.h" + +/* These macros wrap a simple system/library call to check the returned error + * and log a message in case of failure. */ +#define GF_ASYNC_CHECK(_func, _args...) \ + ({ \ + int32_t __async_error = -_func(_args); \ + if (caa_unlikely(__async_error != 0)) { \ + gf_async_error(__async_error, #_func "() failed."); \ + } \ + __async_error; \ + }) + +#define GF_ASYNC_CHECK_ERRNO(_func, _args...) \ + ({ \ + int32_t __async_error = _func(_args); \ + if (caa_unlikely(__async_error < 0)) { \ + __async_error = -errno; \ + gf_async_error(__async_error, #_func "() failed."); \ + } \ + __async_error; \ + }) + +/* These macros are used when, based on POSIX documentation, the function + * should never fail under the conditions we are using it. So any unexpected + * error will be handled as a fatal event. It probably means a critical bug + * or memory corruption. In both cases we consider that stopping the process + * is safer (otherwise it could cause more corruption with unknown effects + * that could be worse). */ +#define GF_ASYNC_CANTFAIL(_func, _args...) \ + do { \ + int32_t __async_error = -_func(_args); \ + if (caa_unlikely(__async_error != 0)) { \ + gf_async_fatal(__async_error, #_func "() failed"); \ + } \ + } while (0) + +#define GF_ASYNC_CANTFAIL_ERRNO(_func, _args...) \ + ({ \ + int32_t __async_error = _func(_args); \ + if (caa_unlikely(__async_error < 0)) { \ + __async_error = -errno; \ + gf_async_fatal(__async_error, #_func "() failed"); \ + } \ + __async_error; \ + }) + +/* TODO: for now we allocate a static array of workers. There's an issue if we + * try to use dynamic memory since these workers are initialized very + * early in the process startup and it seems that sometimes not all is + * ready to use dynamic memory. */ +static gf_async_worker_t gf_async_workers[GF_ASYNC_MAX_THREADS]; + +/* This is the only global variable needed to manage the entire framework. */ +gf_async_control_t gf_async_ctrl = {}; + +static __thread gf_async_worker_t *gf_async_current_worker = NULL; + +/* The main function of the worker threads. */ +static void * +gf_async_worker(void *arg); + +static void +gf_async_sync_init(void) +{ + GF_ASYNC_CANTFAIL(pthread_barrier_init, &gf_async_ctrl.sync, NULL, 2); +} + +static void +gf_async_sync_now(void) +{ + int32_t ret; + + ret = pthread_barrier_wait(&gf_async_ctrl.sync); + if (ret == PTHREAD_BARRIER_SERIAL_THREAD) { + GF_ASYNC_CANTFAIL(pthread_barrier_destroy, &gf_async_ctrl.sync); + ret = 0; + } + if (caa_unlikely(ret != 0)) { + gf_async_fatal(-ret, "pthread_barrier_wait() failed"); + } +} + +static void +gf_async_sigmask_empty(sigset_t *mask) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, mask); +} + +static void +gf_async_sigmask_add(sigset_t *mask, int32_t signal) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigaddset, mask, signal); +} + +static void +gf_async_sigmask_set(int32_t mode, sigset_t *mask, sigset_t *old) +{ + GF_ASYNC_CANTFAIL(pthread_sigmask, mode, mask, old); +} + +static void +gf_async_sigaction(int32_t signum, const struct sigaction *action, + struct sigaction *old) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigaction, signum, action, old); +} + +static int32_t +gf_async_sigwait(sigset_t *set) +{ + int32_t ret, signum; + + do { + ret = sigwait(set, &signum); + } while (caa_unlikely((ret < 0) && (errno == EINTR))); + + if (caa_unlikely(ret < 0)) { + ret = -errno; + gf_async_fatal(ret, "sigwait() failed"); + } + + return signum; +} + +static int32_t +gf_async_sigtimedwait(sigset_t *set, struct timespec *timeout) +{ + int32_t ret; + + do { + ret = sigtimedwait(set, NULL, timeout); + } while (caa_unlikely((ret < 0) && (errno == EINTR))); + if (caa_unlikely(ret < 0)) { + ret = -errno; + /* EAGAIN means that the timeout has expired, so we allow this error. + * Any other error shouldn't happen. */ + if (caa_unlikely(ret != -EAGAIN)) { + gf_async_fatal(ret, "sigtimedwait() failed"); + } + ret = 0; + } + + return ret; +} + +static void +gf_async_sigbroadcast(int32_t signum) +{ + GF_ASYNC_CANTFAIL_ERRNO(kill, gf_async_ctrl.pid, signum); +} + +static void +gf_async_signal_handler(int32_t signum) +{ + /* We should never handle a signal in this function. */ + gf_async_fatal(-EBUSY, + "Unexpected processing of signal %d through a handler.", + signum); +} + +static void +gf_async_signal_setup(void) +{ + struct sigaction action; + + /* We configure all related signals so that we can detect threads using an + * invalid signal mask that doesn't block our critical signal. */ + memset(&action, 0, sizeof(action)); + action.sa_handler = gf_async_signal_handler; + + gf_async_sigaction(GF_ASYNC_SIGCTRL, &action, &gf_async_ctrl.handler_ctrl); + + gf_async_sigaction(GF_ASYNC_SIGQUEUE, &action, + &gf_async_ctrl.handler_queue); +} + +static void +gf_async_signal_restore(void) +{ + /* Handlers we have previously changed are restored back to their original + * value. */ + + if (gf_async_ctrl.handler_ctrl.sa_handler != gf_async_signal_handler) { + gf_async_sigaction(GF_ASYNC_SIGCTRL, &gf_async_ctrl.handler_ctrl, NULL); + } + + if (gf_async_ctrl.handler_queue.sa_handler != gf_async_signal_handler) { + gf_async_sigaction(GF_ASYNC_SIGQUEUE, &gf_async_ctrl.handler_queue, + NULL); + } +} + +static void +gf_async_signal_flush(void) +{ + struct timespec delay; + + delay.tv_sec = 0; + delay.tv_nsec = 0; + + /* We read all pending signals so that they don't trigger once the signal + * mask of some thread is changed. */ + while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_ctrl, &delay) > 0) { + } + while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_queue, &delay) > 0) { + } +} + +static int32_t +gf_async_thread_create(pthread_t *thread, int32_t id, void *data) +{ + int32_t ret; + + ret = gf_thread_create(thread, NULL, gf_async_worker, data, + GF_ASYNC_THREAD_NAME "%u", id); + if (caa_unlikely(ret < 0)) { + /* TODO: gf_thread_create() should return a more specific error + * code. */ + return -ENOMEM; + } + + return 0; +} + +static void +gf_async_thread_wait(pthread_t thread) +{ + /* TODO: this is a blocking call executed inside one of the workers of the + * thread pool. This is bad, but this is only executed once we have + * received a notification from the thread that it's terminating, so + * this should return almost immediately. However, to be more robust + * it would be better to use pthread_timedjoin_np() (or even a call + * to pthread_tryjoin_np() followed by a delayed recheck if it + * fails), but they are not portable. We should see how to do this + * in other platforms. */ + GF_ASYNC_CANTFAIL(pthread_join, thread, NULL); +} + +static int32_t +gf_async_worker_create(void) +{ + struct cds_wfs_node *node; + gf_async_worker_t *worker; + uint32_t counts, running, max; + int32_t ret; + + node = __cds_wfs_pop_blocking(&gf_async_ctrl.available); + if (caa_unlikely(node == NULL)) { + /* There are no more available workers. We have all threads running. */ + return 1; + } + cds_wfs_node_init(node); + + ret = 1; + + counts = uatomic_read(&gf_async_ctrl.counts); + max = uatomic_read(&gf_async_ctrl.max_threads); + running = GF_ASYNC_COUNT_RUNNING(counts); + if (running < max) { + uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(1, 0)); + + worker = caa_container_of(node, gf_async_worker_t, stack); + + ret = gf_async_thread_create(&worker->thread, worker->id, worker); + if (caa_likely(ret >= 0)) { + return 0; + } + + uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(-1, 0)); + } + + cds_wfs_push(&gf_async_ctrl.available, node); + + return ret; +} + +static void +gf_async_worker_enable(void) +{ + /* This will wake one of the spare workers. If all workers are busy now, + * the signal will be queued so that the first one that completes its + * work will become the leader. */ + gf_async_sigbroadcast(GF_ASYNC_SIGCTRL); + + /* We have consumed a spare worker. We create another one for future + * needs. */ + gf_async_worker_create(); +} + +static void +gf_async_worker_wait(void) +{ + int32_t signum; + + signum = gf_async_sigwait(&gf_async_ctrl.sigmask_ctrl); + if (caa_unlikely(signum != GF_ASYNC_SIGCTRL)) { + gf_async_fatal(-EINVAL, "Worker received an unexpected signal (%d)", + signum); + } +} + +static void +gf_async_leader_wait(void) +{ + int32_t signum; + + signum = gf_async_sigwait(&gf_async_ctrl.sigmask_queue); + if (caa_unlikely(signum != GF_ASYNC_SIGQUEUE)) { + gf_async_fatal(-EINVAL, "Leader received an unexpected signal (%d)", + signum); + } +} + +static void +gf_async_run(struct cds_wfcq_node *node) +{ + gf_async_t *async; + + /* We've just got work from the queue. Process it. */ + async = caa_container_of(node, gf_async_t, queue); + /* TODO: remove dependency from THIS and xl. */ + THIS = async->xl; + async->cbk(async->xl, async); +} + +static void +gf_async_worker_run(void) +{ + struct cds_wfcq_node *node; + + do { + /* We keep executing jobs from the queue while it's not empty. Note + * that while we do this, we are ignoring any stop request. That's + * fine, since we need to process our own 'join' messages to fully + * terminate all threads. Note that normal jobs should have already + * completed once a stop request is received. */ + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + if (node != NULL) { + gf_async_run(node); + } + } while (node != NULL); + + /* TODO: I've tried to keep the worker looking at the queue for some small + * amount of time in a busy loop to see if more jobs come soon. With + * this I attempted to avoid the overhead of signal management if + * jobs come fast enough. However experimental results seem to + * indicate that doing this, CPU utilization grows and performance + * is actually reduced. We need to see if that's because I used bad + * parameters or it's really better to do it as it's done now. */ +} + +static void +gf_async_leader_run(void) +{ + struct cds_wfcq_node *node; + + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + while (caa_unlikely(node == NULL)) { + gf_async_leader_wait(); + + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + } + + /* Activate the next available worker thread. It will become the new + * leader. */ + gf_async_worker_enable(); + + gf_async_run(node); +} + +static uint32_t +gf_async_stop_check(gf_async_worker_t *worker) +{ + uint32_t counts, old, running, max; + + /* First we check if we should stop without doing any costly atomic + * operation. */ + old = uatomic_read(&gf_async_ctrl.counts); + max = uatomic_read(&gf_async_ctrl.max_threads); + running = GF_ASYNC_COUNT_RUNNING(old); + while (running > max) { + /* There are too many threads. We try to stop the current worker. */ + counts = uatomic_cmpxchg(&gf_async_ctrl.counts, old, + old + GF_ASYNC_COUNTS(-1, 1)); + if (old != counts) { + /* Another thread has just updated the counts. We need to retry. */ + old = counts; + running = GF_ASYNC_COUNT_RUNNING(old); + + continue; + } + + running--; + worker->running = false; + } + + return running; +} + +static void +gf_async_stop_all(xlator_t *xl, gf_async_t *async) +{ + if (gf_async_stop_check(gf_async_current_worker) > 0) { + /* There are more workers running. We propagate the stop request to + * them. */ + gf_async(async, xl, gf_async_stop_all); + } +} + +static void +gf_async_join(xlator_t *xl, gf_async_t *async) +{ + gf_async_worker_t *worker; + + worker = caa_container_of(async, gf_async_worker_t, async); + + gf_async_thread_wait(worker->thread); + + cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +} + +static void +gf_async_terminate(gf_async_worker_t *worker) +{ + uint32_t counts; + + counts = uatomic_add_return(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(0, -1)); + if (counts == 0) { + /* This is the termination of the last worker thread. We need to + * synchronize the main thread that is waiting for all workers to + * finish. */ + gf_async_ctrl.sync_thread = worker->thread; + + gf_async_sync_now(); + } else { + /* Force someone else to join this thread to release resources. */ + gf_async(&worker->async, THIS, gf_async_join); + } +} + +static void * +gf_async_worker(void *arg) +{ + gf_async_worker_t *worker; + + worker = (gf_async_worker_t *)arg; + gf_async_current_worker = worker; + + worker->running = true; + do { + /* This thread does nothing until someone enables it to become a + * leader. */ + gf_async_worker_wait(); + + /* This thread is now a leader. It will process jobs from the queue + * and, if necessary, enable another worker and transfer leadership + * to it. */ + gf_async_leader_run(); + + /* This thread is not a leader anymore. It will continue processing + * queued jobs until it becomes empty. */ + gf_async_worker_run(); + + /* Stop the current thread if there are too many threads running. */ + gf_async_stop_check(worker); + } while (worker->running); + + gf_async_terminate(worker); + + return NULL; +} + +static void +gf_async_cleanup(void) +{ + /* We do some basic initialization of the global variable 'gf_async_ctrl' + * so that it's put into a relatively consistent state. */ + + gf_async_ctrl.enabled = false; + + gf_async_ctrl.pid = 0; + gf_async_sigmask_empty(&gf_async_ctrl.sigmask_ctrl); + gf_async_sigmask_empty(&gf_async_ctrl.sigmask_queue); + + /* This is used to later detect if the handler of these signals have been + * changed or not. */ + gf_async_ctrl.handler_ctrl.sa_handler = gf_async_signal_handler; + gf_async_ctrl.handler_queue.sa_handler = gf_async_signal_handler; + + gf_async_ctrl.table = NULL; + gf_async_ctrl.max_threads = 0; + gf_async_ctrl.counts = 0; +} + +void +gf_async_fini(void) +{ + gf_async_t async; + + if (uatomic_read(&gf_async_ctrl.counts) != 0) { + /* We ensure that all threads will quit on the next check. */ + gf_async_ctrl.max_threads = 0; + + /* Send the stop request to the thread pool. This will cause the + * execution of gf_async_stop_all() by one of the worker threads which, + * eventually, will terminate all worker threads. */ + gf_async(&async, THIS, gf_async_stop_all); + + /* We synchronize here with the last thread. */ + gf_async_sync_now(); + + /* We have just synchronized with the latest thread. Now just wait for + * it to terminate. */ + gf_async_thread_wait(gf_async_ctrl.sync_thread); + + gf_async_signal_flush(); + } + + gf_async_signal_restore(); + + gf_async_cleanup(); +} + +void +gf_async_adjust_threads(int32_t threads) +{ + if (threads == 0) { + /* By default we allow a maximum of 2 * #cores worker threads. This + * value is to try to accommodate threads that will do some I/O. Having + * more threads than cores we can keep CPU busy even if some threads + * are blocked for I/O. In the most efficient case, we can have #cores + * computing threads and #cores blocked threads on I/O. However this is + * hard to achieve because we can end with more than #cores computing + * threads, which won't provide a real benefit and will increase + * contention. + * + * TODO: implement a more intelligent dynamic maximum based on CPU + * usage and/or system load. */ + threads = sysconf(_SC_NPROCESSORS_ONLN) * 2; + if (threads < 0) { + /* If we can't get the current number of processors, we pick a + * random number. */ + threads = 16; + } + } + if (threads > GF_ASYNC_MAX_THREADS) { + threads = GF_ASYNC_MAX_THREADS; + } + uatomic_set(&gf_async_ctrl.max_threads, threads); +} + +int32_t +gf_async_init(glusterfs_ctx_t *ctx) +{ + sigset_t set; + gf_async_worker_t *worker; + uint32_t i; + int32_t ret; + bool running; + + gf_async_cleanup(); + + if (!ctx->cmd_args.global_threading || + (ctx->process_mode == GF_GLUSTERD_PROCESS)) { + return 0; + } + + /* At the init time, the maximum number of threads has not yet been + * configured. We use a small starting value that will be layer dynamically + * adjusted when ctx->config.max_threads is updated. */ + gf_async_adjust_threads(GF_ASYNC_SPARE_THREADS + 1); + + gf_async_ctrl.pid = getpid(); + + __cds_wfs_init(&gf_async_ctrl.available); + cds_wfcq_init(&gf_async_ctrl.queue.head, &gf_async_ctrl.queue.tail); + + gf_async_sync_init(); + + /* TODO: it would be cleaner to use dynamic memory, but at this point some + * memory management resources are not yet initialized. */ + gf_async_ctrl.table = gf_async_workers; + + /* We keep all workers in a stack. It will be used when a new thread needs + * to be created. */ + for (i = GF_ASYNC_MAX_THREADS; i > 0; i--) { + worker = &gf_async_ctrl.table[i - 1]; + + worker->id = i - 1; + cds_wfs_node_init(&worker->stack); + cds_wfs_push(&gf_async_ctrl.available, &worker->stack); + } + + /* Prepare the signal mask for regular workers and the leader. */ + gf_async_sigmask_add(&gf_async_ctrl.sigmask_ctrl, GF_ASYNC_SIGCTRL); + gf_async_sigmask_add(&gf_async_ctrl.sigmask_queue, GF_ASYNC_SIGQUEUE); + + /* TODO: this is needed to block our special signals in the current thread + * and all children that it starts. It would be cleaner to do it when + * signals are initialized, but there doesn't seem to be a unique + * place to do that, so for now we do it here. */ + gf_async_sigmask_empty(&set); + gf_async_sigmask_add(&set, GF_ASYNC_SIGCTRL); + gf_async_sigmask_add(&set, GF_ASYNC_SIGQUEUE); + gf_async_sigmask_set(SIG_BLOCK, &set, NULL); + + /* Configure the signal handlers. This is mostly for safety, not really + * needed, but it doesn't hurt. Note that the caller must ensure that the + * signals we need to run are already blocked in any thread already + * started. Otherwise this won't work. */ + gf_async_signal_setup(); + + running = false; + + /* We start the spare workers + 1 for the leader. */ + for (i = 0; i < GF_ASYNC_SPARE_THREADS; i++) { + ret = gf_async_worker_create(); + if (caa_unlikely(ret < 0)) { + /* This is the initial start up so we enforce that the spare + * threads are created. If this fails at the beginning, it's very + * unlikely that the async workers could do its job, so we abort + * the initialization. */ + goto out; + } + + /* Once the first thread is started, we can enable it to become the + * initial leader. */ + if ((ret == 0) && !running) { + running = true; + gf_async_worker_enable(); + } + } + + if (caa_unlikely(!running)) { + gf_async_fatal(-ENOMEM, "No worker thread has started"); + } + + gf_async_ctrl.enabled = true; + + ret = 0; + +out: + if (ret < 0) { + gf_async_error(ret, "Unable to initialize the thread pool."); + gf_async_fini(); + } + + return ret; +} diff --git a/libglusterfs/src/glusterfs/async.h b/libglusterfs/src/glusterfs/async.h new file mode 100644 index 00000000000..d1d70ae0bc7 --- /dev/null +++ b/libglusterfs/src/glusterfs/async.h @@ -0,0 +1,209 @@ +/* + Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __GLUSTERFS_ASYNC_H__ +#define __GLUSTERFS_ASYNC_H__ + +#define _LGPL_SOURCE + +#include <sys/types.h> +#include <signal.h> +#include <errno.h> + +#ifdef URCU_OLD + +/* TODO: Fix the include paths. Since this is a .h included from many places + * it makes no sense to append a '-I$(CONTRIBDIR)/userspace-rcu/' to each + * Makefile.am. I've also seen some problems with CI builders (they + * failed to find the include files, but the same source on another setup + * is working fine). */ +#include "wfcqueue.h" +#include "wfstack.h" + +#else /* !URCU_OLD */ + +#include <urcu/wfcqueue.h> +#include <urcu/wfstack.h> + +#endif /* URCU_OLD */ + +#include "glusterfs/xlator.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/list.h" +#include "glusterfs/libglusterfs-messages.h" + +/* This is the name prefix that all worker threads will have. A number will + * be added to differentiate them. */ +#define GF_ASYNC_THREAD_NAME "tpw" + +/* This value determines the maximum number of threads that are allowed. */ +#define GF_ASYNC_MAX_THREADS 128 + +/* This value determines how many additional threads will be started but will + * remain inactive until they are explicitly activated by the leader. This is + * useful to react faster to bursts of load, but at the same time we minimize + * contention if they are not really needed to handle current load. + * + * TODO: Instead of a fixed number, it would probably be better to use a + * prcentage of the available cores. */ +#define GF_ASYNC_SPARE_THREADS 2 + +/* This value determines the signal used to wake the leader when new work has + * been added to the queue. To do so we reuse SIGALRM, since the most logical + * candidates (SIGUSR1/SIGUSR2) are already used. This signal must not be used + * by anything else in the process. */ +#define GF_ASYNC_SIGQUEUE SIGALRM + +/* This value determines the signal that will be used to transfer leader role + * to other workers. */ +#define GF_ASYNC_SIGCTRL SIGVTALRM + +#define gf_async_warning(_err, _msg, _args...) \ + gf_msg("async", GF_LOG_WARNING, -(_err), LG_MSG_ASYNC_WARNING, _msg, \ + ##_args) + +#define gf_async_error(_err, _msg, _args...) \ + gf_msg("async", GF_LOG_ERROR, -(_err), LG_MSG_ASYNC_FAILURE, _msg, ##_args) + +#define gf_async_fatal(_err, _msg, _args...) \ + do { \ + GF_ABORT("Critical error in async module. Unable to continue. (" _msg \ + "). Error %d.", \ + ##_args, -(_err)); \ + } while (0) + +struct _gf_async; +typedef struct _gf_async gf_async_t; + +struct _gf_async_worker; +typedef struct _gf_async_worker gf_async_worker_t; + +struct _gf_async_queue; +typedef struct _gf_async_queue gf_async_queue_t; + +struct _gf_async_control; +typedef struct _gf_async_control gf_async_control_t; + +typedef void (*gf_async_callback_f)(xlator_t *xl, gf_async_t *async); + +struct _gf_async { + /* TODO: remove dependency on xl/THIS. */ + xlator_t *xl; + gf_async_callback_f cbk; + struct cds_wfcq_node queue; +}; + +struct _gf_async_worker { + /* Used to send asynchronous jobs related to the worker. */ + gf_async_t async; + + /* Member of the available workers stack. */ + struct cds_wfs_node stack; + + /* Thread object of the current worker. */ + pthread_t thread; + + /* Unique identifier of this worker. */ + int32_t id; + + /* Indicates if this worker is enabled. */ + bool running; +}; + +struct _gf_async_queue { + /* Structures needed to manage a wait-free queue. For better performance + * they are placed in two different cache lines, as recommended by URCU + * documentation, even though in our case some threads will be producers + * and consumers at the same time. */ + struct cds_wfcq_head head __attribute__((aligned(64))); + struct cds_wfcq_tail tail __attribute__((aligned(64))); +}; + +#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop)) +#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16) +#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535) + +struct _gf_async_control { + gf_async_queue_t queue; + + /* Stack of unused workers. */ + struct __cds_wfs_stack available; + + /* Array of preallocated worker structures. */ + gf_async_worker_t *table; + + /* Used to synchronize main thread with workers on termination. */ + pthread_barrier_t sync; + + /* The id of the last thread that will be used for synchronization. */ + pthread_t sync_thread; + + /* Signal mask to wait for control signals from leader. */ + sigset_t sigmask_ctrl; + + /* Signal mask to wait for queued items. */ + sigset_t sigmask_queue; + + /* Saved signal handlers. */ + struct sigaction handler_ctrl; + struct sigaction handler_queue; + + /* PID of the current process. */ + pid_t pid; + + /* Maximum number of allowed threads. */ + uint32_t max_threads; + + /* Current number of running and stopping workers. This value is split + * into 2 16-bits fields to track both counters atomically at the same + * time. */ + uint32_t counts; + + /* It's used to control whether the asynchronous infrastructure is used + * or not. */ + bool enabled; +}; + +extern gf_async_control_t gf_async_ctrl; + +int32_t +gf_async_init(glusterfs_ctx_t *ctx); + +void +gf_async_fini(void); + +void +gf_async_adjust_threads(int32_t threads); + +static inline void +gf_async(gf_async_t *async, xlator_t *xl, gf_async_callback_f cbk) +{ + if (!gf_async_ctrl.enabled) { + cbk(xl, async); + return; + } + + async->xl = xl; + async->cbk = cbk; + cds_wfcq_node_init(&async->queue); + if (caa_unlikely(!cds_wfcq_enqueue(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail, + &async->queue))) { + /* The queue was empty, so the leader could be sleeping. We need to + * wake it so that the new item can be processed. If the queue was not + * empty, we don't need to do anything special since the leader will + * take care of it. */ + if (caa_unlikely(kill(gf_async_ctrl.pid, GF_ASYNC_SIGQUEUE) < 0)) { + gf_async_fatal(errno, "Unable to wake leader worker."); + }; + } +} + +#endif /* !__GLUSTERFS_ASYNC_H__ */ diff --git a/libglusterfs/src/glusterfs/common-utils.h b/libglusterfs/src/glusterfs/common-utils.h index f03d2c1049a..1418c6531c7 100644 --- a/libglusterfs/src/glusterfs/common-utils.h +++ b/libglusterfs/src/glusterfs/common-utils.h @@ -442,7 +442,7 @@ BIT_VALUE(unsigned char *array, unsigned int index) } while (0) #endif -#define GF_ABORT(msg) \ +#define GF_ABORT(msg...) \ do { \ gf_msg_callingfn("", GF_LOG_CRITICAL, 0, LG_MSG_ASSERTION_FAILED, \ "Assertion failed: " msg); \ diff --git a/libglusterfs/src/glusterfs/glusterfs.h b/libglusterfs/src/glusterfs/glusterfs.h index 7c6af090fd8..9d140c18ee3 100644 --- a/libglusterfs/src/glusterfs/glusterfs.h +++ b/libglusterfs/src/glusterfs/glusterfs.h @@ -575,6 +575,8 @@ struct _cmd_args { int fuse_flush_handle_interrupt; int fuse_auto_inval; + + bool global_threading; }; typedef struct _cmd_args cmd_args_t; diff --git a/libglusterfs/src/glusterfs/libglusterfs-messages.h b/libglusterfs/src/glusterfs/libglusterfs-messages.h index 1b72f6df5be..e17e33e06fb 100644 --- a/libglusterfs/src/glusterfs/libglusterfs-messages.h +++ b/libglusterfs/src/glusterfs/libglusterfs-messages.h @@ -109,6 +109,6 @@ GLFS_MSGID( LG_MSG_PTHREAD_ATTR_INIT_FAILED, LG_MSG_INVALID_INODE_LIST, LG_MSG_COMPACT_FAILED, LG_MSG_COMPACT_STATUS, LG_MSG_UTIMENSAT_FAILED, LG_MSG_PTHREAD_NAMING_FAILED, LG_MSG_SYSCALL_RETURNS_WRONG, - LG_MSG_XXH64_TO_GFID_FAILED); + LG_MSG_XXH64_TO_GFID_FAILED, LG_MSG_ASYNC_WARNING, LG_MSG_ASYNC_FAILURE); #endif /* !_LG_MESSAGES_H_ */ diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 2ac87c491ae..4466a5eee9a 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -553,6 +553,11 @@ get_xlator_by_name get_xlator_by_type gf_array_insertionsort gf_asprintf +gf_async +gf_async_adjust_threads +gf_async_ctrl +gf_async_init +gf_async_fini gf_backtrace_save gf_bits_count gf_bits_index diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index f9cbdf133c7..ce984426cbe 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -144,6 +144,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector, goto out; } + msg->trans = this; + if (count > 1) { msg->vectored = 1; } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 9e75d1a2bbb..6830279f07e 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -58,6 +58,7 @@ typedef struct rpc_transport rpc_transport_t; #include <glusterfs/dict.h> #include <glusterfs/compat.h> +#include <glusterfs/async.h> #include "rpcsvc-common.h" struct peer_info { @@ -155,16 +156,6 @@ struct rpc_request_info { }; typedef struct rpc_request_info rpc_request_info_t; -struct rpc_transport_pollin { - int count; - void *private; - struct iobref *iobref; - struct iovec vector[MAX_IOVEC]; - char is_reply; - char vectored; -}; -typedef struct rpc_transport_pollin rpc_transport_pollin_t; - typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata, rpc_transport_event_t, void *data, ...); @@ -217,6 +208,18 @@ struct rpc_transport { gf_atomic_t disconnect_progress; }; +struct rpc_transport_pollin { + struct rpc_transport *trans; + int count; + void *private; + struct iobref *iobref; + struct iovec vector[MAX_IOVEC]; + char is_reply; + char vectored; + gf_async_t async; +}; +typedef struct rpc_transport_pollin rpc_transport_pollin_t; + struct rpc_transport_ops { /* no need of receive op, msg will be delivered through an event * notification diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 74373c44f91..c38a675b8c2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -2314,6 +2314,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, newprog->alive = _gf_true; + if (gf_async_ctrl.enabled) { + newprog->ownthread = _gf_false; + newprog->synctask = _gf_false; + } + /* make sure synctask gets priority over ownthread */ if (newprog->synctask) newprog->ownthread = _gf_false; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index fa0e0f20901..26dbe0b706a 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2495,6 +2495,33 @@ out: return ret; } +static void +socket_event_poll_in_async(xlator_t *xl, gf_async_t *async) +{ + rpc_transport_pollin_t *pollin; + rpc_transport_t *this; + socket_private_t *priv; + + pollin = caa_container_of(async, rpc_transport_pollin_t, async); + this = pollin->trans; + priv = this->private; + + rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + + rpc_transport_unref(this); + + rpc_transport_pollin_destroy(pollin); + + pthread_mutex_lock(&priv->notify.lock); + { + --priv->notify.in_progress; + + if (!priv->notify.in_progress) + pthread_cond_signal(&priv->notify.cond); + } + pthread_mutex_unlock(&priv->notify.lock); +} + static int socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled) { @@ -2519,18 +2546,8 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled) event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen); if (pollin) { - rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); - - rpc_transport_pollin_destroy(pollin); - - pthread_mutex_lock(&priv->notify.lock); - { - --priv->notify.in_progress; - - if (!priv->notify.in_progress) - pthread_cond_signal(&priv->notify.cond); - } - pthread_mutex_unlock(&priv->notify.lock); + rpc_transport_ref(this); + gf_async(&pollin->async, THIS, socket_event_poll_in_async); } return ret; diff --git a/tests/basic/global-threading.t b/tests/basic/global-threading.t new file mode 100644 index 00000000000..f7d34044b09 --- /dev/null +++ b/tests/basic/global-threading.t @@ -0,0 +1,104 @@ +#!/bin/bash + +. $(dirname $0)/../include.rc +. $(dirname $0)/../volume.rc + +# Test if the given process has a number of threads of a given type between +# min and max. +function check_threads() { + local pid="${1}" + local pattern="${2}" + local min="${3}" + local max="${4-}" + local count + + count="$(ps hH -o comm ${pid} | grep "${pattern}" | wc -l)" + if [[ ${min} -gt ${count} ]]; then + return 1 + fi + if [[ ! -z "${max}" && ${max} -lt ${count} ]]; then + return 1 + fi + + return 0 +} + +cleanup + +TEST glusterd + +# Glusterd shouldn't use any thread +TEST check_threads $(get_glusterd_pid) glfs_tpw 0 0 +TEST check_threads $(get_glusterd_pid) glfs_iotwr 0 0 + +TEST pkill -9 glusterd + +TEST glusterd --global-threading + +# Glusterd shouldn't use global threads, even if enabled +TEST check_threads $(get_glusterd_pid) glfs_tpw 0 0 +TEST check_threads $(get_glusterd_pid) glfs_iotwr 0 0 + +TEST $CLI volume create $V0 replica 2 $H0:$B0/b{0,1} + +# Normal configuration using io-threads on bricks +TEST $CLI volume set $V0 config.global-threading off +TEST $CLI volume set $V0 performance.iot-pass-through off +TEST $CLI volume set $V0 performance.client-io-threads off +TEST $CLI volume start $V0 + +# There shouldn't be global threads +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_tpw 0 0 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_tpw 0 0 + +# There should be at least 1 io-thread +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_iotwr 1 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_iotwr 1 + +# Self-heal should be using global threads +TEST check_threads $(get_shd_process_pid) glfs_tpw 1 +TEST check_threads $(get_shd_process_pid) glfs_iotwr 0 0 + +TEST $CLI volume stop $V0 + +# Configuration with global threads on bricks +TEST $CLI volume set $V0 config.global-threading on +TEST $CLI volume set $V0 performance.iot-pass-through on +TEST $CLI volume start $V0 + +# There should be at least 1 global thread +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_tpw 1 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_tpw 1 + +# There shouldn't be any io-thread worker threads +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_iotwr 0 0 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_iotwr 0 0 + +# Normal configuration using io-threads on clients +TEST $CLI volume set $V0 performance.iot-pass-through off +TEST $CLI volume set $V0 performance.client-io-threads on +TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 $M0 + +# There shouldn't be global threads +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_tpw 0 0 + +# There should be at least 1 io-thread +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_iotwr 1 + +EXPECT_WITHIN $UMOUNT_TIMEOUT "Y" force_umount $M0 + +# Configuration with global threads on clients +TEST $CLI volume set $V0 performance.client-io-threads off +TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 --global-threading $M0 + +# There should be at least 1 global thread +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_tpw 1 + +# There shouldn't be io-threads +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_iotwr 0 0 + +# Some basic volume access checks with global-threading enabled everywhere +TEST mkdir ${M0}/dir +TEST dd if=/dev/zero of=${M0}/dir/file bs=128k count=8 + +cleanup diff --git a/tests/volume.rc b/tests/volume.rc index 261c6554d46..6fd7c3cdbcd 100644 --- a/tests/volume.rc +++ b/tests/volume.rc @@ -281,6 +281,10 @@ function quotad_up_status { gluster volume status | grep "Quota Daemon" | awk '{print $7}' } +function get_glusterd_pid { + pgrep '^glusterd$' | head -1 +} + function get_brick_pidfile { local vol=$1 local host=$2 diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c index f12191fb8df..101e403d39a 100644 --- a/xlators/debug/io-stats/src/io-stats.c +++ b/xlators/debug/io-stats/src/io-stats.c @@ -40,6 +40,7 @@ #include <pwd.h> #include <grp.h> #include <glusterfs/upcall-utils.h> +#include <glusterfs/async.h> #define MAX_LIST_MEMBERS 100 #define DEFAULT_PWD_BUF_SZ 16384 @@ -3737,6 +3738,7 @@ reconfigure(xlator_t *this, dict_t *options) uint32_t log_buf_size = 0; uint32_t log_flush_timeout = 0; int32_t old_dump_interval; + int32_t threads; if (!this || !this->private) goto out; @@ -3809,6 +3811,9 @@ reconfigure(xlator_t *this, dict_t *options) out); gf_log_set_log_flush_timeout(log_flush_timeout); + GF_OPTION_RECONF("threads", threads, options, int32, out); + gf_async_adjust_threads(threads); + ret = 0; out: gf_log(this ? this->name : "io-stats", GF_LOG_DEBUG, @@ -3888,6 +3893,7 @@ init(xlator_t *this) int ret = -1; uint32_t log_buf_size = 0; uint32_t log_flush_timeout = 0; + int32_t threads; if (!this) return -1; @@ -3951,6 +3957,7 @@ init(xlator_t *this) gf_log(this->name, GF_LOG_ERROR, "Out of memory."); goto out; } + ret = -1; GF_OPTION_INIT("ios-dnscache-ttl-sec", conf->ios_dnscache_ttl_sec, int32, out); @@ -3987,6 +3994,9 @@ init(xlator_t *this) GF_OPTION_INIT("log-flush-timeout", log_flush_timeout, time, out); gf_log_set_log_flush_timeout(log_flush_timeout); + GF_OPTION_INIT("threads", threads, int32, out); + gf_async_adjust_threads(threads); + this->private = conf; if (conf->ios_dump_interval > 0) { conf->dump_thread_running = _gf_true; @@ -4430,8 +4440,37 @@ struct volume_options options[] = { .type = GF_OPTION_TYPE_STR, .default_value = "/no/such/path", .description = "Unique ID for our files."}, + {.key = {"global-threading"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE, + .tags = {"io-stats", "threading"}, + .description = "This option enables the global threading support for " + "bricks. If enabled, it's recommended to also enable " + "'performance.iot-pass-through'"}, + {.key = {"threads"}, .type = GF_OPTION_TYPE_INT}, + {.key = {"brick-threads"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "16", + .min = 0, + .max = GF_ASYNC_MAX_THREADS, + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-stats", "threading"}, + .description = "When global threading is used, this value determines the " + "maximum amount of threads that can be created on bricks"}, + {.key = {"client-threads"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "16", + .min = 0, + .max = GF_ASYNC_MAX_THREADS, + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-stats", "threading"}, + .description = "When global threading is used, this value determines the " + "maximum amount of threads that can be created on clients"}, {.key = {NULL}}, - }; xlator_api_t xlator_api = { diff --git a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c index 4cd4cea15e4..6325f60f94a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c +++ b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c @@ -213,6 +213,10 @@ glusterd_svc_start(glusterd_svc_t *svc, int flags, dict_t *cmdline) runner_add_arg(&runner, daemon_log_level); } + if (this->ctx->cmd_args.global_threading) { + runner_add_arg(&runner, "--global-threading"); + } + if (cmdline) dict_foreach(cmdline, svc_add_args, (void *)&runner); diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 1aa6947fbba..85a7884b51a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -2045,6 +2045,8 @@ glusterd_volume_start_glusterfs(glusterd_volinfo_t *volinfo, int32_t len = 0; glusterd_brick_proc_t *brick_proc = NULL; char *inet_family = NULL; + char *global_threading = NULL; + bool threading = false; GF_ASSERT(volinfo); GF_ASSERT(brickinfo); @@ -2203,6 +2205,15 @@ retry: volinfo->volname, rdma_port); } + if (dict_get_strn(volinfo->dict, VKEY_CONFIG_GLOBAL_THREADING, + SLEN(VKEY_CONFIG_GLOBAL_THREADING), + &global_threading) == 0) { + if ((gf_string2boolean(global_threading, &threading) == 0) && + threading) { + runner_add_arg(&runner, "--global-threading"); + } + } + runner_add_arg(&runner, "--xlator-option"); runner_argprintf(&runner, "%s-server.listen-port=%d", volinfo->volname, port); diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index b7c7bd9b638..448dd8669a1 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1215,6 +1215,26 @@ loglevel_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, } static int +threads_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, + void *param) +{ + char *role = param; + struct volopt_map_entry vme2 = { + 0, + }; + + if ((strcmp(vme->option, "!client-threads") != 0 && + strcmp(vme->option, "!brick-threads") != 0) || + !strstr(vme->key, role)) + return 0; + + memcpy(&vme2, vme, sizeof(vme2)); + vme2.option = "threads"; + + return basic_option_handler(graph, &vme2, NULL); +} + +static int server_check_changelog_off(volgen_graph_t *graph, struct volopt_map_entry *vme, glusterd_volinfo_t *volinfo) { @@ -1506,6 +1526,9 @@ server_spec_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, if (!ret) ret = log_localtime_logging_option_handler(graph, vme, "brick"); + if (!ret) + ret = threads_option_handler(graph, vme, "brick"); + return ret; } @@ -4085,6 +4108,14 @@ graph_set_generic_options(xlator_t *this, volgen_graph_t *graph, gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, "Failed to change " "log-localtime-logging option"); + + ret = volgen_graph_set_options_generic(graph, set_dict, "client", + &threads_option_handler); + + if (ret) + gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, + "changing %s threads failed", identifier); + return 0; } diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.h b/xlators/mgmt/glusterd/src/glusterd-volgen.h index f9fc068931b..37eecc04bef 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.h +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.h @@ -38,6 +38,9 @@ #define VKEY_RDA_CACHE_LIMIT "performance.rda-cache-limit" #define VKEY_RDA_REQUEST_SIZE "performance.rda-request-size" #define VKEY_CONFIG_GFPROXY "config.gfproxyd" +#define VKEY_CONFIG_GLOBAL_THREADING "config.global-threading" +#define VKEY_CONFIG_CLIENT_THREADS "config.client-threads" +#define VKEY_CONFIG_BRICK_THREADS "config.brick-threads" #define AUTH_ALLOW_MAP_KEY "auth.allow" #define AUTH_REJECT_MAP_KEY "auth.reject" diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index b32b6ce0ec4..c8591cf0487 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -2961,4 +2961,19 @@ struct volopt_map_entry glusterd_volopt_map[] = { .validate_fn = validate_boolean, .description = "option to enforce mandatory lock on a file", .flags = VOLOPT_FLAG_XLATOR_OPT}, + {.key = VKEY_CONFIG_GLOBAL_THREADING, + .voltype = "debug/io-stats", + .option = "global-threading", + .value = "off", + .op_version = GD_OP_VERSION_6_0}, + {.key = VKEY_CONFIG_CLIENT_THREADS, + .voltype = "debug/io-stats", + .option = "!client-threads", + .value = "16", + .op_version = GD_OP_VERSION_6_0}, + {.key = VKEY_CONFIG_BRICK_THREADS, + .voltype = "debug/io-stats", + .option = "!brick-threads", + .value = "16", + .op_version = GD_OP_VERSION_6_0}, {.key = NULL}}; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 3479d40ceeb..bd8bc114a32 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -16,10 +16,17 @@ #include <glusterfs/glusterfs-acl.h> #include <glusterfs/syscall.h> #include <glusterfs/timespec.h> +#include <glusterfs/async.h> #ifdef __NetBSD__ #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */ #endif +typedef struct _fuse_async { + struct iobuf *iobuf; + fuse_in_header_t *finh; + void *msg; + gf_async_t async; +} fuse_async_t; static int gf_fuse_xattr_enotsup_log; @@ -5810,6 +5817,28 @@ fuse_get_mount_status(xlator_t *this) return kid_status; } +static void +fuse_dispatch(xlator_t *xl, gf_async_t *async) +{ + fuse_async_t *fasync; + fuse_private_t *priv; + fuse_in_header_t *finh; + struct iobuf *iobuf; + + priv = xl->private; + fasync = caa_container_of(async, fuse_async_t, async); + finh = fasync->finh; + iobuf = fasync->iobuf; + + priv->fuse_ops[finh->opcode](xl, finh, fasync->msg, iobuf); + + iobuf_unref(iobuf); +} + +/* We need 512 extra buffer size for BATCH_FORGET fop. By tests, it is + * found to be reduces 'REALLOC()' in the loop */ +#define FUSE_EXTRA_ALLOC 512 + static void * fuse_thread_proc(void *data) { @@ -5821,24 +5850,20 @@ fuse_thread_proc(void *data) fuse_in_header_t *finh = NULL; struct iovec iov_in[2]; void *msg = NULL; - /* we need 512 extra buffer size for BATCH_FORGET fop. By tests, it is - found to be reduces 'REALLOC()' in the loop */ - const size_t msg0_size = sizeof(*finh) + 512; - fuse_handler_t **fuse_ops = NULL; + size_t msg0_size = sizeof(*finh) + sizeof(struct fuse_write_in); + fuse_async_t *fasync; struct pollfd pfd[2] = {{ 0, }}; + uint32_t psize; this = data; priv = this->private; - fuse_ops = priv->fuse_ops; THIS = this; - iov_in[0].iov_len = sizeof(*finh) + sizeof(struct fuse_write_in); - iov_in[1].iov_len = ((struct iobuf_pool *)this->ctx->iobuf_pool) - ->default_page_size; - priv->msg0_len_p = &iov_in[0].iov_len; + psize = ((struct iobuf_pool *)this->ctx->iobuf_pool)->default_page_size; + priv->msg0_len_p = &msg0_size; for (;;) { /* THIS has to be reset here */ @@ -5895,14 +5920,15 @@ fuse_thread_proc(void *data) changing this one too */ iobuf = iobuf_get(this->ctx->iobuf_pool); - /* Add extra 128 byte to the first iov so that it can + /* Add extra 512 byte to the first iov so that it can * accommodate "ordinary" non-write requests. It's not * guaranteed to be big enough, as SETXATTR and namespace * operations with very long names may grow behind it, * but it's good enough in most cases (and we can handle - * rest via realloc). - */ - iov_in[0].iov_base = GF_CALLOC(1, msg0_size, gf_fuse_mt_iov_base); + * rest via realloc). */ + iov_in[0].iov_base = GF_MALLOC( + sizeof(fuse_async_t) + msg0_size + FUSE_EXTRA_ALLOC, + gf_fuse_mt_iov_base); if (!iobuf || !iov_in[0].iov_base) { gf_log(this->name, GF_LOG_ERROR, "Out of memory"); @@ -5915,6 +5941,9 @@ fuse_thread_proc(void *data) iov_in[1].iov_base = iobuf->ptr; + iov_in[0].iov_len = msg0_size; + iov_in[1].iov_len = psize; + res = sys_readv(priv->fd, iov_in, 2); if (res == -1) { @@ -5941,7 +5970,7 @@ fuse_thread_proc(void *data) goto cont_err; } - if (res < sizeof(finh)) { + if (res < sizeof(*finh)) { gf_log("glusterfs-fuse", GF_LOG_WARNING, "short read on /dev/fuse"); fuse_log_eh(this, "glusterfs-fuse: short read on " @@ -5983,8 +6012,9 @@ fuse_thread_proc(void *data) if (finh->opcode == FUSE_WRITE) msg = iov_in[1].iov_base; else { - if (res > msg0_size) { - void *b = GF_REALLOC(iov_in[0].iov_base, res); + if (res > msg0_size + FUSE_EXTRA_ALLOC) { + void *b = GF_REALLOC(iov_in[0].iov_base, + sizeof(fuse_async_t) + res); if (b) { iov_in[0].iov_base = b; finh = (fuse_in_header_t *)iov_in[0].iov_base; @@ -5996,22 +6026,29 @@ fuse_thread_proc(void *data) } } - if (res > iov_in[0].iov_len) + if (res > iov_in[0].iov_len) { memcpy(iov_in[0].iov_base + iov_in[0].iov_len, iov_in[1].iov_base, res - iov_in[0].iov_len); + iov_in[0].iov_len = res; + } msg = finh + 1; } if (priv->uid_map_root && finh->uid == priv->uid_map_root) finh->uid = 0; - if (finh->opcode >= FUSE_OP_HIGH) + if (finh->opcode >= FUSE_OP_HIGH) { /* turn down MacFUSE specific messages */ fuse_enosys(this, finh, msg, NULL); - else - fuse_ops[finh->opcode](this, finh, msg, iobuf); + iobuf_unref(iobuf); + } else { + fasync = iov_in[0].iov_base + iov_in[0].iov_len; + fasync->finh = finh; + fasync->msg = msg; + fasync->iobuf = iobuf; + gf_async(&fasync->async, this, fuse_dispatch); + } - iobuf_unref(iobuf); continue; cont_err: diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index 3f5d76d2e93..243c9c71af4 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -189,6 +189,10 @@ start_glusterfs () cmd_line=$(echo "$cmd_line --thin-client"); fi + if [ -n "$global_threading" ]; then + cmd_line=$(echo "$cmd_line --global-threading"); + fi + #options with values start here if [ -n "$halo_max_latency" ]; then cmd_line=$(echo "$cmd_line --xlator-option \ @@ -629,6 +633,9 @@ without_options() # "mount -t glusterfs" sends this, but it's useless. "rw") ;; + "global-threading") + global_threading=1 + ;; # TODO: not sure how to handle this yet "async"|"sync"|"dirsync"|\ "mand"|"nomand"|\ diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index bf75015eda8..9a4c728ae02 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1255,12 +1255,14 @@ init(xlator_t *this) INIT_LIST_HEAD(&conf->no_client[i].reqs); } - ret = iot_workers_scale(conf); + if (!this->pass_through) { + ret = iot_workers_scale(conf); - if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, - "cannot initialize worker threads, exiting init"); - goto out; + if (ret == -1) { + gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, + "cannot initialize worker threads, exiting init"); + goto out; + } } this->private = conf; |