diff options
29 files changed, 2858 insertions, 56 deletions
diff --git a/configure.ac b/configure.ac index 8752ca043bf..3aa7f4e77c1 100644 --- a/configure.ac +++ b/configure.ac @@ -1359,10 +1359,12 @@ PKG_CHECK_MODULES([URCU], [liburcu-bp], [], AC_MSG_ERROR([liburcu-bp not found]))]) PKG_CHECK_MODULES([URCU_CDS], [liburcu-cds >= 0.8], [], [PKG_CHECK_MODULES([URCU_CDS], [liburcu-cds >= 0.7], - [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found])], + [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found]) + USE_CONTRIB_URCU='yes'], [AC_CHECK_HEADERS([urcu/cds.h], [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found]) - URCU_CDS_LIBS='-lurcu-cds'], + URCU_CDS_LIBS='-lurcu-cds' + USE_CONTRIB_URCU='yes'], [AC_MSG_ERROR([liburcu-cds not found])])])]) BUILD_UNITTEST="no" @@ -1526,6 +1528,9 @@ AC_SUBST(CONTRIBDIR) GF_CPPDEFINES='-D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS)' GF_CPPINCLUDES='-include $(top_builddir)/config.h -include $(top_builddir)/site.h -I$(top_srcdir)/libglusterfs/src -I$(top_builddir)/libglusterfs/src' +if test "x${USE_CONTRIB_URCU}" = "xyes"; then + GF_CPPINCLUDES="${GF_CPPINCLUDES} -I\$(CONTRIBDIR)/userspace-rcu" +fi GF_CPPFLAGS="$GF_CPPFLAGS $GF_CPPDEFINES $GF_CPPINCLUDES" AC_SUBST([GF_CPPFLAGS]) diff --git a/contrib/userspace-rcu/static-wfcqueue.h b/contrib/userspace-rcu/static-wfcqueue.h new file mode 100644 index 00000000000..37d14ad674b --- /dev/null +++ b/contrib/userspace-rcu/static-wfcqueue.h @@ -0,0 +1,685 @@ +#ifndef _URCU_WFCQUEUE_STATIC_H +#define _URCU_WFCQUEUE_STATIC_H + +/* + * urcu/static/wfcqueue.h + * + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue + * + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfcqueue.h for + * linking dynamically with the userspace rcu library. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Copied from userspace-rcu 0.10 because version 0.7 doesn't contain it. */ + +#include <pthread.h> +#include <assert.h> +#include <poll.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/uatomic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Concurrent queue with wait-free enqueue/blocking dequeue. + * + * This queue has been designed and implemented collaboratively by + * Mathieu Desnoyers and Lai Jiangshan. Inspired from + * half-wait-free/half-blocking queue implementation done by Paul E. + * McKenney. + * + * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + * Legend: + * [1] cds_wfcq_enqueue + * [2] __cds_wfcq_splice (destination queue) + * [3] __cds_wfcq_dequeue + * [4] __cds_wfcq_splice (source queue) + * [5] __cds_wfcq_first + * [6] __cds_wfcq_next + * + * [1] [2] [3] [4] [5] [6] + * [1] - - - - - - + * [2] - - - - - - + * [3] - - X X X X + * [4] - - X - X X + * [5] - - X X - - + * [6] - - X X - - + * + * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock(). + * + * For convenience, cds_wfcq_dequeue_blocking() and + * cds_wfcq_splice_blocking() hold the dequeue lock. + * + * Besides locking, mutual exclusion of dequeue, splice and iteration + * can be ensured by performing all of those operations from a single + * thread, without requiring any lock. + */ + +#define WFCQ_ADAPT_ATTEMPTS 10 /* Retry if being set */ +#define WFCQ_WAIT 10 /* Wait 10 ms if being set */ + +/* + * cds_wfcq_node_init: initialize wait-free queue node. + */ +static inline void _cds_wfcq_node_init(struct cds_wfcq_node *node) +{ + node->next = NULL; +} + +/* + * cds_wfcq_init: initialize wait-free queue (with lock). Pair with + * cds_wfcq_destroy(). + */ +static inline void _cds_wfcq_init(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret; + + /* Set queue head and tail */ + _cds_wfcq_node_init(&head->node); + tail->p = &head->node; + ret = pthread_mutex_init(&head->lock, NULL); + assert(!ret); +} + +/* + * cds_wfcq_destroy: destroy wait-free queue (with lock). Pair with + * cds_wfcq_init(). + */ +static inline void _cds_wfcq_destroy(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret = pthread_mutex_destroy(&head->lock); + assert(!ret); +} + +/* + * __cds_wfcq_init: initialize wait-free queue (without lock). Don't + * pair with any destroy function. + */ +static inline void ___cds_wfcq_init(struct __cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + /* Set queue head and tail */ + _cds_wfcq_node_init(&head->node); + tail->p = &head->node; +} + +/* + * cds_wfcq_empty: return whether wait-free queue is empty. + * + * No memory barrier is issued. No mutual exclusion is required. + * + * We perform the test on head->node.next to check if the queue is + * possibly empty, but we confirm this by checking if the tail pointer + * points to the head node because the tail pointer is the linearisation + * point of the enqueuers. Just checking the head next pointer could + * make a queue appear empty if an enqueuer is preempted for a long time + * between xchg() and setting the previous node's next pointer. + */ +static inline bool _cds_wfcq_empty(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail) +{ + struct __cds_wfcq_head *head = u_head._h; + /* + * Queue is empty if no node is pointed by head->node.next nor + * tail->p. Even though the tail->p check is sufficient to find + * out of the queue is empty, we first check head->node.next as a + * common case to ensure that dequeuers do not frequently access + * enqueuer's tail->p cache line. + */ + return CMM_LOAD_SHARED(head->node.next) == NULL + && CMM_LOAD_SHARED(tail->p) == &head->node; +} + +static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret; + + ret = pthread_mutex_lock(&head->lock); + assert(!ret); +} + +static inline void _cds_wfcq_dequeue_unlock(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + int ret; + + ret = pthread_mutex_unlock(&head->lock); + assert(!ret); +} + +static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *new_head, + struct cds_wfcq_node *new_tail) +{ + struct __cds_wfcq_head *head = u_head._h; + struct cds_wfcq_node *old_tail; + + /* + * Implicit memory barrier before uatomic_xchg() orders earlier + * stores to data structure containing node and setting + * node->next to NULL before publication. + */ + old_tail = uatomic_xchg(&tail->p, new_tail); + + /* + * Implicit memory barrier after uatomic_xchg() orders store to + * q->tail before store to old_tail->next. + * + * At this point, dequeuers see a NULL tail->p->next, which + * indicates that the queue is being appended to. The following + * store will append "node" to the queue from a dequeuer + * perspective. + */ + CMM_STORE_SHARED(old_tail->next, new_head); + /* + * Return false if queue was empty prior to adding the node, + * else return true. + */ + return old_tail != &head->node; +} + +/* + * cds_wfcq_enqueue: enqueue a node into a wait-free queue. + * + * Issues a full memory barrier before enqueue. No mutual exclusion is + * required. + * + * Returns false if the queue was empty prior to adding the node. + * Returns true otherwise. + */ +static inline bool _cds_wfcq_enqueue(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *new_tail) +{ + return ___cds_wfcq_append(head, tail, new_tail, new_tail); +} + +/* + * CDS_WFCQ_WAIT_SLEEP: + * + * By default, this sleeps for the given @msec milliseconds. + * This is a macro which LGPL users may #define themselves before + * including wfcqueue.h to override the default behavior (e.g. + * to log a warning or perform other background work). + */ +#ifndef CDS_WFCQ_WAIT_SLEEP +#define CDS_WFCQ_WAIT_SLEEP(msec) ___cds_wfcq_wait_sleep(msec) +#endif + +static inline void ___cds_wfcq_wait_sleep(int msec) +{ + (void) poll(NULL, 0, msec); +} + +/* + * ___cds_wfcq_busy_wait: adaptative busy-wait. + * + * Returns 1 if nonblocking and needs to block, 0 otherwise. + */ +static inline bool +___cds_wfcq_busy_wait(int *attempt, int blocking) +{ + if (!blocking) + return 1; + if (++(*attempt) >= WFCQ_ADAPT_ATTEMPTS) { + CDS_WFCQ_WAIT_SLEEP(WFCQ_WAIT); /* Wait for 10ms */ + *attempt = 0; + } else { + caa_cpu_relax(); + } + return 0; +} + +/* + * Waiting for enqueuer to complete enqueue and return the next node. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_node_sync_next(struct cds_wfcq_node *node, int blocking) +{ + struct cds_wfcq_node *next; + int attempt = 0; + + /* + * Adaptative busy-looping waiting for enqueuer to complete enqueue. + */ + while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + if (___cds_wfcq_busy_wait(&attempt, blocking)) + return CDS_WFCQ_WOULDBLOCK; + } + + return next; +} + +static inline struct cds_wfcq_node * +___cds_wfcq_first(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail, + int blocking) +{ + struct __cds_wfcq_head *head = u_head._h; + struct cds_wfcq_node *node; + + if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) + return NULL; + node = ___cds_wfcq_node_sync_next(&head->node, blocking); + /* Load head->node.next before loading node's content */ + cmm_smp_read_barrier_depends(); + return node; +} + +/* + * __cds_wfcq_first_blocking: get first node of a queue, without dequeuing. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + * + * Used by for-like iteration macros in urcu/wfqueue.h: + * __cds_wfcq_for_each_blocking() + * __cds_wfcq_for_each_blocking_safe() + * + * Returns NULL if queue is empty, first node otherwise. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_first_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_first(head, tail, 1); +} + + +/* + * __cds_wfcq_first_nonblocking: get first node of a queue, without dequeuing. + * + * Same as __cds_wfcq_first_blocking, but returns CDS_WFCQ_WOULDBLOCK if + * it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_first_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_first(head, tail, 0); +} + +static inline struct cds_wfcq_node * +___cds_wfcq_next(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *node, + int blocking) +{ + struct cds_wfcq_node *next; + + /* + * Even though the following tail->p check is sufficient to find + * out if we reached the end of the queue, we first check + * node->next as a common case to ensure that iteration on nodes + * do not frequently access enqueuer's tail->p cache line. + */ + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + /* Load node->next before tail->p */ + cmm_smp_rmb(); + if (CMM_LOAD_SHARED(tail->p) == node) + return NULL; + next = ___cds_wfcq_node_sync_next(node, blocking); + } + /* Load node->next before loading next's content */ + cmm_smp_read_barrier_depends(); + return next; +} + +/* + * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + * + * Used by for-like iteration macros in urcu/wfqueue.h: + * __cds_wfcq_for_each_blocking() + * __cds_wfcq_for_each_blocking_safe() + * + * Returns NULL if reached end of queue, non-NULL next queue node + * otherwise. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_next_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *node) +{ + return ___cds_wfcq_next(head, tail, node, 1); +} + +/* + * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing. + * + * Same as __cds_wfcq_next_blocking, but returns CDS_WFCQ_WOULDBLOCK if + * it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_next_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, + struct cds_wfcq_node *node) +{ + return ___cds_wfcq_next(head, tail, node, 0); +} + +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, + struct cds_wfcq_tail *tail, + int *state, + int blocking) +{ + struct __cds_wfcq_head *head = u_head._h; + struct cds_wfcq_node *node, *next; + + if (state) + *state = 0; + + if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) { + return NULL; + } + + node = ___cds_wfcq_node_sync_next(&head->node, blocking); + if (!blocking && node == CDS_WFCQ_WOULDBLOCK) { + return CDS_WFCQ_WOULDBLOCK; + } + + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + /* + * @node is probably the only node in the queue. + * Try to move the tail to &q->head. + * q->head.next is set to NULL here, and stays + * NULL if the cmpxchg succeeds. Should the + * cmpxchg fail due to a concurrent enqueue, the + * q->head.next will be set to the next node. + * The implicit memory barrier before + * uatomic_cmpxchg() orders load node->next + * before loading q->tail. + * The implicit memory barrier before uatomic_cmpxchg + * orders load q->head.next before loading node's + * content. + */ + _cds_wfcq_node_init(&head->node); + if (uatomic_cmpxchg(&tail->p, node, &head->node) == node) { + if (state) + *state |= CDS_WFCQ_STATE_LAST; + return node; + } + next = ___cds_wfcq_node_sync_next(node, blocking); + /* + * In nonblocking mode, if we would need to block to + * get node's next, set the head next node pointer + * (currently NULL) back to its original value. + */ + if (!blocking && next == CDS_WFCQ_WOULDBLOCK) { + head->node.next = node; + return CDS_WFCQ_WOULDBLOCK; + } + } + + /* + * Move queue head forward. + */ + head->node.next = next; + + /* Load q->head.next before loading node's content */ + cmm_smp_read_barrier_depends(); + return node; +} + +/* + * __cds_wfcq_dequeue_with_state_blocking: dequeue node from queue, with state. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * It is valid to reuse and free a dequeued node immediately. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, int *state) +{ + return ___cds_wfcq_dequeue_with_state(head, tail, state, 1); +} + +/* + * ___cds_wfcq_dequeue_blocking: dequeue node from queue. + * + * Same as __cds_wfcq_dequeue_with_state_blocking, but without saving + * state. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_blocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_dequeue_with_state_blocking(head, tail, NULL); +} + +/* + * __cds_wfcq_dequeue_with_state_nonblocking: dequeue node, with state. + * + * Same as __cds_wfcq_dequeue_blocking, but returns CDS_WFCQ_WOULDBLOCK + * if it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail, int *state) +{ + return ___cds_wfcq_dequeue_with_state(head, tail, state, 0); +} + +/* + * ___cds_wfcq_dequeue_nonblocking: dequeue node from queue. + * + * Same as __cds_wfcq_dequeue_with_state_nonblocking, but without saving + * state. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_nonblocking(cds_wfcq_head_ptr_t head, + struct cds_wfcq_tail *tail) +{ + return ___cds_wfcq_dequeue_with_state_nonblocking(head, tail, NULL); +} + +/* + * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Mutual exclusion for src_q should be ensured by the caller as + * specified in the "Synchronisation table". + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice( + cds_wfcq_head_ptr_t u_dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + cds_wfcq_head_ptr_t u_src_q_head, + struct cds_wfcq_tail *src_q_tail, + int blocking) +{ + struct __cds_wfcq_head *dest_q_head = u_dest_q_head._h; + struct __cds_wfcq_head *src_q_head = u_src_q_head._h; + struct cds_wfcq_node *head, *tail; + int attempt = 0; + + /* + * Initial emptiness check to speed up cases where queue is + * empty: only require loads to check if queue is empty. + */ + if (_cds_wfcq_empty(__cds_wfcq_head_cast(src_q_head), src_q_tail)) + return CDS_WFCQ_RET_SRC_EMPTY; + + for (;;) { + /* + * Open-coded _cds_wfcq_empty() by testing result of + * uatomic_xchg, as well as tail pointer vs head node + * address. + */ + head = uatomic_xchg(&src_q_head->node.next, NULL); + if (head) + break; /* non-empty */ + if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node) + return CDS_WFCQ_RET_SRC_EMPTY; + if (___cds_wfcq_busy_wait(&attempt, blocking)) + return CDS_WFCQ_RET_WOULDBLOCK; + } + + /* + * Memory barrier implied before uatomic_xchg() orders store to + * src_q->head before store to src_q->tail. This is required by + * concurrent enqueue on src_q, which exchanges the tail before + * updating the previous tail's next pointer. + */ + tail = uatomic_xchg(&src_q_tail->p, &src_q_head->node); + + /* + * Append the spliced content of src_q into dest_q. Does not + * require mutual exclusion on dest_q (wait-free). + */ + if (___cds_wfcq_append(__cds_wfcq_head_cast(dest_q_head), dest_q_tail, + head, tail)) + return CDS_WFCQ_RET_DEST_NON_EMPTY; + else + return CDS_WFCQ_RET_DEST_EMPTY; +} + +/* + * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Mutual exclusion for src_q should be ensured by the caller as + * specified in the "Synchronisation table". + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice_blocking( + cds_wfcq_head_ptr_t dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + cds_wfcq_head_ptr_t src_q_head, + struct cds_wfcq_tail *src_q_tail) +{ + return ___cds_wfcq_splice(dest_q_head, dest_q_tail, + src_q_head, src_q_tail, 1); +} + +/* + * __cds_wfcq_splice_nonblocking: enqueue all src_q nodes at the end of dest_q. + * + * Same as __cds_wfcq_splice_blocking, but returns + * CDS_WFCQ_RET_WOULDBLOCK if it needs to block. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice_nonblocking( + cds_wfcq_head_ptr_t dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + cds_wfcq_head_ptr_t src_q_head, + struct cds_wfcq_tail *src_q_tail) +{ + return ___cds_wfcq_splice(dest_q_head, dest_q_tail, + src_q_head, src_q_tail, 0); +} + +/* + * cds_wfcq_dequeue_with_state_blocking: dequeue a node from a wait-free queue. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Mutual exclusion with cds_wfcq_splice_blocking and dequeue lock is + * ensured. + * It is valid to reuse and free a dequeued node immediately. + */ +static inline struct cds_wfcq_node * +_cds_wfcq_dequeue_with_state_blocking(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail, int *state) +{ + struct cds_wfcq_node *retval; + + _cds_wfcq_dequeue_lock(head, tail); + retval = ___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_cast(head), + tail, state); + _cds_wfcq_dequeue_unlock(head, tail); + return retval; +} + +/* + * cds_wfcq_dequeue_blocking: dequeue node from queue. + * + * Same as cds_wfcq_dequeue_blocking, but without saving state. + */ +static inline struct cds_wfcq_node * +_cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head, + struct cds_wfcq_tail *tail) +{ + return _cds_wfcq_dequeue_with_state_blocking(head, tail, NULL); +} + +/* + * cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Mutual exclusion with cds_wfcq_dequeue_blocking and dequeue lock is + * ensured. + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK. + */ +static inline enum cds_wfcq_ret +_cds_wfcq_splice_blocking( + struct cds_wfcq_head *dest_q_head, + struct cds_wfcq_tail *dest_q_tail, + struct cds_wfcq_head *src_q_head, + struct cds_wfcq_tail *src_q_tail) +{ + enum cds_wfcq_ret ret; + + _cds_wfcq_dequeue_lock(src_q_head, src_q_tail); + ret = ___cds_wfcq_splice_blocking(cds_wfcq_head_cast(dest_q_head), dest_q_tail, + cds_wfcq_head_cast(src_q_head), src_q_tail); + _cds_wfcq_dequeue_unlock(src_q_head, src_q_tail); + return ret; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_WFCQUEUE_STATIC_H */ diff --git a/contrib/userspace-rcu/static-wfstack.h b/contrib/userspace-rcu/static-wfstack.h new file mode 100644 index 00000000000..29b81c3aac3 --- /dev/null +++ b/contrib/userspace-rcu/static-wfstack.h @@ -0,0 +1,455 @@ +#ifndef _URCU_STATIC_WFSTACK_H +#define _URCU_STATIC_WFSTACK_H + +/* + * urcu/static/wfstack.h + * + * Userspace RCU library - Stack with with wait-free push, blocking traversal. + * + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfstack.h for + * linking dynamically with the userspace rcu library. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't support a stack + * without mutex. */ + +#include <pthread.h> +#include <assert.h> +#include <poll.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/uatomic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define CDS_WFS_END ((void *) 0x1UL) +#define CDS_WFS_ADAPT_ATTEMPTS 10 /* Retry if being set */ +#define CDS_WFS_WAIT 10 /* Wait 10 ms if being set */ + +/* + * Stack with wait-free push, blocking traversal. + * + * Stack implementing push, pop, pop_all operations, as well as iterator + * on the stack head returned by pop_all. + * + * Wait-free operations: cds_wfs_push, __cds_wfs_pop_all, cds_wfs_empty, + * cds_wfs_first. + * Blocking operations: cds_wfs_pop, cds_wfs_pop_all, cds_wfs_next, + * iteration on stack head returned by pop_all. + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + * cds_wfs_push __cds_wfs_pop __cds_wfs_pop_all + * cds_wfs_push - - - + * __cds_wfs_pop - X X + * __cds_wfs_pop_all - X - + * + * cds_wfs_pop and cds_wfs_pop_all use an internal mutex to provide + * synchronization. + */ + +/* + * cds_wfs_node_init: initialize wait-free stack node. + */ +static inline +void _cds_wfs_node_init(struct cds_wfs_node *node) +{ + node->next = NULL; +} + +/* + * __cds_wfs_init: initialize wait-free stack. Don't pair with + * any destroy function. + */ +static inline void ___cds_wfs_init(struct __cds_wfs_stack *s) +{ + s->head = CDS_WFS_END; +} + +/* + * cds_wfs_init: initialize wait-free stack. Pair with + * cds_wfs_destroy(). + */ +static inline +void _cds_wfs_init(struct cds_wfs_stack *s) +{ + int ret; + + s->head = CDS_WFS_END; + ret = pthread_mutex_init(&s->lock, NULL); + assert(!ret); +} + +/* + * cds_wfs_destroy: destroy wait-free stack. Pair with + * cds_wfs_init(). + */ +static inline +void _cds_wfs_destroy(struct cds_wfs_stack *s) +{ + int ret = pthread_mutex_destroy(&s->lock); + assert(!ret); +} + +static inline bool ___cds_wfs_end(void *node) +{ + return node == CDS_WFS_END; +} + +/* + * cds_wfs_empty: return whether wait-free stack is empty. + * + * No memory barrier is issued. No mutual exclusion is required. + */ +static inline bool _cds_wfs_empty(cds_wfs_stack_ptr_t u_stack) +{ + struct __cds_wfs_stack *s = u_stack._s; + + return ___cds_wfs_end(CMM_LOAD_SHARED(s->head)); +} + +/* + * cds_wfs_push: push a node into the stack. + * + * Issues a full memory barrier before push. No mutual exclusion is + * required. + * + * Returns 0 if the stack was empty prior to adding the node. + * Returns non-zero otherwise. + */ +static inline +int _cds_wfs_push(cds_wfs_stack_ptr_t u_stack, struct cds_wfs_node *node) +{ + struct __cds_wfs_stack *s = u_stack._s; + struct cds_wfs_head *old_head, *new_head; + + assert(node->next == NULL); + new_head = caa_container_of(node, struct cds_wfs_head, node); + /* + * uatomic_xchg() implicit memory barrier orders earlier stores + * to node (setting it to NULL) before publication. + */ + old_head = uatomic_xchg(&s->head, new_head); + /* + * At this point, dequeuers see a NULL node->next, they should + * busy-wait until node->next is set to old_head. + */ + CMM_STORE_SHARED(node->next, &old_head->node); + return !___cds_wfs_end(old_head); +} + +/* + * Waiting for push to complete enqueue and return the next node. + */ +static inline struct cds_wfs_node * +___cds_wfs_node_sync_next(struct cds_wfs_node *node, int blocking) +{ + struct cds_wfs_node *next; + int attempt = 0; + + /* + * Adaptative busy-looping waiting for push to complete. + */ + while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + if (!blocking) + return CDS_WFS_WOULDBLOCK; + if (++attempt >= CDS_WFS_ADAPT_ATTEMPTS) { + (void) poll(NULL, 0, CDS_WFS_WAIT); /* Wait for 10ms */ + attempt = 0; + } else { + caa_cpu_relax(); + } + } + + return next; +} + +static inline +struct cds_wfs_node * +___cds_wfs_pop(cds_wfs_stack_ptr_t u_stack, int *state, int blocking) +{ + struct cds_wfs_head *head, *new_head; + struct cds_wfs_node *next; + struct __cds_wfs_stack *s = u_stack._s; + + if (state) + *state = 0; + for (;;) { + head = CMM_LOAD_SHARED(s->head); + if (___cds_wfs_end(head)) { + return NULL; + } + next = ___cds_wfs_node_sync_next(&head->node, blocking); + if (!blocking && next == CDS_WFS_WOULDBLOCK) { + return CDS_WFS_WOULDBLOCK; + } + new_head = caa_container_of(next, struct cds_wfs_head, node); + if (uatomic_cmpxchg(&s->head, head, new_head) == head) { + if (state && ___cds_wfs_end(new_head)) + *state |= CDS_WFS_STATE_LAST; + return &head->node; + } + if (!blocking) { + return CDS_WFS_WOULDBLOCK; + } + /* busy-loop if head changed under us */ + } +} + +/* + * __cds_wfs_pop_with_state_blocking: pop a node from the stack, with state. + * + * Returns NULL if stack is empty. + * + * __cds_wfs_pop_blocking needs to be synchronized using one of the + * following techniques: + * + * 1) Calling __cds_wfs_pop_blocking under rcu read lock critical + * section. The caller must wait for a grace period to pass before + * freeing the returned node or modifying the cds_wfs_node structure. + * 2) Using mutual exclusion (e.g. mutexes) to protect + * __cds_wfs_pop_blocking and __cds_wfs_pop_all callers. + * 3) Ensuring that only ONE thread can call __cds_wfs_pop_blocking() + * and __cds_wfs_pop_all(). (multi-provider/single-consumer scheme). + * + * "state" saves state flags atomically sampled with pop operation. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_with_state_blocking(cds_wfs_stack_ptr_t u_stack, int *state) +{ + return ___cds_wfs_pop(u_stack, state, 1); +} + +static inline +struct cds_wfs_node * +___cds_wfs_pop_blocking(cds_wfs_stack_ptr_t u_stack) +{ + return ___cds_wfs_pop_with_state_blocking(u_stack, NULL); +} + +/* + * __cds_wfs_pop_with_state_nonblocking: pop a node from the stack. + * + * Same as __cds_wfs_pop_with_state_blocking, but returns + * CDS_WFS_WOULDBLOCK if it needs to block. + * + * "state" saves state flags atomically sampled with pop operation. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_with_state_nonblocking(cds_wfs_stack_ptr_t u_stack, int *state) +{ + return ___cds_wfs_pop(u_stack, state, 0); +} + +/* + * __cds_wfs_pop_nonblocking: pop a node from the stack. + * + * Same as __cds_wfs_pop_blocking, but returns CDS_WFS_WOULDBLOCK if + * it needs to block. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_nonblocking(cds_wfs_stack_ptr_t u_stack) +{ + return ___cds_wfs_pop_with_state_nonblocking(u_stack, NULL); +} + +/* + * __cds_wfs_pop_all: pop all nodes from a stack. + * + * __cds_wfs_pop_all does not require any synchronization with other + * push, nor with other __cds_wfs_pop_all, but requires synchronization + * matching the technique used to synchronize __cds_wfs_pop_blocking: + * + * 1) If __cds_wfs_pop_blocking is called under rcu read lock critical + * section, both __cds_wfs_pop_blocking and cds_wfs_pop_all callers + * must wait for a grace period to pass before freeing the returned + * node or modifying the cds_wfs_node structure. However, no RCU + * read-side critical section is needed around __cds_wfs_pop_all. + * 2) Using mutual exclusion (e.g. mutexes) to protect + * __cds_wfs_pop_blocking and __cds_wfs_pop_all callers. + * 3) Ensuring that only ONE thread can call __cds_wfs_pop_blocking() + * and __cds_wfs_pop_all(). (multi-provider/single-consumer scheme). + */ +static inline +struct cds_wfs_head * +___cds_wfs_pop_all(cds_wfs_stack_ptr_t u_stack) +{ + struct __cds_wfs_stack *s = u_stack._s; + struct cds_wfs_head *head; + + /* + * Implicit memory barrier after uatomic_xchg() matches implicit + * memory barrier before uatomic_xchg() in cds_wfs_push. It + * ensures that all nodes of the returned list are consistent. + * There is no need to issue memory barriers when iterating on + * the returned list, because the full memory barrier issued + * prior to each uatomic_cmpxchg, which each write to head, are + * taking care to order writes to each node prior to the full + * memory barrier after this uatomic_xchg(). + */ + head = uatomic_xchg(&s->head, CDS_WFS_END); + if (___cds_wfs_end(head)) + return NULL; + return head; +} + +/* + * cds_wfs_pop_lock: lock stack pop-protection mutex. + */ +static inline void _cds_wfs_pop_lock(struct cds_wfs_stack *s) +{ + int ret; + + ret = pthread_mutex_lock(&s->lock); + assert(!ret); +} + +/* + * cds_wfs_pop_unlock: unlock stack pop-protection mutex. + */ +static inline void _cds_wfs_pop_unlock(struct cds_wfs_stack *s) +{ + int ret; + + ret = pthread_mutex_unlock(&s->lock); + assert(!ret); +} + +/* + * Call __cds_wfs_pop_with_state_blocking with an internal pop mutex held. + */ +static inline +struct cds_wfs_node * +_cds_wfs_pop_with_state_blocking(struct cds_wfs_stack *s, int *state) +{ + struct cds_wfs_node *retnode; + + _cds_wfs_pop_lock(s); + retnode = ___cds_wfs_pop_with_state_blocking(s, state); + _cds_wfs_pop_unlock(s); + return retnode; +} + +/* + * Call _cds_wfs_pop_with_state_blocking without saving any state. + */ +static inline +struct cds_wfs_node * +_cds_wfs_pop_blocking(struct cds_wfs_stack *s) +{ + return _cds_wfs_pop_with_state_blocking(s, NULL); +} + +/* + * Call __cds_wfs_pop_all with an internal pop mutex held. + */ +static inline +struct cds_wfs_head * +_cds_wfs_pop_all_blocking(struct cds_wfs_stack *s) +{ + struct cds_wfs_head *rethead; + + _cds_wfs_pop_lock(s); + rethead = ___cds_wfs_pop_all(s); + _cds_wfs_pop_unlock(s); + return rethead; +} + +/* + * cds_wfs_first: get first node of a popped stack. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * + * Used by for-like iteration macros in urcu/wfstack.h: + * cds_wfs_for_each_blocking() + * cds_wfs_for_each_blocking_safe() + * + * Returns NULL if popped stack is empty, top stack node otherwise. + */ +static inline struct cds_wfs_node * +_cds_wfs_first(struct cds_wfs_head *head) +{ + if (___cds_wfs_end(head)) + return NULL; + return &head->node; +} + +static inline struct cds_wfs_node * +___cds_wfs_next(struct cds_wfs_node *node, int blocking) +{ + struct cds_wfs_node *next; + + next = ___cds_wfs_node_sync_next(node, blocking); + /* + * CDS_WFS_WOULDBLOCK != CSD_WFS_END, so we can check for end + * even if ___cds_wfs_node_sync_next returns CDS_WFS_WOULDBLOCK, + * and still return CDS_WFS_WOULDBLOCK. + */ + if (___cds_wfs_end(next)) + return NULL; + return next; +} + +/* + * cds_wfs_next_blocking: get next node of a popped stack. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * + * Used by for-like iteration macros in urcu/wfstack.h: + * cds_wfs_for_each_blocking() + * cds_wfs_for_each_blocking_safe() + * + * Returns NULL if reached end of popped stack, non-NULL next stack + * node otherwise. + */ +static inline struct cds_wfs_node * +_cds_wfs_next_blocking(struct cds_wfs_node *node) +{ + return ___cds_wfs_next(node, 1); +} + + +/* + * cds_wfs_next_nonblocking: get next node of a popped stack. + * + * Same as cds_wfs_next_blocking, but returns CDS_WFS_WOULDBLOCK if it + * needs to block. + */ +static inline struct cds_wfs_node * +_cds_wfs_next_nonblocking(struct cds_wfs_node *node) +{ + return ___cds_wfs_next(node, 0); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_STATIC_WFSTACK_H */ diff --git a/contrib/userspace-rcu/wfcqueue.h b/contrib/userspace-rcu/wfcqueue.h new file mode 100644 index 00000000000..0292585ac79 --- /dev/null +++ b/contrib/userspace-rcu/wfcqueue.h @@ -0,0 +1,216 @@ +#ifndef _URCU_WFCQUEUE_H +#define _URCU_WFCQUEUE_H + +/* + * urcu/wfcqueue.h + * + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't contain it. + * The non-LGPL section has been removed. */ + +#include <pthread.h> +#include <assert.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/arch.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Concurrent queue with wait-free enqueue/blocking dequeue. + * + * This queue has been designed and implemented collaboratively by + * Mathieu Desnoyers and Lai Jiangshan. Inspired from + * half-wait-free/half-blocking queue implementation done by Paul E. + * McKenney. + */ + +#define CDS_WFCQ_WOULDBLOCK ((struct cds_wfcq_node *) -1UL) + +enum cds_wfcq_ret { + CDS_WFCQ_RET_WOULDBLOCK = -1, + CDS_WFCQ_RET_DEST_EMPTY = 0, + CDS_WFCQ_RET_DEST_NON_EMPTY = 1, + CDS_WFCQ_RET_SRC_EMPTY = 2, +}; + +enum cds_wfcq_state { + CDS_WFCQ_STATE_LAST = (1U << 0), +}; + +struct cds_wfcq_node { + struct cds_wfcq_node *next; +}; + +/* + * Do not put head and tail on the same cache-line if concurrent + * enqueue/dequeue are expected from many CPUs. This eliminates + * false-sharing between enqueue and dequeue. + */ +struct __cds_wfcq_head { + struct cds_wfcq_node node; +}; + +struct cds_wfcq_head { + struct cds_wfcq_node node; + pthread_mutex_t lock; +}; + +#ifndef __cplusplus +/* + * The transparent union allows calling functions that work on both + * struct cds_wfcq_head and struct __cds_wfcq_head on any of those two + * types. + */ +typedef union { + struct __cds_wfcq_head *_h; + struct cds_wfcq_head *h; +} __attribute__((__transparent_union__)) cds_wfcq_head_ptr_t; + +/* + * This static inline is only present for compatibility with C++. It is + * effect-less in C. + */ +static inline struct __cds_wfcq_head *__cds_wfcq_head_cast(struct __cds_wfcq_head *head) +{ + return head; +} + +/* + * This static inline is only present for compatibility with C++. It is + * effect-less in C. + */ +static inline struct cds_wfcq_head *cds_wfcq_head_cast(struct cds_wfcq_head *head) +{ + return head; +} +#else /* #ifndef __cplusplus */ + +/* C++ ignores transparent union. */ +typedef union { + struct __cds_wfcq_head *_h; + struct cds_wfcq_head *h; +} cds_wfcq_head_ptr_t; + +/* C++ ignores transparent union. Requires an explicit conversion. */ +static inline cds_wfcq_head_ptr_t __cds_wfcq_head_cast(struct __cds_wfcq_head *head) +{ + cds_wfcq_head_ptr_t ret = { ._h = head }; + return ret; +} +/* C++ ignores transparent union. Requires an explicit conversion. */ +static inline cds_wfcq_head_ptr_t cds_wfcq_head_cast(struct cds_wfcq_head *head) +{ + cds_wfcq_head_ptr_t ret = { .h = head }; + return ret; +} +#endif /* #else #ifndef __cplusplus */ + +struct cds_wfcq_tail { + struct cds_wfcq_node *p; +}; + +#include "static-wfcqueue.h" + +#define cds_wfcq_node_init _cds_wfcq_node_init +#define cds_wfcq_init _cds_wfcq_init +#define __cds_wfcq_init ___cds_wfcq_init +#define cds_wfcq_destroy _cds_wfcq_destroy +#define cds_wfcq_empty _cds_wfcq_empty +#define cds_wfcq_enqueue _cds_wfcq_enqueue + +/* Dequeue locking */ +#define cds_wfcq_dequeue_lock _cds_wfcq_dequeue_lock +#define cds_wfcq_dequeue_unlock _cds_wfcq_dequeue_unlock + +/* Locking performed within cds_wfcq calls. */ +#define cds_wfcq_dequeue_blocking _cds_wfcq_dequeue_blocking +#define cds_wfcq_dequeue_with_state_blocking \ + _cds_wfcq_dequeue_with_state_blocking +#define cds_wfcq_splice_blocking _cds_wfcq_splice_blocking +#define cds_wfcq_first_blocking _cds_wfcq_first_blocking +#define cds_wfcq_next_blocking _cds_wfcq_next_blocking + +/* Locking ensured by caller by holding cds_wfcq_dequeue_lock() */ +#define __cds_wfcq_dequeue_blocking ___cds_wfcq_dequeue_blocking +#define __cds_wfcq_dequeue_with_state_blocking \ + ___cds_wfcq_dequeue_with_state_blocking +#define __cds_wfcq_splice_blocking ___cds_wfcq_splice_blocking +#define __cds_wfcq_first_blocking ___cds_wfcq_first_blocking +#define __cds_wfcq_next_blocking ___cds_wfcq_next_blocking + +/* + * Locking ensured by caller by holding cds_wfcq_dequeue_lock(). + * Non-blocking: deque, first, next return CDS_WFCQ_WOULDBLOCK if they + * need to block. splice returns nonzero if it needs to block. + */ +#define __cds_wfcq_dequeue_nonblocking ___cds_wfcq_dequeue_nonblocking +#define __cds_wfcq_dequeue_with_state_nonblocking \ + ___cds_wfcq_dequeue_with_state_nonblocking +#define __cds_wfcq_splice_nonblocking ___cds_wfcq_splice_nonblocking +#define __cds_wfcq_first_nonblocking ___cds_wfcq_first_nonblocking +#define __cds_wfcq_next_nonblocking ___cds_wfcq_next_nonblocking + +/* + * __cds_wfcq_for_each_blocking: Iterate over all nodes in a queue, + * without dequeuing them. + * @head: head of the queue (struct cds_wfcq_head or __cds_wfcq_head pointer). + * @tail: tail of the queue (struct cds_wfcq_tail pointer). + * @node: iterator on the queue (struct cds_wfcq_node pointer). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +#define __cds_wfcq_for_each_blocking(head, tail, node) \ + for (node = __cds_wfcq_first_blocking(head, tail); \ + node != NULL; \ + node = __cds_wfcq_next_blocking(head, tail, node)) + +/* + * __cds_wfcq_for_each_blocking_safe: Iterate over all nodes in a queue, + * without dequeuing them. Safe against deletion. + * @head: head of the queue (struct cds_wfcq_head or __cds_wfcq_head pointer). + * @tail: tail of the queue (struct cds_wfcq_tail pointer). + * @node: iterator on the queue (struct cds_wfcq_node pointer). + * @n: struct cds_wfcq_node pointer holding the next pointer (used + * internally). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +#define __cds_wfcq_for_each_blocking_safe(head, tail, node, n) \ + for (node = __cds_wfcq_first_blocking(head, tail), \ + n = (node ? __cds_wfcq_next_blocking(head, tail, node) : NULL); \ + node != NULL; \ + node = n, n = (node ? __cds_wfcq_next_blocking(head, tail, node) : NULL)) + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_WFCQUEUE_H */ diff --git a/contrib/userspace-rcu/wfstack.h b/contrib/userspace-rcu/wfstack.h new file mode 100644 index 00000000000..738fd1cfd33 --- /dev/null +++ b/contrib/userspace-rcu/wfstack.h @@ -0,0 +1,178 @@ +#ifndef _URCU_WFSTACK_H +#define _URCU_WFSTACK_H + +/* + * urcu/wfstack.h + * + * Userspace RCU library - Stack with wait-free push, blocking traversal. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't support a stack + * without mutex. The non-LGPL section has been removed. */ + +#include <pthread.h> +#include <assert.h> +#include <stdbool.h> +#include <urcu/compiler.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Stack with wait-free push, blocking traversal. + * + * Stack implementing push, pop, pop_all operations, as well as iterator + * on the stack head returned by pop_all. + * + * Wait-free operations: cds_wfs_push, __cds_wfs_pop_all, cds_wfs_empty, + * cds_wfs_first. + * Blocking operations: cds_wfs_pop, cds_wfs_pop_all, cds_wfs_next, + * iteration on stack head returned by pop_all. + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + * cds_wfs_push __cds_wfs_pop __cds_wfs_pop_all + * cds_wfs_push - - - + * __cds_wfs_pop - X X + * __cds_wfs_pop_all - X - + * + * cds_wfs_pop and cds_wfs_pop_all use an internal mutex to provide + * synchronization. + */ + +#define CDS_WFS_WOULDBLOCK ((void *) -1UL) + +enum cds_wfs_state { + CDS_WFS_STATE_LAST = (1U << 0), +}; + +/* + * struct cds_wfs_node is returned by __cds_wfs_pop, and also used as + * iterator on stack. It is not safe to dereference the node next + * pointer when returned by __cds_wfs_pop_blocking. + */ +struct cds_wfs_node { + struct cds_wfs_node *next; +}; + +/* + * struct cds_wfs_head is returned by __cds_wfs_pop_all, and can be used + * to begin iteration on the stack. "node" needs to be the first field of + * cds_wfs_head, so the end-of-stack pointer value can be used for both + * types. + */ +struct cds_wfs_head { + struct cds_wfs_node node; +}; + +struct __cds_wfs_stack { + struct cds_wfs_head *head; +}; + +struct cds_wfs_stack { + struct cds_wfs_head *head; + pthread_mutex_t lock; +}; + +/* + * The transparent union allows calling functions that work on both + * struct cds_wfs_stack and struct __cds_wfs_stack on any of those two + * types. + */ +typedef union { + struct __cds_wfs_stack *_s; + struct cds_wfs_stack *s; +} __attribute__((__transparent_union__)) cds_wfs_stack_ptr_t; + +#include "static-wfstack.h" + +#define cds_wfs_node_init _cds_wfs_node_init +#define cds_wfs_init _cds_wfs_init +#define cds_wfs_destroy _cds_wfs_destroy +#define __cds_wfs_init ___cds_wfs_init +#define cds_wfs_empty _cds_wfs_empty +#define cds_wfs_push _cds_wfs_push + +/* Locking performed internally */ +#define cds_wfs_pop_blocking _cds_wfs_pop_blocking +#define cds_wfs_pop_with_state_blocking _cds_wfs_pop_with_state_blocking +#define cds_wfs_pop_all_blocking _cds_wfs_pop_all_blocking + +/* + * For iteration on cds_wfs_head returned by __cds_wfs_pop_all or + * cds_wfs_pop_all_blocking. + */ +#define cds_wfs_first _cds_wfs_first +#define cds_wfs_next_blocking _cds_wfs_next_blocking +#define cds_wfs_next_nonblocking _cds_wfs_next_nonblocking + +/* Pop locking with internal mutex */ +#define cds_wfs_pop_lock _cds_wfs_pop_lock +#define cds_wfs_pop_unlock _cds_wfs_pop_unlock + +/* Synchronization ensured by the caller. See synchronization table. */ +#define __cds_wfs_pop_blocking ___cds_wfs_pop_blocking +#define __cds_wfs_pop_with_state_blocking \ + ___cds_wfs_pop_with_state_blocking +#define __cds_wfs_pop_nonblocking ___cds_wfs_pop_nonblocking +#define __cds_wfs_pop_with_state_nonblocking \ + ___cds_wfs_pop_with_state_nonblocking +#define __cds_wfs_pop_all ___cds_wfs_pop_all + +#ifdef __cplusplus +} +#endif + +/* + * cds_wfs_for_each_blocking: Iterate over all nodes returned by + * __cds_wfs_pop_all(). + * @head: head of the queue (struct cds_wfs_head pointer). + * @node: iterator (struct cds_wfs_node pointer). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + */ +#define cds_wfs_for_each_blocking(head, node) \ + for (node = cds_wfs_first(head); \ + node != NULL; \ + node = cds_wfs_next_blocking(node)) + +/* + * cds_wfs_for_each_blocking_safe: Iterate over all nodes returned by + * __cds_wfs_pop_all(). Safe against deletion. + * @head: head of the queue (struct cds_wfs_head pointer). + * @node: iterator (struct cds_wfs_node pointer). + * @n: struct cds_wfs_node pointer holding the next pointer (used + * internally). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + */ +#define cds_wfs_for_each_blocking_safe(head, node, n) \ + for (node = cds_wfs_first(head), \ + n = (node ? cds_wfs_next_blocking(node) : NULL); \ + node != NULL; \ + node = n, n = (node ? cds_wfs_next_blocking(node) : NULL)) + +#endif /* _URCU_WFSTACK_H */ diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index ae74de73f29..9ee88b6d57b 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -203,6 +203,8 @@ static struct argp_option gf_options[] = { "Instantiate process global timer-wheel"}, {"thin-client", ARGP_THIN_CLIENT_KEY, 0, 0, "Enables thin mount and connects via gfproxyd daemon"}, + {"global-threading", ARGP_GLOBAL_THREADING_KEY, "BOOL", OPTION_ARG_OPTIONAL, + "Use the global thread pool instead of io-threads"}, {0, 0, 0, 0, "Fuse options:"}, {"direct-io-mode", ARGP_DIRECT_IO_MODE_KEY, "BOOL|auto", @@ -697,6 +699,14 @@ set_fuse_mount_options(glusterfs_ctx_t *ctx, dict_t *options) cmd_args->fuse_flush_handle_interrupt); break; } + if (cmd_args->global_threading) { + ret = dict_set_static_ptr(options, "global-threading", "on"); + if (ret < 0) { + gf_msg("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4, + "failed to set dict value for key global-threading"); + goto err; + } + } ret = 0; err: @@ -1497,6 +1507,7 @@ parse_opts(int key, char *arg, struct argp_state *state) "unknown fuse flush handle interrupt setting \"%s\"", arg); break; + case ARGP_FUSE_AUTO_INVAL_KEY: if (!arg) arg = "yes"; @@ -1507,6 +1518,20 @@ parse_opts(int key, char *arg, struct argp_state *state) } break; + + case ARGP_GLOBAL_THREADING_KEY: + if (!arg || (*arg == 0)) { + arg = "yes"; + } + + if (gf_string2boolean(arg, &b) == 0) { + cmd_args->global_threading = b; + break; + } + + argp_failure(state, -1, 0, + "Invalid value for global threading \"%s\"", arg); + break; } return 0; } @@ -2435,6 +2460,10 @@ glusterfs_signals_setup(glusterfs_ctx_t *ctx) sigaddset(&set, SIGUSR1); /* gf_proc_dump_info */ sigaddset(&set, SIGUSR2); + /* Signals needed for asynchronous framework. */ + sigaddset(&set, GF_ASYNC_SIGQUEUE); + sigaddset(&set, GF_ASYNC_SIGCTRL); + ret = pthread_sigmask(SIG_BLOCK, &set, NULL); if (ret) { gf_msg("glusterfsd", GF_LOG_WARNING, errno, glusterfsd_msg_22, @@ -2845,6 +2874,11 @@ main(int argc, char *argv[]) */ mem_pools_init_late(); + ret = gf_async_init(ctx); + if (ret < 0) { + goto out; + } + #ifdef GF_LINUX_HOST_OS ret = set_oom_score_adj(ctx); if (ret) @@ -2871,6 +2905,7 @@ main(int argc, char *argv[]) ret = event_dispatch(ctx->event_pool); out: - // glusterfs_ctx_destroy (ctx); + // glusterfs_ctx_destroy (ctx); + gf_async_fini(); return ret; } diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index 35cf6d88b7a..c91daa0fb54 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -111,6 +111,7 @@ enum argp_option_keys { ARGP_FUSE_FLUSH_HANDLE_INTERRUPT_KEY = 189, ARGP_FUSE_LRU_LIMIT_KEY = 190, ARGP_FUSE_AUTO_INVAL_KEY = 191, + ARGP_GLOBAL_THREADING_KEY = 192 }; struct _gfd_vol_top_priv { diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 970f4b74978..f030a70b0f0 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -37,7 +37,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \ $(CONTRIBDIR)/timer-wheel/timer-wheel.c \ $(CONTRIBDIR)/timer-wheel/find_last_bit.c default-args.c locking.c \ $(CONTRIBDIR)/xxhash/xxhash.c \ - compound-fop-utils.c throttle-tbf.c monitoring.c + compound-fop-utils.c throttle-tbf.c monitoring.c async.c nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c nodist_libglusterfs_la_HEADERS = y.tab.h protocol-common.h @@ -69,7 +69,7 @@ libglusterfs_la_HEADERS = glusterfs/common-utils.h glusterfs/defaults.h \ glusterfs/quota-common-utils.h glusterfs/rot-buffs.h \ glusterfs/compat-uuid.h glusterfs/upcall-utils.h glusterfs/throttle-tbf.h \ glusterfs/events.h glusterfs/compound-fop-utils.h glusterfs/atomic.h \ - glusterfs/monitoring.h + glusterfs/monitoring.h glusterfs/async.h libglusterfs_ladir = $(includedir)/glusterfs @@ -79,6 +79,10 @@ noinst_HEADERS = unittest/unittest.h \ $(CONTRIBDIR)/libexecinfo/execinfo_compat.h \ $(CONTRIBDIR)/timer-wheel/timer-wheel.h \ $(CONTRIBDIR)/xxhash/xxhash.h \ + $(CONTRIBDIR)/userspace-rcu/wfcqueue.h \ + $(CONTRIBDIR)/userspace-rcu/wfstack.h \ + $(CONTRIBDIR)/userspace-rcu/static-wfcqueue.h \ + $(CONTRIBDIR)/userspace-rcu/static-wfstack.h \ tier-ctr-interface.h eventtypes.h: $(top_srcdir)/events/eventskeygen.py diff --git a/libglusterfs/src/async.c b/libglusterfs/src/async.c new file mode 100644 index 00000000000..ae7152ff7fa --- /dev/null +++ b/libglusterfs/src/async.c @@ -0,0 +1,723 @@ +/* + Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* To implement an efficient thread pool with minimum contention we have used + * the following ideas: + * + * - The queue of jobs has been implemented using a Wait-Free queue provided + * by the userspace-rcu library. This queue requires a mutex when multiple + * consumers can be extracting items from it concurrently, but the locked + * region is very small, which minimizes the chances of contention. To + * further minimize contention, the number of active worker threads that + * are accessing the queue is dynamically adjusted so that we always have + * the minimum required amount of workers contending for the queue. Adding + * new items can be done with a single atomic operation, without locks. + * + * - All queue management operations, like creating more threads, enabling + * sleeping ones, etc. are done by a single thread. This makes it possible + * to manage all scaling related information and workers lists without + * locks. This functionality is implemented as a role that can be assigned + * to any of the worker threads, which avoids that some lengthy operations + * could interfere with this task. + * + * - Management is based on signals. We used signals for management tasks to + * avoid multiple system calls for each request (with signals we can wait + * for multiple events and get some additional data for each request in a + * single call, instead of first polling and then reading). + * + * TODO: There are some other changes that can take advantage of this new + * thread pool. + * + * - Use this thread pool as the core threading model for synctasks. I + * think this would improve synctask performance because I think we + * currently have some contention there for some workloads. + * + * - Implement a per thread timer that will allow adding and removing + * timers without using mutexes. + * + * - Integrate with userspace-rcu library in QSBR mode, allowing + * other portions of code to be implemented using RCU-based + * structures with a extremely fast read side without contention. + * + * - Integrate I/O into the thread pool so that the thread pool is + * able to efficiently manage all loads and scale dynamically. This + * could make it possible to minimize context switching when serving + * requests from fuse or network. + * + * - Dynamically scale the number of workers based on system load. + * This will make it possible to reduce contention when system is + * heavily loaded, improving performance under these circumstances + * (or minimizing performance loss). This will also make it possible + * that gluster can coexist with other processes that also consume + * CPU, with minimal interference from each other. + */ + +#include <unistd.h> +#include <pthread.h> +#include <errno.h> + +//#include <urcu/uatomic.h> + +#include "glusterfs/logging.h" +#include "glusterfs/list.h" +#include "glusterfs/mem-types.h" +#include "glusterfs/async.h" + +/* These macros wrap a simple system/library call to check the returned error + * and log a message in case of failure. */ +#define GF_ASYNC_CHECK(_func, _args...) \ + ({ \ + int32_t __async_error = -_func(_args); \ + if (caa_unlikely(__async_error != 0)) { \ + gf_async_error(__async_error, #_func "() failed."); \ + } \ + __async_error; \ + }) + +#define GF_ASYNC_CHECK_ERRNO(_func, _args...) \ + ({ \ + int32_t __async_error = _func(_args); \ + if (caa_unlikely(__async_error < 0)) { \ + __async_error = -errno; \ + gf_async_error(__async_error, #_func "() failed."); \ + } \ + __async_error; \ + }) + +/* These macros are used when, based on POSIX documentation, the function + * should never fail under the conditions we are using it. So any unexpected + * error will be handled as a fatal event. It probably means a critical bug + * or memory corruption. In both cases we consider that stopping the process + * is safer (otherwise it could cause more corruption with unknown effects + * that could be worse). */ +#define GF_ASYNC_CANTFAIL(_func, _args...) \ + do { \ + int32_t __async_error = -_func(_args); \ + if (caa_unlikely(__async_error != 0)) { \ + gf_async_fatal(__async_error, #_func "() failed"); \ + } \ + } while (0) + +#define GF_ASYNC_CANTFAIL_ERRNO(_func, _args...) \ + ({ \ + int32_t __async_error = _func(_args); \ + if (caa_unlikely(__async_error < 0)) { \ + __async_error = -errno; \ + gf_async_fatal(__async_error, #_func "() failed"); \ + } \ + __async_error; \ + }) + +/* TODO: for now we allocate a static array of workers. There's an issue if we + * try to use dynamic memory since these workers are initialized very + * early in the process startup and it seems that sometimes not all is + * ready to use dynamic memory. */ +static gf_async_worker_t gf_async_workers[GF_ASYNC_MAX_THREADS]; + +/* This is the only global variable needed to manage the entire framework. */ +gf_async_control_t gf_async_ctrl = {}; + +static __thread gf_async_worker_t *gf_async_current_worker = NULL; + +/* The main function of the worker threads. */ +static void * +gf_async_worker(void *arg); + +static void +gf_async_sync_init(void) +{ + GF_ASYNC_CANTFAIL(pthread_barrier_init, &gf_async_ctrl.sync, NULL, 2); +} + +static void +gf_async_sync_now(void) +{ + int32_t ret; + + ret = pthread_barrier_wait(&gf_async_ctrl.sync); + if (ret == PTHREAD_BARRIER_SERIAL_THREAD) { + GF_ASYNC_CANTFAIL(pthread_barrier_destroy, &gf_async_ctrl.sync); + ret = 0; + } + if (caa_unlikely(ret != 0)) { + gf_async_fatal(-ret, "pthread_barrier_wait() failed"); + } +} + +static void +gf_async_sigmask_empty(sigset_t *mask) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, mask); +} + +static void +gf_async_sigmask_add(sigset_t *mask, int32_t signal) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigaddset, mask, signal); +} + +static void +gf_async_sigmask_set(int32_t mode, sigset_t *mask, sigset_t *old) +{ + GF_ASYNC_CANTFAIL(pthread_sigmask, mode, mask, old); +} + +static void +gf_async_sigaction(int32_t signum, const struct sigaction *action, + struct sigaction *old) +{ + GF_ASYNC_CANTFAIL_ERRNO(sigaction, signum, action, old); +} + +static int32_t +gf_async_sigwait(sigset_t *set) +{ + int32_t ret, signum; + + do { + ret = sigwait(set, &signum); + } while (caa_unlikely((ret < 0) && (errno == EINTR))); + + if (caa_unlikely(ret < 0)) { + ret = -errno; + gf_async_fatal(ret, "sigwait() failed"); + } + + return signum; +} + +static int32_t +gf_async_sigtimedwait(sigset_t *set, struct timespec *timeout) +{ + int32_t ret; + + do { + ret = sigtimedwait(set, NULL, timeout); + } while (caa_unlikely((ret < 0) && (errno == EINTR))); + if (caa_unlikely(ret < 0)) { + ret = -errno; + /* EAGAIN means that the timeout has expired, so we allow this error. + * Any other error shouldn't happen. */ + if (caa_unlikely(ret != -EAGAIN)) { + gf_async_fatal(ret, "sigtimedwait() failed"); + } + ret = 0; + } + + return ret; +} + +static void +gf_async_sigbroadcast(int32_t signum) +{ + GF_ASYNC_CANTFAIL_ERRNO(kill, gf_async_ctrl.pid, signum); +} + +static void +gf_async_signal_handler(int32_t signum) +{ + /* We should never handle a signal in this function. */ + gf_async_fatal(-EBUSY, + "Unexpected processing of signal %d through a handler.", + signum); +} + +static void +gf_async_signal_setup(void) +{ + struct sigaction action; + + /* We configure all related signals so that we can detect threads using an + * invalid signal mask that doesn't block our critical signal. */ + memset(&action, 0, sizeof(action)); + action.sa_handler = gf_async_signal_handler; + + gf_async_sigaction(GF_ASYNC_SIGCTRL, &action, &gf_async_ctrl.handler_ctrl); + + gf_async_sigaction(GF_ASYNC_SIGQUEUE, &action, + &gf_async_ctrl.handler_queue); +} + +static void +gf_async_signal_restore(void) +{ + /* Handlers we have previously changed are restored back to their original + * value. */ + + if (gf_async_ctrl.handler_ctrl.sa_handler != gf_async_signal_handler) { + gf_async_sigaction(GF_ASYNC_SIGCTRL, &gf_async_ctrl.handler_ctrl, NULL); + } + + if (gf_async_ctrl.handler_queue.sa_handler != gf_async_signal_handler) { + gf_async_sigaction(GF_ASYNC_SIGQUEUE, &gf_async_ctrl.handler_queue, + NULL); + } +} + +static void +gf_async_signal_flush(void) +{ + struct timespec delay; + + delay.tv_sec = 0; + delay.tv_nsec = 0; + + /* We read all pending signals so that they don't trigger once the signal + * mask of some thread is changed. */ + while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_ctrl, &delay) > 0) { + } + while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_queue, &delay) > 0) { + } +} + +static int32_t +gf_async_thread_create(pthread_t *thread, int32_t id, void *data) +{ + int32_t ret; + + ret = gf_thread_create(thread, NULL, gf_async_worker, data, + GF_ASYNC_THREAD_NAME "%u", id); + if (caa_unlikely(ret < 0)) { + /* TODO: gf_thread_create() should return a more specific error + * code. */ + return -ENOMEM; + } + + return 0; +} + +static void +gf_async_thread_wait(pthread_t thread) +{ + /* TODO: this is a blocking call executed inside one of the workers of the + * thread pool. This is bad, but this is only executed once we have + * received a notification from the thread that it's terminating, so + * this should return almost immediately. However, to be more robust + * it would be better to use pthread_timedjoin_np() (or even a call + * to pthread_tryjoin_np() followed by a delayed recheck if it + * fails), but they are not portable. We should see how to do this + * in other platforms. */ + GF_ASYNC_CANTFAIL(pthread_join, thread, NULL); +} + +static int32_t +gf_async_worker_create(void) +{ + struct cds_wfs_node *node; + gf_async_worker_t *worker; + uint32_t counts, running, max; + int32_t ret; + + node = __cds_wfs_pop_blocking(&gf_async_ctrl.available); + if (caa_unlikely(node == NULL)) { + /* There are no more available workers. We have all threads running. */ + return 1; + } + cds_wfs_node_init(node); + + ret = 1; + + counts = uatomic_read(&gf_async_ctrl.counts); + max = uatomic_read(&gf_async_ctrl.max_threads); + running = GF_ASYNC_COUNT_RUNNING(counts); + if (running < max) { + uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(1, 0)); + + worker = caa_container_of(node, gf_async_worker_t, stack); + + ret = gf_async_thread_create(&worker->thread, worker->id, worker); + if (caa_likely(ret >= 0)) { + return 0; + } + + uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(-1, 0)); + } + + cds_wfs_push(&gf_async_ctrl.available, node); + + return ret; +} + +static void +gf_async_worker_enable(void) +{ + /* This will wake one of the spare workers. If all workers are busy now, + * the signal will be queued so that the first one that completes its + * work will become the leader. */ + gf_async_sigbroadcast(GF_ASYNC_SIGCTRL); + + /* We have consumed a spare worker. We create another one for future + * needs. */ + gf_async_worker_create(); +} + +static void +gf_async_worker_wait(void) +{ + int32_t signum; + + signum = gf_async_sigwait(&gf_async_ctrl.sigmask_ctrl); + if (caa_unlikely(signum != GF_ASYNC_SIGCTRL)) { + gf_async_fatal(-EINVAL, "Worker received an unexpected signal (%d)", + signum); + } +} + +static void +gf_async_leader_wait(void) +{ + int32_t signum; + + signum = gf_async_sigwait(&gf_async_ctrl.sigmask_queue); + if (caa_unlikely(signum != GF_ASYNC_SIGQUEUE)) { + gf_async_fatal(-EINVAL, "Leader received an unexpected signal (%d)", + signum); + } +} + +static void +gf_async_run(struct cds_wfcq_node *node) +{ + gf_async_t *async; + + /* We've just got work from the queue. Process it. */ + async = caa_container_of(node, gf_async_t, queue); + /* TODO: remove dependency from THIS and xl. */ + THIS = async->xl; + async->cbk(async->xl, async); +} + +static void +gf_async_worker_run(void) +{ + struct cds_wfcq_node *node; + + do { + /* We keep executing jobs from the queue while it's not empty. Note + * that while we do this, we are ignoring any stop request. That's + * fine, since we need to process our own 'join' messages to fully + * terminate all threads. Note that normal jobs should have already + * completed once a stop request is received. */ + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + if (node != NULL) { + gf_async_run(node); + } + } while (node != NULL); + + /* TODO: I've tried to keep the worker looking at the queue for some small + * amount of time in a busy loop to see if more jobs come soon. With + * this I attempted to avoid the overhead of signal management if + * jobs come fast enough. However experimental results seem to + * indicate that doing this, CPU utilization grows and performance + * is actually reduced. We need to see if that's because I used bad + * parameters or it's really better to do it as it's done now. */ +} + +static void +gf_async_leader_run(void) +{ + struct cds_wfcq_node *node; + + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + while (caa_unlikely(node == NULL)) { + gf_async_leader_wait(); + + node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail); + } + + /* Activate the next available worker thread. It will become the new + * leader. */ + gf_async_worker_enable(); + + gf_async_run(node); +} + +static uint32_t +gf_async_stop_check(gf_async_worker_t *worker) +{ + uint32_t counts, old, running, max; + + /* First we check if we should stop without doing any costly atomic + * operation. */ + old = uatomic_read(&gf_async_ctrl.counts); + max = uatomic_read(&gf_async_ctrl.max_threads); + running = GF_ASYNC_COUNT_RUNNING(old); + while (running > max) { + /* There are too many threads. We try to stop the current worker. */ + counts = uatomic_cmpxchg(&gf_async_ctrl.counts, old, + old + GF_ASYNC_COUNTS(-1, 1)); + if (old != counts) { + /* Another thread has just updated the counts. We need to retry. */ + old = counts; + running = GF_ASYNC_COUNT_RUNNING(old); + + continue; + } + + running--; + worker->running = false; + } + + return running; +} + +static void +gf_async_stop_all(xlator_t *xl, gf_async_t *async) +{ + if (gf_async_stop_check(gf_async_current_worker) > 0) { + /* There are more workers running. We propagate the stop request to + * them. */ + gf_async(async, xl, gf_async_stop_all); + } +} + +static void +gf_async_join(xlator_t *xl, gf_async_t *async) +{ + gf_async_worker_t *worker; + + worker = caa_container_of(async, gf_async_worker_t, async); + + gf_async_thread_wait(worker->thread); + + cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +} + +static void +gf_async_terminate(gf_async_worker_t *worker) +{ + uint32_t counts; + + counts = uatomic_add_return(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(0, -1)); + if (counts == 0) { + /* This is the termination of the last worker thread. We need to + * synchronize the main thread that is waiting for all workers to + * finish. */ + gf_async_ctrl.sync_thread = worker->thread; + + gf_async_sync_now(); + } else { + /* Force someone else to join this thread to release resources. */ + gf_async(&worker->async, THIS, gf_async_join); + } +} + +static void * +gf_async_worker(void *arg) +{ + gf_async_worker_t *worker; + + worker = (gf_async_worker_t *)arg; + gf_async_current_worker = worker; + + worker->running = true; + do { + /* This thread does nothing until someone enables it to become a + * leader. */ + gf_async_worker_wait(); + + /* This thread is now a leader. It will process jobs from the queue + * and, if necessary, enable another worker and transfer leadership + * to it. */ + gf_async_leader_run(); + + /* This thread is not a leader anymore. It will continue processing + * queued jobs until it becomes empty. */ + gf_async_worker_run(); + + /* Stop the current thread if there are too many threads running. */ + gf_async_stop_check(worker); + } while (worker->running); + + gf_async_terminate(worker); + + return NULL; +} + +static void +gf_async_cleanup(void) +{ + /* We do some basic initialization of the global variable 'gf_async_ctrl' + * so that it's put into a relatively consistent state. */ + + gf_async_ctrl.enabled = false; + + gf_async_ctrl.pid = 0; + gf_async_sigmask_empty(&gf_async_ctrl.sigmask_ctrl); + gf_async_sigmask_empty(&gf_async_ctrl.sigmask_queue); + + /* This is used to later detect if the handler of these signals have been + * changed or not. */ + gf_async_ctrl.handler_ctrl.sa_handler = gf_async_signal_handler; + gf_async_ctrl.handler_queue.sa_handler = gf_async_signal_handler; + + gf_async_ctrl.table = NULL; + gf_async_ctrl.max_threads = 0; + gf_async_ctrl.counts = 0; +} + +void +gf_async_fini(void) +{ + gf_async_t async; + + if (uatomic_read(&gf_async_ctrl.counts) != 0) { + /* We ensure that all threads will quit on the next check. */ + gf_async_ctrl.max_threads = 0; + + /* Send the stop request to the thread pool. This will cause the + * execution of gf_async_stop_all() by one of the worker threads which, + * eventually, will terminate all worker threads. */ + gf_async(&async, THIS, gf_async_stop_all); + + /* We synchronize here with the last thread. */ + gf_async_sync_now(); + + /* We have just synchronized with the latest thread. Now just wait for + * it to terminate. */ + gf_async_thread_wait(gf_async_ctrl.sync_thread); + + gf_async_signal_flush(); + } + + gf_async_signal_restore(); + + gf_async_cleanup(); +} + +void +gf_async_adjust_threads(int32_t threads) +{ + if (threads == 0) { + /* By default we allow a maximum of 2 * #cores worker threads. This + * value is to try to accommodate threads that will do some I/O. Having + * more threads than cores we can keep CPU busy even if some threads + * are blocked for I/O. In the most efficient case, we can have #cores + * computing threads and #cores blocked threads on I/O. However this is + * hard to achieve because we can end with more than #cores computing + * threads, which won't provide a real benefit and will increase + * contention. + * + * TODO: implement a more intelligent dynamic maximum based on CPU + * usage and/or system load. */ + threads = sysconf(_SC_NPROCESSORS_ONLN) * 2; + if (threads < 0) { + /* If we can't get the current number of processors, we pick a + * random number. */ + threads = 16; + } + } + if (threads > GF_ASYNC_MAX_THREADS) { + threads = GF_ASYNC_MAX_THREADS; + } + uatomic_set(&gf_async_ctrl.max_threads, threads); +} + +int32_t +gf_async_init(glusterfs_ctx_t *ctx) +{ + sigset_t set; + gf_async_worker_t *worker; + uint32_t i; + int32_t ret; + bool running; + + gf_async_cleanup(); + + if (!ctx->cmd_args.global_threading || + (ctx->process_mode == GF_GLUSTERD_PROCESS)) { + return 0; + } + + /* At the init time, the maximum number of threads has not yet been + * configured. We use a small starting value that will be layer dynamically + * adjusted when ctx->config.max_threads is updated. */ + gf_async_adjust_threads(GF_ASYNC_SPARE_THREADS + 1); + + gf_async_ctrl.pid = getpid(); + + __cds_wfs_init(&gf_async_ctrl.available); + cds_wfcq_init(&gf_async_ctrl.queue.head, &gf_async_ctrl.queue.tail); + + gf_async_sync_init(); + + /* TODO: it would be cleaner to use dynamic memory, but at this point some + * memory management resources are not yet initialized. */ + gf_async_ctrl.table = gf_async_workers; + + /* We keep all workers in a stack. It will be used when a new thread needs + * to be created. */ + for (i = GF_ASYNC_MAX_THREADS; i > 0; i--) { + worker = &gf_async_ctrl.table[i - 1]; + + worker->id = i - 1; + cds_wfs_node_init(&worker->stack); + cds_wfs_push(&gf_async_ctrl.available, &worker->stack); + } + + /* Prepare the signal mask for regular workers and the leader. */ + gf_async_sigmask_add(&gf_async_ctrl.sigmask_ctrl, GF_ASYNC_SIGCTRL); + gf_async_sigmask_add(&gf_async_ctrl.sigmask_queue, GF_ASYNC_SIGQUEUE); + + /* TODO: this is needed to block our special signals in the current thread + * and all children that it starts. It would be cleaner to do it when + * signals are initialized, but there doesn't seem to be a unique + * place to do that, so for now we do it here. */ + gf_async_sigmask_empty(&set); + gf_async_sigmask_add(&set, GF_ASYNC_SIGCTRL); + gf_async_sigmask_add(&set, GF_ASYNC_SIGQUEUE); + gf_async_sigmask_set(SIG_BLOCK, &set, NULL); + + /* Configure the signal handlers. This is mostly for safety, not really + * needed, but it doesn't hurt. Note that the caller must ensure that the + * signals we need to run are already blocked in any thread already + * started. Otherwise this won't work. */ + gf_async_signal_setup(); + + running = false; + + /* We start the spare workers + 1 for the leader. */ + for (i = 0; i < GF_ASYNC_SPARE_THREADS; i++) { + ret = gf_async_worker_create(); + if (caa_unlikely(ret < 0)) { + /* This is the initial start up so we enforce that the spare + * threads are created. If this fails at the beginning, it's very + * unlikely that the async workers could do its job, so we abort + * the initialization. */ + goto out; + } + + /* Once the first thread is started, we can enable it to become the + * initial leader. */ + if ((ret == 0) && !running) { + running = true; + gf_async_worker_enable(); + } + } + + if (caa_unlikely(!running)) { + gf_async_fatal(-ENOMEM, "No worker thread has started"); + } + + gf_async_ctrl.enabled = true; + + ret = 0; + +out: + if (ret < 0) { + gf_async_error(ret, "Unable to initialize the thread pool."); + gf_async_fini(); + } + + return ret; +} diff --git a/libglusterfs/src/glusterfs/async.h b/libglusterfs/src/glusterfs/async.h new file mode 100644 index 00000000000..d1d70ae0bc7 --- /dev/null +++ b/libglusterfs/src/glusterfs/async.h @@ -0,0 +1,209 @@ +/* + Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __GLUSTERFS_ASYNC_H__ +#define __GLUSTERFS_ASYNC_H__ + +#define _LGPL_SOURCE + +#include <sys/types.h> +#include <signal.h> +#include <errno.h> + +#ifdef URCU_OLD + +/* TODO: Fix the include paths. Since this is a .h included from many places + * it makes no sense to append a '-I$(CONTRIBDIR)/userspace-rcu/' to each + * Makefile.am. I've also seen some problems with CI builders (they + * failed to find the include files, but the same source on another setup + * is working fine). */ +#include "wfcqueue.h" +#include "wfstack.h" + +#else /* !URCU_OLD */ + +#include <urcu/wfcqueue.h> +#include <urcu/wfstack.h> + +#endif /* URCU_OLD */ + +#include "glusterfs/xlator.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/list.h" +#include "glusterfs/libglusterfs-messages.h" + +/* This is the name prefix that all worker threads will have. A number will + * be added to differentiate them. */ +#define GF_ASYNC_THREAD_NAME "tpw" + +/* This value determines the maximum number of threads that are allowed. */ +#define GF_ASYNC_MAX_THREADS 128 + +/* This value determines how many additional threads will be started but will + * remain inactive until they are explicitly activated by the leader. This is + * useful to react faster to bursts of load, but at the same time we minimize + * contention if they are not really needed to handle current load. + * + * TODO: Instead of a fixed number, it would probably be better to use a + * prcentage of the available cores. */ +#define GF_ASYNC_SPARE_THREADS 2 + +/* This value determines the signal used to wake the leader when new work has + * been added to the queue. To do so we reuse SIGALRM, since the most logical + * candidates (SIGUSR1/SIGUSR2) are already used. This signal must not be used + * by anything else in the process. */ +#define GF_ASYNC_SIGQUEUE SIGALRM + +/* This value determines the signal that will be used to transfer leader role + * to other workers. */ +#define GF_ASYNC_SIGCTRL SIGVTALRM + +#define gf_async_warning(_err, _msg, _args...) \ + gf_msg("async", GF_LOG_WARNING, -(_err), LG_MSG_ASYNC_WARNING, _msg, \ + ##_args) + +#define gf_async_error(_err, _msg, _args...) \ + gf_msg("async", GF_LOG_ERROR, -(_err), LG_MSG_ASYNC_FAILURE, _msg, ##_args) + +#define gf_async_fatal(_err, _msg, _args...) \ + do { \ + GF_ABORT("Critical error in async module. Unable to continue. (" _msg \ + "). Error %d.", \ + ##_args, -(_err)); \ + } while (0) + +struct _gf_async; +typedef struct _gf_async gf_async_t; + +struct _gf_async_worker; +typedef struct _gf_async_worker gf_async_worker_t; + +struct _gf_async_queue; +typedef struct _gf_async_queue gf_async_queue_t; + +struct _gf_async_control; +typedef struct _gf_async_control gf_async_control_t; + +typedef void (*gf_async_callback_f)(xlator_t *xl, gf_async_t *async); + +struct _gf_async { + /* TODO: remove dependency on xl/THIS. */ + xlator_t *xl; + gf_async_callback_f cbk; + struct cds_wfcq_node queue; +}; + +struct _gf_async_worker { + /* Used to send asynchronous jobs related to the worker. */ + gf_async_t async; + + /* Member of the available workers stack. */ + struct cds_wfs_node stack; + + /* Thread object of the current worker. */ + pthread_t thread; + + /* Unique identifier of this worker. */ + int32_t id; + + /* Indicates if this worker is enabled. */ + bool running; +}; + +struct _gf_async_queue { + /* Structures needed to manage a wait-free queue. For better performance + * they are placed in two different cache lines, as recommended by URCU + * documentation, even though in our case some threads will be producers + * and consumers at the same time. */ + struct cds_wfcq_head head __attribute__((aligned(64))); + struct cds_wfcq_tail tail __attribute__((aligned(64))); +}; + +#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop)) +#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16) +#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535) + +struct _gf_async_control { + gf_async_queue_t queue; + + /* Stack of unused workers. */ + struct __cds_wfs_stack available; + + /* Array of preallocated worker structures. */ + gf_async_worker_t *table; + + /* Used to synchronize main thread with workers on termination. */ + pthread_barrier_t sync; + + /* The id of the last thread that will be used for synchronization. */ + pthread_t sync_thread; + + /* Signal mask to wait for control signals from leader. */ + sigset_t sigmask_ctrl; + + /* Signal mask to wait for queued items. */ + sigset_t sigmask_queue; + + /* Saved signal handlers. */ + struct sigaction handler_ctrl; + struct sigaction handler_queue; + + /* PID of the current process. */ + pid_t pid; + + /* Maximum number of allowed threads. */ + uint32_t max_threads; + + /* Current number of running and stopping workers. This value is split + * into 2 16-bits fields to track both counters atomically at the same + * time. */ + uint32_t counts; + + /* It's used to control whether the asynchronous infrastructure is used + * or not. */ + bool enabled; +}; + +extern gf_async_control_t gf_async_ctrl; + +int32_t +gf_async_init(glusterfs_ctx_t *ctx); + +void +gf_async_fini(void); + +void +gf_async_adjust_threads(int32_t threads); + +static inline void +gf_async(gf_async_t *async, xlator_t *xl, gf_async_callback_f cbk) +{ + if (!gf_async_ctrl.enabled) { + cbk(xl, async); + return; + } + + async->xl = xl; + async->cbk = cbk; + cds_wfcq_node_init(&async->queue); + if (caa_unlikely(!cds_wfcq_enqueue(&gf_async_ctrl.queue.head, + &gf_async_ctrl.queue.tail, + &async->queue))) { + /* The queue was empty, so the leader could be sleeping. We need to + * wake it so that the new item can be processed. If the queue was not + * empty, we don't need to do anything special since the leader will + * take care of it. */ + if (caa_unlikely(kill(gf_async_ctrl.pid, GF_ASYNC_SIGQUEUE) < 0)) { + gf_async_fatal(errno, "Unable to wake leader worker."); + }; + } +} + +#endif /* !__GLUSTERFS_ASYNC_H__ */ diff --git a/libglusterfs/src/glusterfs/common-utils.h b/libglusterfs/src/glusterfs/common-utils.h index f03d2c1049a..1418c6531c7 100644 --- a/libglusterfs/src/glusterfs/common-utils.h +++ b/libglusterfs/src/glusterfs/common-utils.h @@ -442,7 +442,7 @@ BIT_VALUE(unsigned char *array, unsigned int index) } while (0) #endif -#define GF_ABORT(msg) \ +#define GF_ABORT(msg...) \ do { \ gf_msg_callingfn("", GF_LOG_CRITICAL, 0, LG_MSG_ASSERTION_FAILED, \ "Assertion failed: " msg); \ diff --git a/libglusterfs/src/glusterfs/glusterfs.h b/libglusterfs/src/glusterfs/glusterfs.h index 7c6af090fd8..9d140c18ee3 100644 --- a/libglusterfs/src/glusterfs/glusterfs.h +++ b/libglusterfs/src/glusterfs/glusterfs.h @@ -575,6 +575,8 @@ struct _cmd_args { int fuse_flush_handle_interrupt; int fuse_auto_inval; + + bool global_threading; }; typedef struct _cmd_args cmd_args_t; diff --git a/libglusterfs/src/glusterfs/libglusterfs-messages.h b/libglusterfs/src/glusterfs/libglusterfs-messages.h index 1b72f6df5be..e17e33e06fb 100644 --- a/libglusterfs/src/glusterfs/libglusterfs-messages.h +++ b/libglusterfs/src/glusterfs/libglusterfs-messages.h @@ -109,6 +109,6 @@ GLFS_MSGID( LG_MSG_PTHREAD_ATTR_INIT_FAILED, LG_MSG_INVALID_INODE_LIST, LG_MSG_COMPACT_FAILED, LG_MSG_COMPACT_STATUS, LG_MSG_UTIMENSAT_FAILED, LG_MSG_PTHREAD_NAMING_FAILED, LG_MSG_SYSCALL_RETURNS_WRONG, - LG_MSG_XXH64_TO_GFID_FAILED); + LG_MSG_XXH64_TO_GFID_FAILED, LG_MSG_ASYNC_WARNING, LG_MSG_ASYNC_FAILURE); #endif /* !_LG_MESSAGES_H_ */ diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 2ac87c491ae..4466a5eee9a 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -553,6 +553,11 @@ get_xlator_by_name get_xlator_by_type gf_array_insertionsort gf_asprintf +gf_async +gf_async_adjust_threads +gf_async_ctrl +gf_async_init +gf_async_fini gf_backtrace_save gf_bits_count gf_bits_index diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index f9cbdf133c7..ce984426cbe 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -144,6 +144,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector, goto out; } + msg->trans = this; + if (count > 1) { msg->vectored = 1; } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 9e75d1a2bbb..6830279f07e 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -58,6 +58,7 @@ typedef struct rpc_transport rpc_transport_t; #include <glusterfs/dict.h> #include <glusterfs/compat.h> +#include <glusterfs/async.h> #include "rpcsvc-common.h" struct peer_info { @@ -155,16 +156,6 @@ struct rpc_request_info { }; typedef struct rpc_request_info rpc_request_info_t; -struct rpc_transport_pollin { - int count; - void *private; - struct iobref *iobref; - struct iovec vector[MAX_IOVEC]; - char is_reply; - char vectored; -}; -typedef struct rpc_transport_pollin rpc_transport_pollin_t; - typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata, rpc_transport_event_t, void *data, ...); @@ -217,6 +208,18 @@ struct rpc_transport { gf_atomic_t disconnect_progress; }; +struct rpc_transport_pollin { + struct rpc_transport *trans; + int count; + void *private; + struct iobref *iobref; + struct iovec vector[MAX_IOVEC]; + char is_reply; + char vectored; + gf_async_t async; +}; +typedef struct rpc_transport_pollin rpc_transport_pollin_t; + struct rpc_transport_ops { /* no need of receive op, msg will be delivered through an event * notification diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 74373c44f91..c38a675b8c2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -2314,6 +2314,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, newprog->alive = _gf_true; + if (gf_async_ctrl.enabled) { + newprog->ownthread = _gf_false; + newprog->synctask = _gf_false; + } + /* make sure synctask gets priority over ownthread */ if (newprog->synctask) newprog->ownthread = _gf_false; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index fa0e0f20901..26dbe0b706a 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2495,6 +2495,33 @@ out: return ret; } +static void +socket_event_poll_in_async(xlator_t *xl, gf_async_t *async) +{ + rpc_transport_pollin_t *pollin; + rpc_transport_t *this; + socket_private_t *priv; + + pollin = caa_container_of(async, rpc_transport_pollin_t, async); + this = pollin->trans; + priv = this->private; + + rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + + rpc_transport_unref(this); + + rpc_transport_pollin_destroy(pollin); + + pthread_mutex_lock(&priv->notify.lock); + { + --priv->notify.in_progress; + + if (!priv->notify.in_progress) + pthread_cond_signal(&priv->notify.cond); + } + pthread_mutex_unlock(&priv->notify.lock); +} + static int socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled) { @@ -2519,18 +2546,8 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled) event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen); if (pollin) { - rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); - - rpc_transport_pollin_destroy(pollin); - - pthread_mutex_lock(&priv->notify.lock); - { - --priv->notify.in_progress; - - if (!priv->notify.in_progress) - pthread_cond_signal(&priv->notify.cond); - } - pthread_mutex_unlock(&priv->notify.lock); + rpc_transport_ref(this); + gf_async(&pollin->async, THIS, socket_event_poll_in_async); } return ret; diff --git a/tests/basic/global-threading.t b/tests/basic/global-threading.t new file mode 100644 index 00000000000..f7d34044b09 --- /dev/null +++ b/tests/basic/global-threading.t @@ -0,0 +1,104 @@ +#!/bin/bash + +. $(dirname $0)/../include.rc +. $(dirname $0)/../volume.rc + +# Test if the given process has a number of threads of a given type between +# min and max. +function check_threads() { + local pid="${1}" + local pattern="${2}" + local min="${3}" + local max="${4-}" + local count + + count="$(ps hH -o comm ${pid} | grep "${pattern}" | wc -l)" + if [[ ${min} -gt ${count} ]]; then + return 1 + fi + if [[ ! -z "${max}" && ${max} -lt ${count} ]]; then + return 1 + fi + + return 0 +} + +cleanup + +TEST glusterd + +# Glusterd shouldn't use any thread +TEST check_threads $(get_glusterd_pid) glfs_tpw 0 0 +TEST check_threads $(get_glusterd_pid) glfs_iotwr 0 0 + +TEST pkill -9 glusterd + +TEST glusterd --global-threading + +# Glusterd shouldn't use global threads, even if enabled +TEST check_threads $(get_glusterd_pid) glfs_tpw 0 0 +TEST check_threads $(get_glusterd_pid) glfs_iotwr 0 0 + +TEST $CLI volume create $V0 replica 2 $H0:$B0/b{0,1} + +# Normal configuration using io-threads on bricks +TEST $CLI volume set $V0 config.global-threading off +TEST $CLI volume set $V0 performance.iot-pass-through off +TEST $CLI volume set $V0 performance.client-io-threads off +TEST $CLI volume start $V0 + +# There shouldn't be global threads +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_tpw 0 0 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_tpw 0 0 + +# There should be at least 1 io-thread +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_iotwr 1 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_iotwr 1 + +# Self-heal should be using global threads +TEST check_threads $(get_shd_process_pid) glfs_tpw 1 +TEST check_threads $(get_shd_process_pid) glfs_iotwr 0 0 + +TEST $CLI volume stop $V0 + +# Configuration with global threads on bricks +TEST $CLI volume set $V0 config.global-threading on +TEST $CLI volume set $V0 performance.iot-pass-through on +TEST $CLI volume start $V0 + +# There should be at least 1 global thread +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_tpw 1 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_tpw 1 + +# There shouldn't be any io-thread worker threads +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_iotwr 0 0 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_iotwr 0 0 + +# Normal configuration using io-threads on clients +TEST $CLI volume set $V0 performance.iot-pass-through off +TEST $CLI volume set $V0 performance.client-io-threads on +TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 $M0 + +# There shouldn't be global threads +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_tpw 0 0 + +# There should be at least 1 io-thread +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_iotwr 1 + +EXPECT_WITHIN $UMOUNT_TIMEOUT "Y" force_umount $M0 + +# Configuration with global threads on clients +TEST $CLI volume set $V0 performance.client-io-threads off +TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 --global-threading $M0 + +# There should be at least 1 global thread +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_tpw 1 + +# There shouldn't be io-threads +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_iotwr 0 0 + +# Some basic volume access checks with global-threading enabled everywhere +TEST mkdir ${M0}/dir +TEST dd if=/dev/zero of=${M0}/dir/file bs=128k count=8 + +cleanup diff --git a/tests/volume.rc b/tests/volume.rc index 261c6554d46..6fd7c3cdbcd 100644 --- a/tests/volume.rc +++ b/tests/volume.rc @@ -281,6 +281,10 @@ function quotad_up_status { gluster volume status | grep "Quota Daemon" | awk '{print $7}' } +function get_glusterd_pid { + pgrep '^glusterd$' | head -1 +} + function get_brick_pidfile { local vol=$1 local host=$2 diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c index f12191fb8df..101e403d39a 100644 --- a/xlators/debug/io-stats/src/io-stats.c +++ b/xlators/debug/io-stats/src/io-stats.c @@ -40,6 +40,7 @@ #include <pwd.h> #include <grp.h> #include <glusterfs/upcall-utils.h> +#include <glusterfs/async.h> #define MAX_LIST_MEMBERS 100 #define DEFAULT_PWD_BUF_SZ 16384 @@ -3737,6 +3738,7 @@ reconfigure(xlator_t *this, dict_t *options) uint32_t log_buf_size = 0; uint32_t log_flush_timeout = 0; int32_t old_dump_interval; + int32_t threads; if (!this || !this->private) goto out; @@ -3809,6 +3811,9 @@ reconfigure(xlator_t *this, dict_t *options) out); gf_log_set_log_flush_timeout(log_flush_timeout); + GF_OPTION_RECONF("threads", threads, options, int32, out); + gf_async_adjust_threads(threads); + ret = 0; out: gf_log(this ? this->name : "io-stats", GF_LOG_DEBUG, @@ -3888,6 +3893,7 @@ init(xlator_t *this) int ret = -1; uint32_t log_buf_size = 0; uint32_t log_flush_timeout = 0; + int32_t threads; if (!this) return -1; @@ -3951,6 +3957,7 @@ init(xlator_t *this) gf_log(this->name, GF_LOG_ERROR, "Out of memory."); goto out; } + ret = -1; GF_OPTION_INIT("ios-dnscache-ttl-sec", conf->ios_dnscache_ttl_sec, int32, out); @@ -3987,6 +3994,9 @@ init(xlator_t *this) GF_OPTION_INIT("log-flush-timeout", log_flush_timeout, time, out); gf_log_set_log_flush_timeout(log_flush_timeout); + GF_OPTION_INIT("threads", threads, int32, out); + gf_async_adjust_threads(threads); + this->private = conf; if (conf->ios_dump_interval > 0) { conf->dump_thread_running = _gf_true; @@ -4430,8 +4440,37 @@ struct volume_options options[] = { .type = GF_OPTION_TYPE_STR, .default_value = "/no/such/path", .description = "Unique ID for our files."}, + {.key = {"global-threading"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE, + .tags = {"io-stats", "threading"}, + .description = "This option enables the global threading support for " + "bricks. If enabled, it's recommended to also enable " + "'performance.iot-pass-through'"}, + {.key = {"threads"}, .type = GF_OPTION_TYPE_INT}, + {.key = {"brick-threads"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "16", + .min = 0, + .max = GF_ASYNC_MAX_THREADS, + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-stats", "threading"}, + .description = "When global threading is used, this value determines the " + "maximum amount of threads that can be created on bricks"}, + {.key = {"client-threads"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "16", + .min = 0, + .max = GF_ASYNC_MAX_THREADS, + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-stats", "threading"}, + .description = "When global threading is used, this value determines the " + "maximum amount of threads that can be created on clients"}, {.key = {NULL}}, - }; xlator_api_t xlator_api = { diff --git a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c index 4cd4cea15e4..6325f60f94a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c +++ b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c @@ -213,6 +213,10 @@ glusterd_svc_start(glusterd_svc_t *svc, int flags, dict_t *cmdline) runner_add_arg(&runner, daemon_log_level); } + if (this->ctx->cmd_args.global_threading) { + runner_add_arg(&runner, "--global-threading"); + } + if (cmdline) dict_foreach(cmdline, svc_add_args, (void *)&runner); diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 1aa6947fbba..85a7884b51a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -2045,6 +2045,8 @@ glusterd_volume_start_glusterfs(glusterd_volinfo_t *volinfo, int32_t len = 0; glusterd_brick_proc_t *brick_proc = NULL; char *inet_family = NULL; + char *global_threading = NULL; + bool threading = false; GF_ASSERT(volinfo); GF_ASSERT(brickinfo); @@ -2203,6 +2205,15 @@ retry: volinfo->volname, rdma_port); } + if (dict_get_strn(volinfo->dict, VKEY_CONFIG_GLOBAL_THREADING, + SLEN(VKEY_CONFIG_GLOBAL_THREADING), + &global_threading) == 0) { + if ((gf_string2boolean(global_threading, &threading) == 0) && + threading) { + runner_add_arg(&runner, "--global-threading"); + } + } + runner_add_arg(&runner, "--xlator-option"); runner_argprintf(&runner, "%s-server.listen-port=%d", volinfo->volname, port); diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index b7c7bd9b638..448dd8669a1 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1215,6 +1215,26 @@ loglevel_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, } static int +threads_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, + void *param) +{ + char *role = param; + struct volopt_map_entry vme2 = { + 0, + }; + + if ((strcmp(vme->option, "!client-threads") != 0 && + strcmp(vme->option, "!brick-threads") != 0) || + !strstr(vme->key, role)) + return 0; + + memcpy(&vme2, vme, sizeof(vme2)); + vme2.option = "threads"; + + return basic_option_handler(graph, &vme2, NULL); +} + +static int server_check_changelog_off(volgen_graph_t *graph, struct volopt_map_entry *vme, glusterd_volinfo_t *volinfo) { @@ -1506,6 +1526,9 @@ server_spec_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, if (!ret) ret = log_localtime_logging_option_handler(graph, vme, "brick"); + if (!ret) + ret = threads_option_handler(graph, vme, "brick"); + return ret; } @@ -4085,6 +4108,14 @@ graph_set_generic_options(xlator_t *this, volgen_graph_t *graph, gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, "Failed to change " "log-localtime-logging option"); + + ret = volgen_graph_set_options_generic(graph, set_dict, "client", + &threads_option_handler); + + if (ret) + gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, + "changing %s threads failed", identifier); + return 0; } diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.h b/xlators/mgmt/glusterd/src/glusterd-volgen.h index f9fc068931b..37eecc04bef 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.h +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.h @@ -38,6 +38,9 @@ #define VKEY_RDA_CACHE_LIMIT "performance.rda-cache-limit" #define VKEY_RDA_REQUEST_SIZE "performance.rda-request-size" #define VKEY_CONFIG_GFPROXY "config.gfproxyd" +#define VKEY_CONFIG_GLOBAL_THREADING "config.global-threading" +#define VKEY_CONFIG_CLIENT_THREADS "config.client-threads" +#define VKEY_CONFIG_BRICK_THREADS "config.brick-threads" #define AUTH_ALLOW_MAP_KEY "auth.allow" #define AUTH_REJECT_MAP_KEY "auth.reject" diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index b32b6ce0ec4..c8591cf0487 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -2961,4 +2961,19 @@ struct volopt_map_entry glusterd_volopt_map[] = { .validate_fn = validate_boolean, .description = "option to enforce mandatory lock on a file", .flags = VOLOPT_FLAG_XLATOR_OPT}, + {.key = VKEY_CONFIG_GLOBAL_THREADING, + .voltype = "debug/io-stats", + .option = "global-threading", + .value = "off", + .op_version = GD_OP_VERSION_6_0}, + {.key = VKEY_CONFIG_CLIENT_THREADS, + .voltype = "debug/io-stats", + .option = "!client-threads", + .value = "16", + .op_version = GD_OP_VERSION_6_0}, + {.key = VKEY_CONFIG_BRICK_THREADS, + .voltype = "debug/io-stats", + .option = "!brick-threads", + .value = "16", + .op_version = GD_OP_VERSION_6_0}, {.key = NULL}}; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 3479d40ceeb..bd8bc114a32 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -16,10 +16,17 @@ #include <glusterfs/glusterfs-acl.h> #include <glusterfs/syscall.h> #include <glusterfs/timespec.h> +#include <glusterfs/async.h> #ifdef __NetBSD__ #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */ #endif +typedef struct _fuse_async { + struct iobuf *iobuf; + fuse_in_header_t *finh; + void *msg; + gf_async_t async; +} fuse_async_t; static int gf_fuse_xattr_enotsup_log; @@ -5810,6 +5817,28 @@ fuse_get_mount_status(xlator_t *this) return kid_status; } +static void +fuse_dispatch(xlator_t *xl, gf_async_t *async) +{ + fuse_async_t *fasync; + fuse_private_t *priv; + fuse_in_header_t *finh; + struct iobuf *iobuf; + + priv = xl->private; + fasync = caa_container_of(async, fuse_async_t, async); + finh = fasync->finh; + iobuf = fasync->iobuf; + + priv->fuse_ops[finh->opcode](xl, finh, fasync->msg, iobuf); + + iobuf_unref(iobuf); +} + +/* We need 512 extra buffer size for BATCH_FORGET fop. By tests, it is + * found to be reduces 'REALLOC()' in the loop */ +#define FUSE_EXTRA_ALLOC 512 + static void * fuse_thread_proc(void *data) { @@ -5821,24 +5850,20 @@ fuse_thread_proc(void *data) fuse_in_header_t *finh = NULL; struct iovec iov_in[2]; void *msg = NULL; - /* we need 512 extra buffer size for BATCH_FORGET fop. By tests, it is - found to be reduces 'REALLOC()' in the loop */ - const size_t msg0_size = sizeof(*finh) + 512; - fuse_handler_t **fuse_ops = NULL; + size_t msg0_size = sizeof(*finh) + sizeof(struct fuse_write_in); + fuse_async_t *fasync; struct pollfd pfd[2] = {{ 0, }}; + uint32_t psize; this = data; priv = this->private; - fuse_ops = priv->fuse_ops; THIS = this; - iov_in[0].iov_len = sizeof(*finh) + sizeof(struct fuse_write_in); - iov_in[1].iov_len = ((struct iobuf_pool *)this->ctx->iobuf_pool) - ->default_page_size; - priv->msg0_len_p = &iov_in[0].iov_len; + psize = ((struct iobuf_pool *)this->ctx->iobuf_pool)->default_page_size; + priv->msg0_len_p = &msg0_size; for (;;) { /* THIS has to be reset here */ @@ -5895,14 +5920,15 @@ fuse_thread_proc(void *data) changing this one too */ iobuf = iobuf_get(this->ctx->iobuf_pool); - /* Add extra 128 byte to the first iov so that it can + /* Add extra 512 byte to the first iov so that it can * accommodate "ordinary" non-write requests. It's not * guaranteed to be big enough, as SETXATTR and namespace * operations with very long names may grow behind it, * but it's good enough in most cases (and we can handle - * rest via realloc). - */ - iov_in[0].iov_base = GF_CALLOC(1, msg0_size, gf_fuse_mt_iov_base); + * rest via realloc). */ + iov_in[0].iov_base = GF_MALLOC( + sizeof(fuse_async_t) + msg0_size + FUSE_EXTRA_ALLOC, + gf_fuse_mt_iov_base); if (!iobuf || !iov_in[0].iov_base) { gf_log(this->name, GF_LOG_ERROR, "Out of memory"); @@ -5915,6 +5941,9 @@ fuse_thread_proc(void *data) iov_in[1].iov_base = iobuf->ptr; + iov_in[0].iov_len = msg0_size; + iov_in[1].iov_len = psize; + res = sys_readv(priv->fd, iov_in, 2); if (res == -1) { @@ -5941,7 +5970,7 @@ fuse_thread_proc(void *data) goto cont_err; } - if (res < sizeof(finh)) { + if (res < sizeof(*finh)) { gf_log("glusterfs-fuse", GF_LOG_WARNING, "short read on /dev/fuse"); fuse_log_eh(this, "glusterfs-fuse: short read on " @@ -5983,8 +6012,9 @@ fuse_thread_proc(void *data) if (finh->opcode == FUSE_WRITE) msg = iov_in[1].iov_base; else { - if (res > msg0_size) { - void *b = GF_REALLOC(iov_in[0].iov_base, res); + if (res > msg0_size + FUSE_EXTRA_ALLOC) { + void *b = GF_REALLOC(iov_in[0].iov_base, + sizeof(fuse_async_t) + res); if (b) { iov_in[0].iov_base = b; finh = (fuse_in_header_t *)iov_in[0].iov_base; @@ -5996,22 +6026,29 @@ fuse_thread_proc(void *data) } } - if (res > iov_in[0].iov_len) + if (res > iov_in[0].iov_len) { memcpy(iov_in[0].iov_base + iov_in[0].iov_len, iov_in[1].iov_base, res - iov_in[0].iov_len); + iov_in[0].iov_len = res; + } msg = finh + 1; } if (priv->uid_map_root && finh->uid == priv->uid_map_root) finh->uid = 0; - if (finh->opcode >= FUSE_OP_HIGH) + if (finh->opcode >= FUSE_OP_HIGH) { /* turn down MacFUSE specific messages */ fuse_enosys(this, finh, msg, NULL); - else - fuse_ops[finh->opcode](this, finh, msg, iobuf); + iobuf_unref(iobuf); + } else { + fasync = iov_in[0].iov_base + iov_in[0].iov_len; + fasync->finh = finh; + fasync->msg = msg; + fasync->iobuf = iobuf; + gf_async(&fasync->async, this, fuse_dispatch); + } - iobuf_unref(iobuf); continue; cont_err: diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index 3f5d76d2e93..243c9c71af4 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -189,6 +189,10 @@ start_glusterfs () cmd_line=$(echo "$cmd_line --thin-client"); fi + if [ -n "$global_threading" ]; then + cmd_line=$(echo "$cmd_line --global-threading"); + fi + #options with values start here if [ -n "$halo_max_latency" ]; then cmd_line=$(echo "$cmd_line --xlator-option \ @@ -629,6 +633,9 @@ without_options() # "mount -t glusterfs" sends this, but it's useless. "rw") ;; + "global-threading") + global_threading=1 + ;; # TODO: not sure how to handle this yet "async"|"sync"|"dirsync"|\ "mand"|"nomand"|\ diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index bf75015eda8..9a4c728ae02 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1255,12 +1255,14 @@ init(xlator_t *this) INIT_LIST_HEAD(&conf->no_client[i].reqs); } - ret = iot_workers_scale(conf); + if (!this->pass_through) { + ret = iot_workers_scale(conf); - if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, - "cannot initialize worker threads, exiting init"); - goto out; + if (ret == -1) { + gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, + "cannot initialize worker threads, exiting init"); + goto out; + } } this->private = conf; |