29 files changed, 2858 insertions, 56 deletions
diff --git a/configure.ac b/configure.ac index 8752ca043bf..3aa7f4e77c1 100644 --- a/configure.ac +++ b/configure.ac @@ -1359,10 +1359,12 @@ PKG_CHECK_MODULES([URCU], [liburcu-bp], [],       AC_MSG_ERROR([liburcu-bp not found]))])  PKG_CHECK_MODULES([URCU_CDS], [liburcu-cds >= 0.8], [],    [PKG_CHECK_MODULES([URCU_CDS], [liburcu-cds >= 0.7], -    [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found])], +    [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found]) +     USE_CONTRIB_URCU='yes'],      [AC_CHECK_HEADERS([urcu/cds.h],        [AC_DEFINE(URCU_OLD, 1, [Define if liburcu 0.6 or 0.7 is found]) -       URCU_CDS_LIBS='-lurcu-cds'], +       URCU_CDS_LIBS='-lurcu-cds' +       USE_CONTRIB_URCU='yes'],        [AC_MSG_ERROR([liburcu-cds not found])])])])  BUILD_UNITTEST="no" @@ -1526,6 +1528,9 @@ AC_SUBST(CONTRIBDIR)  GF_CPPDEFINES='-D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS)'  GF_CPPINCLUDES='-include $(top_builddir)/config.h -include $(top_builddir)/site.h -I$(top_srcdir)/libglusterfs/src -I$(top_builddir)/libglusterfs/src' +if test "x${USE_CONTRIB_URCU}" = "xyes"; then +    GF_CPPINCLUDES="${GF_CPPINCLUDES} -I\$(CONTRIBDIR)/userspace-rcu" +fi  GF_CPPFLAGS="$GF_CPPFLAGS $GF_CPPDEFINES $GF_CPPINCLUDES"  AC_SUBST([GF_CPPFLAGS]) diff --git a/contrib/userspace-rcu/static-wfcqueue.h b/contrib/userspace-rcu/static-wfcqueue.h new file mode 100644 index 00000000000..37d14ad674b --- /dev/null +++ b/contrib/userspace-rcu/static-wfcqueue.h @@ -0,0 +1,685 @@ +#ifndef _URCU_WFCQUEUE_STATIC_H +#define _URCU_WFCQUEUE_STATIC_H + +/* + * urcu/static/wfcqueue.h + * + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue + * + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfcqueue.h for + * linking dynamically with the userspace rcu library. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Copied from userspace-rcu 0.10 because version 0.7 doesn't contain it. */ + +#include <pthread.h> +#include <assert.h> +#include <poll.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/uatomic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Concurrent queue with wait-free enqueue/blocking dequeue. + * + * This queue has been designed and implemented collaboratively by + * Mathieu Desnoyers and Lai Jiangshan. Inspired from + * half-wait-free/half-blocking queue implementation done by Paul E. + * McKenney. + * + * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". 
+ * + * Legend: + * [1] cds_wfcq_enqueue + * [2] __cds_wfcq_splice (destination queue) + * [3] __cds_wfcq_dequeue + * [4] __cds_wfcq_splice (source queue) + * [5] __cds_wfcq_first + * [6] __cds_wfcq_next + * + *     [1] [2] [3] [4] [5] [6] + * [1]  -   -   -   -   -   - + * [2]  -   -   -   -   -   - + * [3]  -   -   X   X   X   X + * [4]  -   -   X   -   X   X + * [5]  -   -   X   X   -   - + * [6]  -   -   X   X   -   - + * + * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock(). + * + * For convenience, cds_wfcq_dequeue_blocking() and + * cds_wfcq_splice_blocking() hold the dequeue lock. + * + * Besides locking, mutual exclusion of dequeue, splice and iteration + * can be ensured by performing all of those operations from a single + * thread, without requiring any lock. + */ + +#define WFCQ_ADAPT_ATTEMPTS		10	/* Retry if being set */ +#define WFCQ_WAIT			10	/* Wait 10 ms if being set */ + +/* + * cds_wfcq_node_init: initialize wait-free queue node. + */ +static inline void _cds_wfcq_node_init(struct cds_wfcq_node *node) +{ +	node->next = NULL; +} + +/* + * cds_wfcq_init: initialize wait-free queue (with lock). Pair with + * cds_wfcq_destroy(). + */ +static inline void _cds_wfcq_init(struct cds_wfcq_head *head, +		struct cds_wfcq_tail *tail) +{ +	int ret; + +	/* Set queue head and tail */ +	_cds_wfcq_node_init(&head->node); +	tail->p = &head->node; +	ret = pthread_mutex_init(&head->lock, NULL); +	assert(!ret); +} + +/* + * cds_wfcq_destroy: destroy wait-free queue (with lock). Pair with + * cds_wfcq_init(). + */ +static inline void _cds_wfcq_destroy(struct cds_wfcq_head *head, +		struct cds_wfcq_tail *tail) +{ +	int ret = pthread_mutex_destroy(&head->lock); +	assert(!ret); +} + +/* + * __cds_wfcq_init: initialize wait-free queue (without lock). Don't + * pair with any destroy function. + */ +static inline void ___cds_wfcq_init(struct __cds_wfcq_head *head, +		struct cds_wfcq_tail *tail) +{ +	/* Set queue head and tail */ +	_cds_wfcq_node_init(&head->node); +	tail->p = &head->node; +} + +/* + * cds_wfcq_empty: return whether wait-free queue is empty. + * + * No memory barrier is issued. No mutual exclusion is required. + * + * We perform the test on head->node.next to check if the queue is + * possibly empty, but we confirm this by checking if the tail pointer + * points to the head node because the tail pointer is the linearisation + * point of the enqueuers. Just checking the head next pointer could + * make a queue appear empty if an enqueuer is preempted for a long time + * between xchg() and setting the previous node's next pointer. + */ +static inline bool _cds_wfcq_empty(cds_wfcq_head_ptr_t u_head, +		struct cds_wfcq_tail *tail) +{ +	struct __cds_wfcq_head *head = u_head._h; +	/* +	 * Queue is empty if no node is pointed by head->node.next nor +	 * tail->p. Even though the tail->p check is sufficient to find +	 * out of the queue is empty, we first check head->node.next as a +	 * common case to ensure that dequeuers do not frequently access +	 * enqueuer's tail->p cache line. 
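To make the synchronization rules above concrete, the following sketch shows the intended usage pattern through the public wrappers declared later in this patch (contrib/userspace-rcu/wfcqueue.h). It is illustrative only; the job structure and the function names are hypothetical.

#include <urcu/wfcqueue.h>       /* or the contrib copy, depending on liburcu version */

struct job {
    struct cds_wfcq_node node;   /* embedded queue linkage */
    int id;                      /* hypothetical payload */
};

static struct cds_wfcq_head job_head;
static struct cds_wfcq_tail job_tail;

static void
jobs_setup(void)
{
    cds_wfcq_init(&job_head, &job_tail);
}

/* Enqueue is wait-free: any number of producers may call this concurrently
 * ("-" in the table above), without taking any lock. */
static void
job_submit(struct job *j)
{
    cds_wfcq_node_init(&j->node);
    cds_wfcq_enqueue(&job_head, &job_tail, &j->node);
}

/* Dequeuers need mutual exclusion ("X" in the table); the blocking wrapper
 * takes the internal dequeue lock, so concurrent consumers are safe. */
static struct job *
job_get(void)
{
    struct cds_wfcq_node *n;

    n = cds_wfcq_dequeue_blocking(&job_head, &job_tail);
    if (n == NULL)
        return NULL;             /* queue is empty */

    return caa_container_of(n, struct job, node);
}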
+	 */ +	return CMM_LOAD_SHARED(head->node.next) == NULL +		&& CMM_LOAD_SHARED(tail->p) == &head->node; +} + +static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head *head, +		struct cds_wfcq_tail *tail) +{ +	int ret; + +	ret = pthread_mutex_lock(&head->lock); +	assert(!ret); +} + +static inline void _cds_wfcq_dequeue_unlock(struct cds_wfcq_head *head, +		struct cds_wfcq_tail *tail) +{ +	int ret; + +	ret = pthread_mutex_unlock(&head->lock); +	assert(!ret); +} + +static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head, +		struct cds_wfcq_tail *tail, +		struct cds_wfcq_node *new_head, +		struct cds_wfcq_node *new_tail) +{ +	struct __cds_wfcq_head *head = u_head._h; +	struct cds_wfcq_node *old_tail; + +	/* +	 * Implicit memory barrier before uatomic_xchg() orders earlier +	 * stores to data structure containing node and setting +	 * node->next to NULL before publication. +	 */ +	old_tail = uatomic_xchg(&tail->p, new_tail); + +	/* +	 * Implicit memory barrier after uatomic_xchg() orders store to +	 * q->tail before store to old_tail->next. +	 * +	 * At this point, dequeuers see a NULL tail->p->next, which +	 * indicates that the queue is being appended to. The following +	 * store will append "node" to the queue from a dequeuer +	 * perspective. +	 */ +	CMM_STORE_SHARED(old_tail->next, new_head); +	/* +	 * Return false if queue was empty prior to adding the node, +	 * else return true. +	 */ +	return old_tail != &head->node; +} + +/* + * cds_wfcq_enqueue: enqueue a node into a wait-free queue. + * + * Issues a full memory barrier before enqueue. No mutual exclusion is + * required. + * + * Returns false if the queue was empty prior to adding the node. + * Returns true otherwise. + */ +static inline bool _cds_wfcq_enqueue(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail, +		struct cds_wfcq_node *new_tail) +{ +	return ___cds_wfcq_append(head, tail, new_tail, new_tail); +} + +/* + * CDS_WFCQ_WAIT_SLEEP: + * + * By default, this sleeps for the given @msec milliseconds. + * This is a macro which LGPL users may #define themselves before + * including wfcqueue.h to override the default behavior (e.g. + * to log a warning or perform other background work). + */ +#ifndef CDS_WFCQ_WAIT_SLEEP +#define CDS_WFCQ_WAIT_SLEEP(msec) ___cds_wfcq_wait_sleep(msec) +#endif + +static inline void ___cds_wfcq_wait_sleep(int msec) +{ +	(void) poll(NULL, 0, msec); +} + +/* + * ___cds_wfcq_busy_wait: adaptative busy-wait. + * + * Returns 1 if nonblocking and needs to block, 0 otherwise. + */ +static inline bool +___cds_wfcq_busy_wait(int *attempt, int blocking) +{ +	if (!blocking) +		return 1; +	if (++(*attempt) >= WFCQ_ADAPT_ATTEMPTS) { +		CDS_WFCQ_WAIT_SLEEP(WFCQ_WAIT);		/* Wait for 10ms */ +		*attempt = 0; +	} else { +		caa_cpu_relax(); +	} +	return 0; +} + +/* + * Waiting for enqueuer to complete enqueue and return the next node. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_node_sync_next(struct cds_wfcq_node *node, int blocking) +{ +	struct cds_wfcq_node *next; +	int attempt = 0; + +	/* +	 * Adaptative busy-looping waiting for enqueuer to complete enqueue. 
+	 */ +	while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { +		if (___cds_wfcq_busy_wait(&attempt, blocking)) +			return CDS_WFCQ_WOULDBLOCK; +	} + +	return next; +} + +static inline struct cds_wfcq_node * +___cds_wfcq_first(cds_wfcq_head_ptr_t u_head, +		struct cds_wfcq_tail *tail, +		int blocking) +{ +	struct __cds_wfcq_head *head = u_head._h; +	struct cds_wfcq_node *node; + +	if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) +		return NULL; +	node = ___cds_wfcq_node_sync_next(&head->node, blocking); +	/* Load head->node.next before loading node's content */ +	cmm_smp_read_barrier_depends(); +	return node; +} + +/* + * __cds_wfcq_first_blocking: get first node of a queue, without dequeuing. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + * + * Used by for-like iteration macros in urcu/wfqueue.h: + * __cds_wfcq_for_each_blocking() + * __cds_wfcq_for_each_blocking_safe() + * + * Returns NULL if queue is empty, first node otherwise. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_first_blocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail) +{ +	return ___cds_wfcq_first(head, tail, 1); +} + + +/* + * __cds_wfcq_first_nonblocking: get first node of a queue, without dequeuing. + * + * Same as __cds_wfcq_first_blocking, but returns CDS_WFCQ_WOULDBLOCK if + * it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_first_nonblocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail) +{ +	return ___cds_wfcq_first(head, tail, 0); +} + +static inline struct cds_wfcq_node * +___cds_wfcq_next(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail, +		struct cds_wfcq_node *node, +		int blocking) +{ +	struct cds_wfcq_node *next; + +	/* +	 * Even though the following tail->p check is sufficient to find +	 * out if we reached the end of the queue, we first check +	 * node->next as a common case to ensure that iteration on nodes +	 * do not frequently access enqueuer's tail->p cache line. +	 */ +	if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { +		/* Load node->next before tail->p */ +		cmm_smp_rmb(); +		if (CMM_LOAD_SHARED(tail->p) == node) +			return NULL; +		next = ___cds_wfcq_node_sync_next(node, blocking); +	} +	/* Load node->next before loading next's content */ +	cmm_smp_read_barrier_depends(); +	return next; +} + +/* + * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + * + * Used by for-like iteration macros in urcu/wfqueue.h: + * __cds_wfcq_for_each_blocking() + * __cds_wfcq_for_each_blocking_safe() + * + * Returns NULL if reached end of queue, non-NULL next queue node + * otherwise. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_next_blocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail, +		struct cds_wfcq_node *node) +{ +	return ___cds_wfcq_next(head, tail, node, 1); +} + +/* + * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing. + * + * Same as __cds_wfcq_next_blocking, but returns CDS_WFCQ_WOULDBLOCK if + * it needs to block. 
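As a hedged illustration of the first/next iteration API above, this sketch walks a queue without dequeuing anything, holding the dequeue lock to provide the mutual exclusion against dequeue and splice that the synchronization table requires. The struct job type is the hypothetical one from the earlier sketch.

/* Inspect all queued nodes without removing them. */
static void
jobs_inspect(struct cds_wfcq_head *head, struct cds_wfcq_tail *tail)
{
    struct cds_wfcq_node *n;

    cds_wfcq_dequeue_lock(head, tail);
    for (n = __cds_wfcq_first_blocking(head, tail); n != NULL;
         n = __cds_wfcq_next_blocking(head, tail, n)) {
        struct job *j = caa_container_of(n, struct job, node);
        (void)j;                 /* hypothetical read-only inspection */
    }
    cds_wfcq_dequeue_unlock(head, tail);
}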
+ */ +static inline struct cds_wfcq_node * +___cds_wfcq_next_nonblocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail, +		struct cds_wfcq_node *node) +{ +	return ___cds_wfcq_next(head, tail, node, 0); +} + +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, +		struct cds_wfcq_tail *tail, +		int *state, +		int blocking) +{ +	struct __cds_wfcq_head *head = u_head._h; +	struct cds_wfcq_node *node, *next; + +	if (state) +		*state = 0; + +	if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) { +		return NULL; +	} + +	node = ___cds_wfcq_node_sync_next(&head->node, blocking); +	if (!blocking && node == CDS_WFCQ_WOULDBLOCK) { +		return CDS_WFCQ_WOULDBLOCK; +	} + +	if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { +		/* +		 * @node is probably the only node in the queue. +		 * Try to move the tail to &q->head. +		 * q->head.next is set to NULL here, and stays +		 * NULL if the cmpxchg succeeds. Should the +		 * cmpxchg fail due to a concurrent enqueue, the +		 * q->head.next will be set to the next node. +		 * The implicit memory barrier before +		 * uatomic_cmpxchg() orders load node->next +		 * before loading q->tail. +		 * The implicit memory barrier before uatomic_cmpxchg +		 * orders load q->head.next before loading node's +		 * content. +		 */ +		_cds_wfcq_node_init(&head->node); +		if (uatomic_cmpxchg(&tail->p, node, &head->node) == node) { +			if (state) +				*state |= CDS_WFCQ_STATE_LAST; +			return node; +		} +		next = ___cds_wfcq_node_sync_next(node, blocking); +		/* +		 * In nonblocking mode, if we would need to block to +		 * get node's next, set the head next node pointer +		 * (currently NULL) back to its original value. +		 */ +		if (!blocking && next == CDS_WFCQ_WOULDBLOCK) { +			head->node.next = node; +			return CDS_WFCQ_WOULDBLOCK; +		} +	} + +	/* +	 * Move queue head forward. +	 */ +	head->node.next = next; + +	/* Load q->head.next before loading node's content */ +	cmm_smp_read_barrier_depends(); +	return node; +} + +/* + * __cds_wfcq_dequeue_with_state_blocking: dequeue node from queue, with state. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * It is valid to reuse and free a dequeued node immediately. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail, int *state) +{ +	return ___cds_wfcq_dequeue_with_state(head, tail, state, 1); +} + +/* + * ___cds_wfcq_dequeue_blocking: dequeue node from queue. + * + * Same as __cds_wfcq_dequeue_with_state_blocking, but without saving + * state. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_blocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail) +{ +	return ___cds_wfcq_dequeue_with_state_blocking(head, tail, NULL); +} + +/* + * __cds_wfcq_dequeue_with_state_nonblocking: dequeue node, with state. + * + * Same as __cds_wfcq_dequeue_blocking, but returns CDS_WFCQ_WOULDBLOCK + * if it needs to block. + */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_with_state_nonblocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail, int *state) +{ +	return ___cds_wfcq_dequeue_with_state(head, tail, state, 0); +} + +/* + * ___cds_wfcq_dequeue_nonblocking: dequeue node from queue. + * + * Same as __cds_wfcq_dequeue_with_state_nonblocking, but without saving + * state. 
+ */ +static inline struct cds_wfcq_node * +___cds_wfcq_dequeue_nonblocking(cds_wfcq_head_ptr_t head, +		struct cds_wfcq_tail *tail) +{ +	return ___cds_wfcq_dequeue_with_state_nonblocking(head, tail, NULL); +} + +/* + * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Mutual exclusion for src_q should be ensured by the caller as + * specified in the "Synchronisation table". + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice( +		cds_wfcq_head_ptr_t u_dest_q_head, +		struct cds_wfcq_tail *dest_q_tail, +		cds_wfcq_head_ptr_t u_src_q_head, +		struct cds_wfcq_tail *src_q_tail, +		int blocking) +{ +	struct __cds_wfcq_head *dest_q_head = u_dest_q_head._h; +	struct __cds_wfcq_head *src_q_head = u_src_q_head._h; +	struct cds_wfcq_node *head, *tail; +	int attempt = 0; + +	/* +	 * Initial emptiness check to speed up cases where queue is +	 * empty: only require loads to check if queue is empty. +	 */ +	if (_cds_wfcq_empty(__cds_wfcq_head_cast(src_q_head), src_q_tail)) +		return CDS_WFCQ_RET_SRC_EMPTY; + +	for (;;) { +		/* +		 * Open-coded _cds_wfcq_empty() by testing result of +		 * uatomic_xchg, as well as tail pointer vs head node +		 * address. +		 */ +		head = uatomic_xchg(&src_q_head->node.next, NULL); +		if (head) +			break;	/* non-empty */ +		if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node) +			return CDS_WFCQ_RET_SRC_EMPTY; +		if (___cds_wfcq_busy_wait(&attempt, blocking)) +			return CDS_WFCQ_RET_WOULDBLOCK; +	} + +	/* +	 * Memory barrier implied before uatomic_xchg() orders store to +	 * src_q->head before store to src_q->tail. This is required by +	 * concurrent enqueue on src_q, which exchanges the tail before +	 * updating the previous tail's next pointer. +	 */ +	tail = uatomic_xchg(&src_q_tail->p, &src_q_head->node); + +	/* +	 * Append the spliced content of src_q into dest_q. Does not +	 * require mutual exclusion on dest_q (wait-free). +	 */ +	if (___cds_wfcq_append(__cds_wfcq_head_cast(dest_q_head), dest_q_tail, +			head, tail)) +		return CDS_WFCQ_RET_DEST_NON_EMPTY; +	else +		return CDS_WFCQ_RET_DEST_EMPTY; +} + +/* + * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Mutual exclusion for src_q should be ensured by the caller as + * specified in the "Synchronisation table". + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice_blocking( +		cds_wfcq_head_ptr_t dest_q_head, +		struct cds_wfcq_tail *dest_q_tail, +		cds_wfcq_head_ptr_t src_q_head, +		struct cds_wfcq_tail *src_q_tail) +{ +	return ___cds_wfcq_splice(dest_q_head, dest_q_tail, +		src_q_head, src_q_tail, 1); +} + +/* + * __cds_wfcq_splice_nonblocking: enqueue all src_q nodes at the end of dest_q. + * + * Same as __cds_wfcq_splice_blocking, but returns + * CDS_WFCQ_RET_WOULDBLOCK if it needs to block. + */ +static inline enum cds_wfcq_ret +___cds_wfcq_splice_nonblocking( +		cds_wfcq_head_ptr_t dest_q_head, +		struct cds_wfcq_tail *dest_q_tail, +		cds_wfcq_head_ptr_t src_q_head, +		struct cds_wfcq_tail *src_q_tail) +{ +	return ___cds_wfcq_splice(dest_q_head, dest_q_tail, +		src_q_head, src_q_tail, 0); +} + +/* + * cds_wfcq_dequeue_with_state_blocking: dequeue a node from a wait-free queue. 
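The splice primitive above enables a common drain pattern: move everything from a shared queue into a private one in a single operation, then process it without further contention. A minimal illustrative sketch follows; process() is a hypothetical callback.

static void
jobs_drain(struct cds_wfcq_head *shared_head, struct cds_wfcq_tail *shared_tail,
           void (*process)(struct cds_wfcq_node *node))
{
    struct cds_wfcq_head local_head;
    struct cds_wfcq_tail local_tail;
    struct cds_wfcq_node *n;

    cds_wfcq_init(&local_head, &local_tail);

    /* The blocking wrapper takes the source queue's dequeue lock internally. */
    if (cds_wfcq_splice_blocking(&local_head, &local_tail, shared_head,
                                 shared_tail) != CDS_WFCQ_RET_SRC_EMPTY) {
        /* Only this thread sees the local queue, so the lockless (double
         * underscore) dequeue variant is safe here. */
        while ((n = __cds_wfcq_dequeue_blocking(&local_head, &local_tail)) != NULL)
            process(n);
    }

    cds_wfcq_destroy(&local_head, &local_tail);
}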
+ * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Mutual exclusion with cds_wfcq_splice_blocking and dequeue lock is + * ensured. + * It is valid to reuse and free a dequeued node immediately. + */ +static inline struct cds_wfcq_node * +_cds_wfcq_dequeue_with_state_blocking(struct cds_wfcq_head *head, +		struct cds_wfcq_tail *tail, int *state) +{ +	struct cds_wfcq_node *retval; + +	_cds_wfcq_dequeue_lock(head, tail); +	retval = ___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_cast(head), +			tail, state); +	_cds_wfcq_dequeue_unlock(head, tail); +	return retval; +} + +/* + * cds_wfcq_dequeue_blocking: dequeue node from queue. + * + * Same as cds_wfcq_dequeue_blocking, but without saving state. + */ +static inline struct cds_wfcq_node * +_cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head, +		struct cds_wfcq_tail *tail) +{ +	return _cds_wfcq_dequeue_with_state_blocking(head, tail, NULL); +} + +/* + * cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q. + * + * Dequeue all nodes from src_q. + * dest_q must be already initialized. + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Mutual exclusion with cds_wfcq_dequeue_blocking and dequeue lock is + * ensured. + * Returns enum cds_wfcq_ret which indicates the state of the src or + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK. + */ +static inline enum cds_wfcq_ret +_cds_wfcq_splice_blocking( +		struct cds_wfcq_head *dest_q_head, +		struct cds_wfcq_tail *dest_q_tail, +		struct cds_wfcq_head *src_q_head, +		struct cds_wfcq_tail *src_q_tail) +{ +	enum cds_wfcq_ret ret; + +	_cds_wfcq_dequeue_lock(src_q_head, src_q_tail); +	ret = ___cds_wfcq_splice_blocking(cds_wfcq_head_cast(dest_q_head), dest_q_tail, +			cds_wfcq_head_cast(src_q_head), src_q_tail); +	_cds_wfcq_dequeue_unlock(src_q_head, src_q_tail); +	return ret; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_WFCQUEUE_STATIC_H */ diff --git a/contrib/userspace-rcu/static-wfstack.h b/contrib/userspace-rcu/static-wfstack.h new file mode 100644 index 00000000000..29b81c3aac3 --- /dev/null +++ b/contrib/userspace-rcu/static-wfstack.h @@ -0,0 +1,455 @@ +#ifndef _URCU_STATIC_WFSTACK_H +#define _URCU_STATIC_WFSTACK_H + +/* + * urcu/static/wfstack.h + * + * Userspace RCU library - Stack with with wait-free push, blocking traversal. + * + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfstack.h for + * linking dynamically with the userspace rcu library. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't support a stack + * without mutex. 
*/ + +#include <pthread.h> +#include <assert.h> +#include <poll.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/uatomic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define CDS_WFS_END			((void *) 0x1UL) +#define CDS_WFS_ADAPT_ATTEMPTS		10	/* Retry if being set */ +#define CDS_WFS_WAIT			10	/* Wait 10 ms if being set */ + +/* + * Stack with wait-free push, blocking traversal. + * + * Stack implementing push, pop, pop_all operations, as well as iterator + * on the stack head returned by pop_all. + * + * Wait-free operations: cds_wfs_push, __cds_wfs_pop_all, cds_wfs_empty, + *                       cds_wfs_first. + * Blocking operations: cds_wfs_pop, cds_wfs_pop_all, cds_wfs_next, + *                      iteration on stack head returned by pop_all. + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + *                      cds_wfs_push  __cds_wfs_pop  __cds_wfs_pop_all + * cds_wfs_push               -              -                  - + * __cds_wfs_pop              -              X                  X + * __cds_wfs_pop_all          -              X                  - + * + * cds_wfs_pop and cds_wfs_pop_all use an internal mutex to provide + * synchronization. + */ + +/* + * cds_wfs_node_init: initialize wait-free stack node. + */ +static inline +void _cds_wfs_node_init(struct cds_wfs_node *node) +{ +	node->next = NULL; +} + +/* + * __cds_wfs_init: initialize wait-free stack. Don't pair with + * any destroy function. + */ +static inline void ___cds_wfs_init(struct __cds_wfs_stack *s) +{ +	s->head = CDS_WFS_END; +} + +/* + * cds_wfs_init: initialize wait-free stack. Pair with + * cds_wfs_destroy(). + */ +static inline +void _cds_wfs_init(struct cds_wfs_stack *s) +{ +	int ret; + +	s->head = CDS_WFS_END; +	ret = pthread_mutex_init(&s->lock, NULL); +	assert(!ret); +} + +/* + * cds_wfs_destroy: destroy wait-free stack. Pair with + * cds_wfs_init(). + */ +static inline +void _cds_wfs_destroy(struct cds_wfs_stack *s) +{ +	int ret = pthread_mutex_destroy(&s->lock); +	assert(!ret); +} + +static inline bool ___cds_wfs_end(void *node) +{ +	return node == CDS_WFS_END; +} + +/* + * cds_wfs_empty: return whether wait-free stack is empty. + * + * No memory barrier is issued. No mutual exclusion is required. + */ +static inline bool _cds_wfs_empty(cds_wfs_stack_ptr_t u_stack) +{ +	struct __cds_wfs_stack *s = u_stack._s; + +	return ___cds_wfs_end(CMM_LOAD_SHARED(s->head)); +} + +/* + * cds_wfs_push: push a node into the stack. + * + * Issues a full memory barrier before push. No mutual exclusion is + * required. + * + * Returns 0 if the stack was empty prior to adding the node. + * Returns non-zero otherwise. + */ +static inline +int _cds_wfs_push(cds_wfs_stack_ptr_t u_stack, struct cds_wfs_node *node) +{ +	struct __cds_wfs_stack *s = u_stack._s; +	struct cds_wfs_head *old_head, *new_head; + +	assert(node->next == NULL); +	new_head = caa_container_of(node, struct cds_wfs_head, node); +	/* +	 * uatomic_xchg() implicit memory barrier orders earlier stores +	 * to node (setting it to NULL) before publication. +	 */ +	old_head = uatomic_xchg(&s->head, new_head); +	/* +	 * At this point, dequeuers see a NULL node->next, they should +	 * busy-wait until node->next is set to old_head. 
+	 */ +	CMM_STORE_SHARED(node->next, &old_head->node); +	return !___cds_wfs_end(old_head); +} + +/* + * Waiting for push to complete enqueue and return the next node. + */ +static inline struct cds_wfs_node * +___cds_wfs_node_sync_next(struct cds_wfs_node *node, int blocking) +{ +	struct cds_wfs_node *next; +	int attempt = 0; + +	/* +	 * Adaptative busy-looping waiting for push to complete. +	 */ +	while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { +		if (!blocking) +			return CDS_WFS_WOULDBLOCK; +		if (++attempt >= CDS_WFS_ADAPT_ATTEMPTS) { +			(void) poll(NULL, 0, CDS_WFS_WAIT);	/* Wait for 10ms */ +			attempt = 0; +		} else { +			caa_cpu_relax(); +		} +	} + +	return next; +} + +static inline +struct cds_wfs_node * +___cds_wfs_pop(cds_wfs_stack_ptr_t u_stack, int *state, int blocking) +{ +	struct cds_wfs_head *head, *new_head; +	struct cds_wfs_node *next; +	struct __cds_wfs_stack *s = u_stack._s; + +	if (state) +		*state = 0; +	for (;;) { +		head = CMM_LOAD_SHARED(s->head); +		if (___cds_wfs_end(head)) { +			return NULL; +		} +		next = ___cds_wfs_node_sync_next(&head->node, blocking); +		if (!blocking && next == CDS_WFS_WOULDBLOCK) { +			return CDS_WFS_WOULDBLOCK; +		} +		new_head = caa_container_of(next, struct cds_wfs_head, node); +		if (uatomic_cmpxchg(&s->head, head, new_head) == head) { +			if (state && ___cds_wfs_end(new_head)) +				*state |= CDS_WFS_STATE_LAST; +			return &head->node; +		} +		if (!blocking) { +			return CDS_WFS_WOULDBLOCK; +		} +		/* busy-loop if head changed under us */ +	} +} + +/* + * __cds_wfs_pop_with_state_blocking: pop a node from the stack, with state. + * + * Returns NULL if stack is empty. + * + * __cds_wfs_pop_blocking needs to be synchronized using one of the + * following techniques: + * + * 1) Calling __cds_wfs_pop_blocking under rcu read lock critical + *    section. The caller must wait for a grace period to pass before + *    freeing the returned node or modifying the cds_wfs_node structure. + * 2) Using mutual exclusion (e.g. mutexes) to protect + *     __cds_wfs_pop_blocking and __cds_wfs_pop_all callers. + * 3) Ensuring that only ONE thread can call __cds_wfs_pop_blocking() + *    and __cds_wfs_pop_all(). (multi-provider/single-consumer scheme). + * + * "state" saves state flags atomically sampled with pop operation. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_with_state_blocking(cds_wfs_stack_ptr_t u_stack, int *state) +{ +	return ___cds_wfs_pop(u_stack, state, 1); +} + +static inline +struct cds_wfs_node * +___cds_wfs_pop_blocking(cds_wfs_stack_ptr_t u_stack) +{ +	return ___cds_wfs_pop_with_state_blocking(u_stack, NULL); +} + +/* + * __cds_wfs_pop_with_state_nonblocking: pop a node from the stack. + * + * Same as __cds_wfs_pop_with_state_blocking, but returns + * CDS_WFS_WOULDBLOCK if it needs to block. + * + * "state" saves state flags atomically sampled with pop operation. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_with_state_nonblocking(cds_wfs_stack_ptr_t u_stack, int *state) +{ +	return ___cds_wfs_pop(u_stack, state, 0); +} + +/* + * __cds_wfs_pop_nonblocking: pop a node from the stack. + * + * Same as __cds_wfs_pop_blocking, but returns CDS_WFS_WOULDBLOCK if + * it needs to block. + */ +static inline +struct cds_wfs_node * +___cds_wfs_pop_nonblocking(cds_wfs_stack_ptr_t u_stack) +{ +	return ___cds_wfs_pop_with_state_nonblocking(u_stack, NULL); +} + +/* + * __cds_wfs_pop_all: pop all nodes from a stack. 
+ * + * __cds_wfs_pop_all does not require any synchronization with other + * push, nor with other __cds_wfs_pop_all, but requires synchronization + * matching the technique used to synchronize __cds_wfs_pop_blocking: + * + * 1) If __cds_wfs_pop_blocking is called under rcu read lock critical + *    section, both __cds_wfs_pop_blocking and cds_wfs_pop_all callers + *    must wait for a grace period to pass before freeing the returned + *    node or modifying the cds_wfs_node structure. However, no RCU + *    read-side critical section is needed around __cds_wfs_pop_all. + * 2) Using mutual exclusion (e.g. mutexes) to protect + *     __cds_wfs_pop_blocking and __cds_wfs_pop_all callers. + * 3) Ensuring that only ONE thread can call __cds_wfs_pop_blocking() + *    and __cds_wfs_pop_all(). (multi-provider/single-consumer scheme). + */ +static inline +struct cds_wfs_head * +___cds_wfs_pop_all(cds_wfs_stack_ptr_t u_stack) +{ +	struct __cds_wfs_stack *s = u_stack._s; +	struct cds_wfs_head *head; + +	/* +	 * Implicit memory barrier after uatomic_xchg() matches implicit +	 * memory barrier before uatomic_xchg() in cds_wfs_push. It +	 * ensures that all nodes of the returned list are consistent. +	 * There is no need to issue memory barriers when iterating on +	 * the returned list, because the full memory barrier issued +	 * prior to each uatomic_cmpxchg, which each write to head, are +	 * taking care to order writes to each node prior to the full +	 * memory barrier after this uatomic_xchg(). +	 */ +	head = uatomic_xchg(&s->head, CDS_WFS_END); +	if (___cds_wfs_end(head)) +		return NULL; +	return head; +} + +/* + * cds_wfs_pop_lock: lock stack pop-protection mutex. + */ +static inline void _cds_wfs_pop_lock(struct cds_wfs_stack *s) +{ +	int ret; + +	ret = pthread_mutex_lock(&s->lock); +	assert(!ret); +} + +/* + * cds_wfs_pop_unlock: unlock stack pop-protection mutex. + */ +static inline void _cds_wfs_pop_unlock(struct cds_wfs_stack *s) +{ +	int ret; + +	ret = pthread_mutex_unlock(&s->lock); +	assert(!ret); +} + +/* + * Call __cds_wfs_pop_with_state_blocking with an internal pop mutex held. + */ +static inline +struct cds_wfs_node * +_cds_wfs_pop_with_state_blocking(struct cds_wfs_stack *s, int *state) +{ +	struct cds_wfs_node *retnode; + +	_cds_wfs_pop_lock(s); +	retnode = ___cds_wfs_pop_with_state_blocking(s, state); +	_cds_wfs_pop_unlock(s); +	return retnode; +} + +/* + * Call _cds_wfs_pop_with_state_blocking without saving any state. + */ +static inline +struct cds_wfs_node * +_cds_wfs_pop_blocking(struct cds_wfs_stack *s) +{ +	return _cds_wfs_pop_with_state_blocking(s, NULL); +} + +/* + * Call __cds_wfs_pop_all with an internal pop mutex held. + */ +static inline +struct cds_wfs_head * +_cds_wfs_pop_all_blocking(struct cds_wfs_stack *s) +{ +	struct cds_wfs_head *rethead; + +	_cds_wfs_pop_lock(s); +	rethead = ___cds_wfs_pop_all(s); +	_cds_wfs_pop_unlock(s); +	return rethead; +} + +/* + * cds_wfs_first: get first node of a popped stack. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * + * Used by for-like iteration macros in urcu/wfstack.h: + * cds_wfs_for_each_blocking() + * cds_wfs_for_each_blocking_safe() + * + * Returns NULL if popped stack is empty, top stack node otherwise. 
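For the stack, a minimal illustrative usage sketch follows (the task structure and function names are hypothetical): producers push wait-free, and a single consumer grabs the whole stack with pop_all and walks it.

struct task {
    struct cds_wfs_node node;    /* embedded stack linkage */
    int id;                      /* hypothetical payload */
};

static struct cds_wfs_stack task_stack;

static void
tasks_setup(void)
{
    cds_wfs_init(&task_stack);
}

/* Wait-free: may be called concurrently from any thread. */
static void
task_push(struct task *t)
{
    cds_wfs_node_init(&t->node);
    cds_wfs_push(&task_stack, &t->node);
}

/* Take everything pushed so far in one shot and walk it (LIFO order).
 * handle() must not free the node here; use cds_wfs_for_each_blocking_safe
 * when nodes are released during the walk. */
static void
tasks_handle_all(void (*handle)(struct task *t))
{
    struct cds_wfs_head *head;
    struct cds_wfs_node *n;

    head = cds_wfs_pop_all_blocking(&task_stack);
    if (head == NULL)
        return;                  /* stack was empty */

    for (n = cds_wfs_first(head); n != NULL; n = cds_wfs_next_blocking(n))
        handle(caa_container_of(n, struct task, node));
}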
+ */ +static inline struct cds_wfs_node * +_cds_wfs_first(struct cds_wfs_head *head) +{ +	if (___cds_wfs_end(head)) +		return NULL; +	return &head->node; +} + +static inline struct cds_wfs_node * +___cds_wfs_next(struct cds_wfs_node *node, int blocking) +{ +	struct cds_wfs_node *next; + +	next = ___cds_wfs_node_sync_next(node, blocking); +	/* +	 * CDS_WFS_WOULDBLOCK != CSD_WFS_END, so we can check for end +	 * even if ___cds_wfs_node_sync_next returns CDS_WFS_WOULDBLOCK, +	 * and still return CDS_WFS_WOULDBLOCK. +	 */ +	if (___cds_wfs_end(next)) +		return NULL; +	return next; +} + +/* + * cds_wfs_next_blocking: get next node of a popped stack. + * + * Content written into the node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * + * Used by for-like iteration macros in urcu/wfstack.h: + * cds_wfs_for_each_blocking() + * cds_wfs_for_each_blocking_safe() + * + * Returns NULL if reached end of popped stack, non-NULL next stack + * node otherwise. + */ +static inline struct cds_wfs_node * +_cds_wfs_next_blocking(struct cds_wfs_node *node) +{ +	return ___cds_wfs_next(node, 1); +} + + +/* + * cds_wfs_next_nonblocking: get next node of a popped stack. + * + * Same as cds_wfs_next_blocking, but returns CDS_WFS_WOULDBLOCK if it + * needs to block. + */ +static inline struct cds_wfs_node * +_cds_wfs_next_nonblocking(struct cds_wfs_node *node) +{ +	return ___cds_wfs_next(node, 0); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_STATIC_WFSTACK_H */ diff --git a/contrib/userspace-rcu/wfcqueue.h b/contrib/userspace-rcu/wfcqueue.h new file mode 100644 index 00000000000..0292585ac79 --- /dev/null +++ b/contrib/userspace-rcu/wfcqueue.h @@ -0,0 +1,216 @@ +#ifndef _URCU_WFCQUEUE_H +#define _URCU_WFCQUEUE_H + +/* + * urcu/wfcqueue.h + * + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't contain it. + * The non-LGPL section has been removed. */ + +#include <pthread.h> +#include <assert.h> +#include <stdbool.h> +#include <urcu/compiler.h> +#include <urcu/arch.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Concurrent queue with wait-free enqueue/blocking dequeue. + * + * This queue has been designed and implemented collaboratively by + * Mathieu Desnoyers and Lai Jiangshan. Inspired from + * half-wait-free/half-blocking queue implementation done by Paul E. + * McKenney. 
+ */ + +#define CDS_WFCQ_WOULDBLOCK	((struct cds_wfcq_node *) -1UL) + +enum cds_wfcq_ret { +	CDS_WFCQ_RET_WOULDBLOCK =	-1, +	CDS_WFCQ_RET_DEST_EMPTY =	0, +	CDS_WFCQ_RET_DEST_NON_EMPTY =	1, +	CDS_WFCQ_RET_SRC_EMPTY = 	2, +}; + +enum cds_wfcq_state { +	CDS_WFCQ_STATE_LAST =		(1U << 0), +}; + +struct cds_wfcq_node { +	struct cds_wfcq_node *next; +}; + +/* + * Do not put head and tail on the same cache-line if concurrent + * enqueue/dequeue are expected from many CPUs. This eliminates + * false-sharing between enqueue and dequeue. + */ +struct __cds_wfcq_head { +	struct cds_wfcq_node node; +}; + +struct cds_wfcq_head { +	struct cds_wfcq_node node; +	pthread_mutex_t lock; +}; + +#ifndef __cplusplus +/* + * The transparent union allows calling functions that work on both + * struct cds_wfcq_head and struct __cds_wfcq_head on any of those two + * types. + */ +typedef union { +	struct __cds_wfcq_head *_h; +	struct cds_wfcq_head *h; +} __attribute__((__transparent_union__)) cds_wfcq_head_ptr_t; + +/* + * This static inline is only present for compatibility with C++. It is + * effect-less in C. + */ +static inline struct __cds_wfcq_head *__cds_wfcq_head_cast(struct __cds_wfcq_head *head) +{ +	return head; +} + +/* + * This static inline is only present for compatibility with C++. It is + * effect-less in C. + */ +static inline struct cds_wfcq_head *cds_wfcq_head_cast(struct cds_wfcq_head *head) +{ +	return head; +} +#else /* #ifndef __cplusplus */ + +/* C++ ignores transparent union. */ +typedef union { +	struct __cds_wfcq_head *_h; +	struct cds_wfcq_head *h; +} cds_wfcq_head_ptr_t; + +/* C++ ignores transparent union. Requires an explicit conversion. */ +static inline cds_wfcq_head_ptr_t __cds_wfcq_head_cast(struct __cds_wfcq_head *head) +{ +	cds_wfcq_head_ptr_t ret = { ._h = head }; +	return ret; +} +/* C++ ignores transparent union. Requires an explicit conversion. */ +static inline cds_wfcq_head_ptr_t cds_wfcq_head_cast(struct cds_wfcq_head *head) +{ +	cds_wfcq_head_ptr_t ret = { .h = head }; +	return ret; +} +#endif /* #else #ifndef __cplusplus */ + +struct cds_wfcq_tail { +	struct cds_wfcq_node *p; +}; + +#include "static-wfcqueue.h" + +#define cds_wfcq_node_init		_cds_wfcq_node_init +#define cds_wfcq_init			_cds_wfcq_init +#define __cds_wfcq_init			___cds_wfcq_init +#define cds_wfcq_destroy		_cds_wfcq_destroy +#define cds_wfcq_empty			_cds_wfcq_empty +#define cds_wfcq_enqueue		_cds_wfcq_enqueue + +/* Dequeue locking */ +#define cds_wfcq_dequeue_lock		_cds_wfcq_dequeue_lock +#define cds_wfcq_dequeue_unlock		_cds_wfcq_dequeue_unlock + +/* Locking performed within cds_wfcq calls. */ +#define cds_wfcq_dequeue_blocking	_cds_wfcq_dequeue_blocking +#define cds_wfcq_dequeue_with_state_blocking	\ +					_cds_wfcq_dequeue_with_state_blocking +#define cds_wfcq_splice_blocking	_cds_wfcq_splice_blocking +#define cds_wfcq_first_blocking		_cds_wfcq_first_blocking +#define cds_wfcq_next_blocking		_cds_wfcq_next_blocking + +/* Locking ensured by caller by holding cds_wfcq_dequeue_lock() */ +#define __cds_wfcq_dequeue_blocking	___cds_wfcq_dequeue_blocking +#define __cds_wfcq_dequeue_with_state_blocking	\ +					___cds_wfcq_dequeue_with_state_blocking +#define __cds_wfcq_splice_blocking	___cds_wfcq_splice_blocking +#define __cds_wfcq_first_blocking	___cds_wfcq_first_blocking +#define __cds_wfcq_next_blocking	___cds_wfcq_next_blocking + +/* + * Locking ensured by caller by holding cds_wfcq_dequeue_lock(). + * Non-blocking: deque, first, next return CDS_WFCQ_WOULDBLOCK if they + * need to block. 
splice returns nonzero if it needs to block. + */ +#define __cds_wfcq_dequeue_nonblocking	___cds_wfcq_dequeue_nonblocking +#define __cds_wfcq_dequeue_with_state_nonblocking	\ +				___cds_wfcq_dequeue_with_state_nonblocking +#define __cds_wfcq_splice_nonblocking	___cds_wfcq_splice_nonblocking +#define __cds_wfcq_first_nonblocking	___cds_wfcq_first_nonblocking +#define __cds_wfcq_next_nonblocking	___cds_wfcq_next_nonblocking + +/* + * __cds_wfcq_for_each_blocking: Iterate over all nodes in a queue, + * without dequeuing them. + * @head: head of the queue (struct cds_wfcq_head or __cds_wfcq_head pointer). + * @tail: tail of the queue (struct cds_wfcq_tail pointer). + * @node: iterator on the queue (struct cds_wfcq_node pointer). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +#define __cds_wfcq_for_each_blocking(head, tail, node)		\ +	for (node = __cds_wfcq_first_blocking(head, tail);	\ +		node != NULL;					\ +		node = __cds_wfcq_next_blocking(head, tail, node)) + +/* + * __cds_wfcq_for_each_blocking_safe: Iterate over all nodes in a queue, + * without dequeuing them. Safe against deletion. + * @head: head of the queue (struct cds_wfcq_head or __cds_wfcq_head pointer). + * @tail: tail of the queue (struct cds_wfcq_tail pointer). + * @node: iterator on the queue (struct cds_wfcq_node pointer). + * @n: struct cds_wfcq_node pointer holding the next pointer (used + *     internally). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + * Dequeue/splice/iteration mutual exclusion should be ensured by the + * caller. + */ +#define __cds_wfcq_for_each_blocking_safe(head, tail, node, n)		       \ +	for (node = __cds_wfcq_first_blocking(head, tail),		       \ +			n = (node ? __cds_wfcq_next_blocking(head, tail, node) : NULL); \ +		node != NULL;						       \ +		node = n, n = (node ? __cds_wfcq_next_blocking(head, tail, node) : NULL)) + +#ifdef __cplusplus +} +#endif + +#endif /* _URCU_WFCQUEUE_H */ diff --git a/contrib/userspace-rcu/wfstack.h b/contrib/userspace-rcu/wfstack.h new file mode 100644 index 00000000000..738fd1cfd33 --- /dev/null +++ b/contrib/userspace-rcu/wfstack.h @@ -0,0 +1,178 @@ +#ifndef _URCU_WFSTACK_H +#define _URCU_WFSTACK_H + +/* + * urcu/wfstack.h + * + * Userspace RCU library - Stack with wait-free push, blocking traversal. + * + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* Adapted from userspace-rcu 0.10 because version 0.7 doesn't support a stack + * without mutex. The non-LGPL section has been removed. 
*/ + +#include <pthread.h> +#include <assert.h> +#include <stdbool.h> +#include <urcu/compiler.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Stack with wait-free push, blocking traversal. + * + * Stack implementing push, pop, pop_all operations, as well as iterator + * on the stack head returned by pop_all. + * + * Wait-free operations: cds_wfs_push, __cds_wfs_pop_all, cds_wfs_empty, + *                       cds_wfs_first. + * Blocking operations: cds_wfs_pop, cds_wfs_pop_all, cds_wfs_next, + *                      iteration on stack head returned by pop_all. + * + * Synchronization table: + * + * External synchronization techniques described in the API below is + * required between pairs marked with "X". No external synchronization + * required between pairs marked with "-". + * + *                      cds_wfs_push  __cds_wfs_pop  __cds_wfs_pop_all + * cds_wfs_push               -              -                  - + * __cds_wfs_pop              -              X                  X + * __cds_wfs_pop_all          -              X                  - + * + * cds_wfs_pop and cds_wfs_pop_all use an internal mutex to provide + * synchronization. + */ + +#define CDS_WFS_WOULDBLOCK	((void *) -1UL) + +enum cds_wfs_state { +	CDS_WFS_STATE_LAST =		(1U << 0), +}; + +/* + * struct cds_wfs_node is returned by __cds_wfs_pop, and also used as + * iterator on stack. It is not safe to dereference the node next + * pointer when returned by __cds_wfs_pop_blocking. + */ +struct cds_wfs_node { +	struct cds_wfs_node *next; +}; + +/* + * struct cds_wfs_head is returned by __cds_wfs_pop_all, and can be used + * to begin iteration on the stack. "node" needs to be the first field of + * cds_wfs_head, so the end-of-stack pointer value can be used for both + * types. + */ +struct cds_wfs_head { +	struct cds_wfs_node node; +}; + +struct __cds_wfs_stack { +	struct cds_wfs_head *head; +}; + +struct cds_wfs_stack { +	struct cds_wfs_head *head; +	pthread_mutex_t lock; +}; + +/* + * The transparent union allows calling functions that work on both + * struct cds_wfs_stack and struct __cds_wfs_stack on any of those two + * types. + */ +typedef union { +	struct __cds_wfs_stack *_s; +	struct cds_wfs_stack *s; +} __attribute__((__transparent_union__)) cds_wfs_stack_ptr_t; + +#include "static-wfstack.h" + +#define cds_wfs_node_init		_cds_wfs_node_init +#define cds_wfs_init			_cds_wfs_init +#define cds_wfs_destroy			_cds_wfs_destroy +#define __cds_wfs_init			___cds_wfs_init +#define cds_wfs_empty			_cds_wfs_empty +#define cds_wfs_push			_cds_wfs_push + +/* Locking performed internally */ +#define cds_wfs_pop_blocking		_cds_wfs_pop_blocking +#define cds_wfs_pop_with_state_blocking	_cds_wfs_pop_with_state_blocking +#define cds_wfs_pop_all_blocking	_cds_wfs_pop_all_blocking + +/* + * For iteration on cds_wfs_head returned by __cds_wfs_pop_all or + * cds_wfs_pop_all_blocking. + */ +#define cds_wfs_first			_cds_wfs_first +#define cds_wfs_next_blocking		_cds_wfs_next_blocking +#define cds_wfs_next_nonblocking	_cds_wfs_next_nonblocking + +/* Pop locking with internal mutex */ +#define cds_wfs_pop_lock		_cds_wfs_pop_lock +#define cds_wfs_pop_unlock		_cds_wfs_pop_unlock + +/* Synchronization ensured by the caller. See synchronization table. 
*/ +#define __cds_wfs_pop_blocking		___cds_wfs_pop_blocking +#define __cds_wfs_pop_with_state_blocking	\ +					___cds_wfs_pop_with_state_blocking +#define __cds_wfs_pop_nonblocking	___cds_wfs_pop_nonblocking +#define __cds_wfs_pop_with_state_nonblocking	\ +					___cds_wfs_pop_with_state_nonblocking +#define __cds_wfs_pop_all		___cds_wfs_pop_all + +#ifdef __cplusplus +} +#endif + +/* + * cds_wfs_for_each_blocking: Iterate over all nodes returned by + * __cds_wfs_pop_all(). + * @head: head of the queue (struct cds_wfs_head pointer). + * @node: iterator (struct cds_wfs_node pointer). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + */ +#define cds_wfs_for_each_blocking(head, node)			\ +	for (node = cds_wfs_first(head);			\ +		node != NULL;					\ +		node = cds_wfs_next_blocking(node)) + +/* + * cds_wfs_for_each_blocking_safe: Iterate over all nodes returned by + * __cds_wfs_pop_all(). Safe against deletion. + * @head: head of the queue (struct cds_wfs_head pointer). + * @node: iterator (struct cds_wfs_node pointer). + * @n: struct cds_wfs_node pointer holding the next pointer (used + *     internally). + * + * Content written into each node before enqueue is guaranteed to be + * consistent, but no other memory ordering is ensured. + */ +#define cds_wfs_for_each_blocking_safe(head, node, n)			   \ +	for (node = cds_wfs_first(head),				   \ +			n = (node ? cds_wfs_next_blocking(node) : NULL);   \ +		node != NULL;						   \ +		node = n, n = (node ? cds_wfs_next_blocking(node) : NULL)) + +#endif /* _URCU_WFSTACK_H */ diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index ae74de73f29..9ee88b6d57b 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -203,6 +203,8 @@ static struct argp_option gf_options[] = {       "Instantiate process global timer-wheel"},      {"thin-client", ARGP_THIN_CLIENT_KEY, 0, 0,       "Enables thin mount and connects via gfproxyd daemon"}, +    {"global-threading", ARGP_GLOBAL_THREADING_KEY, "BOOL", OPTION_ARG_OPTIONAL, +     "Use the global thread pool instead of io-threads"},      {0, 0, 0, 0, "Fuse options:"},      {"direct-io-mode", ARGP_DIRECT_IO_MODE_KEY, "BOOL|auto", @@ -697,6 +699,14 @@ set_fuse_mount_options(glusterfs_ctx_t *ctx, dict_t *options)                           cmd_args->fuse_flush_handle_interrupt);              break;      } +    if (cmd_args->global_threading) { +        ret = dict_set_static_ptr(options, "global-threading", "on"); +        if (ret < 0) { +            gf_msg("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4, +                   "failed to set dict value for key global-threading"); +            goto err; +        } +    }      ret = 0;  err: @@ -1497,6 +1507,7 @@ parse_opts(int key, char *arg, struct argp_state *state)                           "unknown fuse flush handle interrupt setting \"%s\"",                           arg);              break; +          case ARGP_FUSE_AUTO_INVAL_KEY:              if (!arg)                  arg = "yes"; @@ -1507,6 +1518,20 @@ parse_opts(int key, char *arg, struct argp_state *state)              }              break; + +        case ARGP_GLOBAL_THREADING_KEY: +            if (!arg || (*arg == 0)) { +                arg = "yes"; +            } + +            if (gf_string2boolean(arg, &b) == 0) { +                cmd_args->global_threading = b; +                break; +            } + +            argp_failure(state, -1, 0, +                         "Invalid value for 
global threading \"%s\"", arg); +            break;      }      return 0;  } @@ -2435,6 +2460,10 @@ glusterfs_signals_setup(glusterfs_ctx_t *ctx)      sigaddset(&set, SIGUSR1); /* gf_proc_dump_info */      sigaddset(&set, SIGUSR2); +    /* Signals needed for asynchronous framework. */ +    sigaddset(&set, GF_ASYNC_SIGQUEUE); +    sigaddset(&set, GF_ASYNC_SIGCTRL); +      ret = pthread_sigmask(SIG_BLOCK, &set, NULL);      if (ret) {          gf_msg("glusterfsd", GF_LOG_WARNING, errno, glusterfsd_msg_22, @@ -2845,6 +2874,11 @@ main(int argc, char *argv[])       */      mem_pools_init_late(); +    ret = gf_async_init(ctx); +    if (ret < 0) { +        goto out; +    } +  #ifdef GF_LINUX_HOST_OS      ret = set_oom_score_adj(ctx);      if (ret) @@ -2871,6 +2905,7 @@ main(int argc, char *argv[])      ret = event_dispatch(ctx->event_pool);  out: -    //        glusterfs_ctx_destroy (ctx); +    //    glusterfs_ctx_destroy (ctx); +    gf_async_fini();      return ret;  } diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index 35cf6d88b7a..c91daa0fb54 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -111,6 +111,7 @@ enum argp_option_keys {      ARGP_FUSE_FLUSH_HANDLE_INTERRUPT_KEY = 189,      ARGP_FUSE_LRU_LIMIT_KEY = 190,      ARGP_FUSE_AUTO_INVAL_KEY = 191, +    ARGP_GLOBAL_THREADING_KEY = 192  };  struct _gfd_vol_top_priv { diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 970f4b74978..f030a70b0f0 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -37,7 +37,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \  	$(CONTRIBDIR)/timer-wheel/timer-wheel.c \  	$(CONTRIBDIR)/timer-wheel/find_last_bit.c default-args.c locking.c \  	$(CONTRIBDIR)/xxhash/xxhash.c \ -	compound-fop-utils.c throttle-tbf.c monitoring.c +	compound-fop-utils.c throttle-tbf.c monitoring.c async.c  nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c  nodist_libglusterfs_la_HEADERS = y.tab.h protocol-common.h @@ -69,7 +69,7 @@ libglusterfs_la_HEADERS = glusterfs/common-utils.h glusterfs/defaults.h \  	glusterfs/quota-common-utils.h glusterfs/rot-buffs.h \  	glusterfs/compat-uuid.h glusterfs/upcall-utils.h glusterfs/throttle-tbf.h \  	glusterfs/events.h glusterfs/compound-fop-utils.h glusterfs/atomic.h \ -	glusterfs/monitoring.h +	glusterfs/monitoring.h glusterfs/async.h  libglusterfs_ladir = $(includedir)/glusterfs @@ -79,6 +79,10 @@ noinst_HEADERS = unittest/unittest.h \  	$(CONTRIBDIR)/libexecinfo/execinfo_compat.h \  	$(CONTRIBDIR)/timer-wheel/timer-wheel.h \  	$(CONTRIBDIR)/xxhash/xxhash.h \ +	$(CONTRIBDIR)/userspace-rcu/wfcqueue.h \ +	$(CONTRIBDIR)/userspace-rcu/wfstack.h \ +	$(CONTRIBDIR)/userspace-rcu/static-wfcqueue.h \ +	$(CONTRIBDIR)/userspace-rcu/static-wfstack.h \  	tier-ctr-interface.h  eventtypes.h: $(top_srcdir)/events/eventskeygen.py diff --git a/libglusterfs/src/async.c b/libglusterfs/src/async.c new file mode 100644 index 00000000000..ae7152ff7fa --- /dev/null +++ b/libglusterfs/src/async.c @@ -0,0 +1,723 @@ +/* +  Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. 
+*/ + +/* To implement an efficient thread pool with minimum contention we have used + * the following ideas: + * + *    - The queue of jobs has been implemented using a Wait-Free queue provided + *      by the userspace-rcu library. This queue requires a mutex when multiple + *      consumers can be extracting items from it concurrently, but the locked + *      region is very small, which minimizes the chances of contention. To + *      further minimize contention, the number of active worker threads that + *      are accessing the queue is dynamically adjusted so that we always have + *      the minimum required amount of workers contending for the queue. Adding + *      new items can be done with a single atomic operation, without locks. + * + *    - All queue management operations, like creating more threads, enabling + *      sleeping ones, etc. are done by a single thread. This makes it possible + *      to manage all scaling related information and workers lists without + *      locks. This functionality is implemented as a role that can be assigned + *      to any of the worker threads, which avoids that some lengthy operations + *      could interfere with this task. + * + *    - Management is based on signals. We used signals for management tasks to + *      avoid multiple system calls for each request (with signals we can wait + *      for multiple events and get some additional data for each request in a + *      single call, instead of first polling and then reading). + * + * TODO: There are some other changes that can take advantage of this new + *       thread pool. + * + *          - Use this thread pool as the core threading model for synctasks. I + *            think this would improve synctask performance because I think we + *            currently have some contention there for some workloads. + * + *          - Implement a per thread timer that will allow adding and removing + *            timers without using mutexes. + * + *          - Integrate with userspace-rcu library in QSBR mode, allowing + *            other portions of code to be implemented using RCU-based + *            structures with a extremely fast read side without contention. + * + *          - Integrate I/O into the thread pool so that the thread pool is + *            able to efficiently manage all loads and scale dynamically. This + *            could make it possible to minimize context switching when serving + *            requests from fuse or network. + * + *          - Dynamically scale the number of workers based on system load. + *            This will make it possible to reduce contention when system is + *            heavily loaded, improving performance under these circumstances + *            (or minimizing performance loss). This will also make it possible + *            that gluster can coexist with other processes that also consume + *            CPU, with minimal interference from each other. + */ + +#include <unistd.h> +#include <pthread.h> +#include <errno.h> + +//#include <urcu/uatomic.h> + +#include "glusterfs/logging.h" +#include "glusterfs/list.h" +#include "glusterfs/mem-types.h" +#include "glusterfs/async.h" + +/* These macros wrap a simple system/library call to check the returned error + * and log a message in case of failure. */ +#define GF_ASYNC_CHECK(_func, _args...)                                        
\ +    ({                                                                         \ +        int32_t __async_error = -_func(_args);                                 \ +        if (caa_unlikely(__async_error != 0)) {                                \ +            gf_async_error(__async_error, #_func "() failed.");                \ +        }                                                                      \ +        __async_error;                                                         \ +    }) + +#define GF_ASYNC_CHECK_ERRNO(_func, _args...)                                  \ +    ({                                                                         \ +        int32_t __async_error = _func(_args);                                  \ +        if (caa_unlikely(__async_error < 0)) {                                 \ +            __async_error = -errno;                                            \ +            gf_async_error(__async_error, #_func "() failed.");                \ +        }                                                                      \ +        __async_error;                                                         \ +    }) + +/* These macros are used when, based on POSIX documentation, the function + * should never fail under the conditions we are using it. So any unexpected + * error will be handled as a fatal event. It probably means a critical bug + * or memory corruption. In both cases we consider that stopping the process + * is safer (otherwise it could cause more corruption with unknown effects + * that could be worse). */ +#define GF_ASYNC_CANTFAIL(_func, _args...)                                     \ +    do {                                                                       \ +        int32_t __async_error = -_func(_args);                                 \ +        if (caa_unlikely(__async_error != 0)) {                                \ +            gf_async_fatal(__async_error, #_func "() failed");                 \ +        }                                                                      \ +    } while (0) + +#define GF_ASYNC_CANTFAIL_ERRNO(_func, _args...)                               \ +    ({                                                                         \ +        int32_t __async_error = _func(_args);                                  \ +        if (caa_unlikely(__async_error < 0)) {                                 \ +            __async_error = -errno;                                            \ +            gf_async_fatal(__async_error, #_func "() failed");                 \ +        }                                                                      \ +        __async_error;                                                         \ +    }) + +/* TODO: for now we allocate a static array of workers. There's an issue if we + *       try to use dynamic memory since these workers are initialized very + *       early in the process startup and it seems that sometimes not all is + *       ready to use dynamic memory. */ +static gf_async_worker_t gf_async_workers[GF_ASYNC_MAX_THREADS]; + +/* This is the only global variable needed to manage the entire framework. */ +gf_async_control_t gf_async_ctrl = {}; + +static __thread gf_async_worker_t *gf_async_current_worker = NULL; + +/* The main function of the worker threads. 
*/ +static void * +gf_async_worker(void *arg); + +static void +gf_async_sync_init(void) +{ +    GF_ASYNC_CANTFAIL(pthread_barrier_init, &gf_async_ctrl.sync, NULL, 2); +} + +static void +gf_async_sync_now(void) +{ +    int32_t ret; + +    ret = pthread_barrier_wait(&gf_async_ctrl.sync); +    if (ret == PTHREAD_BARRIER_SERIAL_THREAD) { +        GF_ASYNC_CANTFAIL(pthread_barrier_destroy, &gf_async_ctrl.sync); +        ret = 0; +    } +    if (caa_unlikely(ret != 0)) { +        gf_async_fatal(-ret, "pthread_barrier_wait() failed"); +    } +} + +static void +gf_async_sigmask_empty(sigset_t *mask) +{ +    GF_ASYNC_CANTFAIL_ERRNO(sigemptyset, mask); +} + +static void +gf_async_sigmask_add(sigset_t *mask, int32_t signal) +{ +    GF_ASYNC_CANTFAIL_ERRNO(sigaddset, mask, signal); +} + +static void +gf_async_sigmask_set(int32_t mode, sigset_t *mask, sigset_t *old) +{ +    GF_ASYNC_CANTFAIL(pthread_sigmask, mode, mask, old); +} + +static void +gf_async_sigaction(int32_t signum, const struct sigaction *action, +                   struct sigaction *old) +{ +    GF_ASYNC_CANTFAIL_ERRNO(sigaction, signum, action, old); +} + +static int32_t +gf_async_sigwait(sigset_t *set) +{ +    int32_t ret, signum; + +    do { +        ret = sigwait(set, &signum); +    } while (caa_unlikely((ret < 0) && (errno == EINTR))); + +    if (caa_unlikely(ret < 0)) { +        ret = -errno; +        gf_async_fatal(ret, "sigwait() failed"); +    } + +    return signum; +} + +static int32_t +gf_async_sigtimedwait(sigset_t *set, struct timespec *timeout) +{ +    int32_t ret; + +    do { +        ret = sigtimedwait(set, NULL, timeout); +    } while (caa_unlikely((ret < 0) && (errno == EINTR))); +    if (caa_unlikely(ret < 0)) { +        ret = -errno; +        /* EAGAIN means that the timeout has expired, so we allow this error. +         * Any other error shouldn't happen. */ +        if (caa_unlikely(ret != -EAGAIN)) { +            gf_async_fatal(ret, "sigtimedwait() failed"); +        } +        ret = 0; +    } + +    return ret; +} + +static void +gf_async_sigbroadcast(int32_t signum) +{ +    GF_ASYNC_CANTFAIL_ERRNO(kill, gf_async_ctrl.pid, signum); +} + +static void +gf_async_signal_handler(int32_t signum) +{ +    /* We should never handle a signal in this function. */ +    gf_async_fatal(-EBUSY, +                   "Unexpected processing of signal %d through a handler.", +                   signum); +} + +static void +gf_async_signal_setup(void) +{ +    struct sigaction action; + +    /* We configure all related signals so that we can detect threads using an +     * invalid signal mask that doesn't block our critical signal. */ +    memset(&action, 0, sizeof(action)); +    action.sa_handler = gf_async_signal_handler; + +    gf_async_sigaction(GF_ASYNC_SIGCTRL, &action, &gf_async_ctrl.handler_ctrl); + +    gf_async_sigaction(GF_ASYNC_SIGQUEUE, &action, +                       &gf_async_ctrl.handler_queue); +} + +static void +gf_async_signal_restore(void) +{ +    /* Handlers we have previously changed are restored back to their original +     * value. 
*/ + +    if (gf_async_ctrl.handler_ctrl.sa_handler != gf_async_signal_handler) { +        gf_async_sigaction(GF_ASYNC_SIGCTRL, &gf_async_ctrl.handler_ctrl, NULL); +    } + +    if (gf_async_ctrl.handler_queue.sa_handler != gf_async_signal_handler) { +        gf_async_sigaction(GF_ASYNC_SIGQUEUE, &gf_async_ctrl.handler_queue, +                           NULL); +    } +} + +static void +gf_async_signal_flush(void) +{ +    struct timespec delay; + +    delay.tv_sec = 0; +    delay.tv_nsec = 0; + +    /* We read all pending signals so that they don't trigger once the signal +     * mask of some thread is changed. */ +    while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_ctrl, &delay) > 0) { +    } +    while (gf_async_sigtimedwait(&gf_async_ctrl.sigmask_queue, &delay) > 0) { +    } +} + +static int32_t +gf_async_thread_create(pthread_t *thread, int32_t id, void *data) +{ +    int32_t ret; + +    ret = gf_thread_create(thread, NULL, gf_async_worker, data, +                           GF_ASYNC_THREAD_NAME "%u", id); +    if (caa_unlikely(ret < 0)) { +        /* TODO: gf_thread_create() should return a more specific error +         *       code. */ +        return -ENOMEM; +    } + +    return 0; +} + +static void +gf_async_thread_wait(pthread_t thread) +{ +    /* TODO: this is a blocking call executed inside one of the workers of the +     *       thread pool. This is bad, but this is only executed once we have +     *       received a notification from the thread that it's terminating, so +     *       this should return almost immediately. However, to be more robust +     *       it would be better to use pthread_timedjoin_np() (or even a call +     *       to pthread_tryjoin_np() followed by a delayed recheck if it +     *       fails), but they are not portable. We should see how to do this +     *       in other platforms. */ +    GF_ASYNC_CANTFAIL(pthread_join, thread, NULL); +} + +static int32_t +gf_async_worker_create(void) +{ +    struct cds_wfs_node *node; +    gf_async_worker_t *worker; +    uint32_t counts, running, max; +    int32_t ret; + +    node = __cds_wfs_pop_blocking(&gf_async_ctrl.available); +    if (caa_unlikely(node == NULL)) { +        /* There are no more available workers. We have all threads running. */ +        return 1; +    } +    cds_wfs_node_init(node); + +    ret = 1; + +    counts = uatomic_read(&gf_async_ctrl.counts); +    max = uatomic_read(&gf_async_ctrl.max_threads); +    running = GF_ASYNC_COUNT_RUNNING(counts); +    if (running < max) { +        uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(1, 0)); + +        worker = caa_container_of(node, gf_async_worker_t, stack); + +        ret = gf_async_thread_create(&worker->thread, worker->id, worker); +        if (caa_likely(ret >= 0)) { +            return 0; +        } + +        uatomic_add(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(-1, 0)); +    } + +    cds_wfs_push(&gf_async_ctrl.available, node); + +    return ret; +} + +static void +gf_async_worker_enable(void) +{ +    /* This will wake one of the spare workers. If all workers are busy now, +     * the signal will be queued so that the first one that completes its +     * work will become the leader. */ +    gf_async_sigbroadcast(GF_ASYNC_SIGCTRL); + +    /* We have consumed a spare worker. We create another one for future +     * needs. 
*/ +    gf_async_worker_create(); +} + +static void +gf_async_worker_wait(void) +{ +    int32_t signum; + +    signum = gf_async_sigwait(&gf_async_ctrl.sigmask_ctrl); +    if (caa_unlikely(signum != GF_ASYNC_SIGCTRL)) { +        gf_async_fatal(-EINVAL, "Worker received an unexpected signal (%d)", +                       signum); +    } +} + +static void +gf_async_leader_wait(void) +{ +    int32_t signum; + +    signum = gf_async_sigwait(&gf_async_ctrl.sigmask_queue); +    if (caa_unlikely(signum != GF_ASYNC_SIGQUEUE)) { +        gf_async_fatal(-EINVAL, "Leader received an unexpected signal (%d)", +                       signum); +    } +} + +static void +gf_async_run(struct cds_wfcq_node *node) +{ +    gf_async_t *async; + +    /* We've just got work from the queue. Process it. */ +    async = caa_container_of(node, gf_async_t, queue); +    /* TODO: remove dependency from THIS and xl. */ +    THIS = async->xl; +    async->cbk(async->xl, async); +} + +static void +gf_async_worker_run(void) +{ +    struct cds_wfcq_node *node; + +    do { +        /* We keep executing jobs from the queue while it's not empty. Note +         * that while we do this, we are ignoring any stop request. That's +         * fine, since we need to process our own 'join' messages to fully +         * terminate all threads. Note that normal jobs should have already +         * completed once a stop request is received. */ +        node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, +                                         &gf_async_ctrl.queue.tail); +        if (node != NULL) { +            gf_async_run(node); +        } +    } while (node != NULL); + +    /* TODO: I've tried to keep the worker looking at the queue for some small +     *       amount of time in a busy loop to see if more jobs come soon. With +     *       this I attempted to avoid the overhead of signal management if +     *       jobs come fast enough. However experimental results seem to +     *       indicate that doing this, CPU utilization grows and performance +     *       is actually reduced. We need to see if that's because I used bad +     *       parameters or it's really better to do it as it's done now. */ +} + +static void +gf_async_leader_run(void) +{ +    struct cds_wfcq_node *node; + +    node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, +                                     &gf_async_ctrl.queue.tail); +    while (caa_unlikely(node == NULL)) { +        gf_async_leader_wait(); + +        node = cds_wfcq_dequeue_blocking(&gf_async_ctrl.queue.head, +                                         &gf_async_ctrl.queue.tail); +    } + +    /* Activate the next available worker thread. It will become the new +     * leader. */ +    gf_async_worker_enable(); + +    gf_async_run(node); +} + +static uint32_t +gf_async_stop_check(gf_async_worker_t *worker) +{ +    uint32_t counts, old, running, max; + +    /* First we check if we should stop without doing any costly atomic +     * operation. */ +    old = uatomic_read(&gf_async_ctrl.counts); +    max = uatomic_read(&gf_async_ctrl.max_threads); +    running = GF_ASYNC_COUNT_RUNNING(old); +    while (running > max) { +        /* There are too many threads. We try to stop the current worker. */ +        counts = uatomic_cmpxchg(&gf_async_ctrl.counts, old, +                                 old + GF_ASYNC_COUNTS(-1, 1)); +        if (old != counts) { +            /* Another thread has just updated the counts. We need to retry. 
*/ +            old = counts; +            running = GF_ASYNC_COUNT_RUNNING(old); + +            continue; +        } + +        running--; +        worker->running = false; +    } + +    return running; +} + +static void +gf_async_stop_all(xlator_t *xl, gf_async_t *async) +{ +    if (gf_async_stop_check(gf_async_current_worker) > 0) { +        /* There are more workers running. We propagate the stop request to +         * them. */ +        gf_async(async, xl, gf_async_stop_all); +    } +} + +static void +gf_async_join(xlator_t *xl, gf_async_t *async) +{ +    gf_async_worker_t *worker; + +    worker = caa_container_of(async, gf_async_worker_t, async); + +    gf_async_thread_wait(worker->thread); + +    cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +} + +static void +gf_async_terminate(gf_async_worker_t *worker) +{ +    uint32_t counts; + +    counts = uatomic_add_return(&gf_async_ctrl.counts, GF_ASYNC_COUNTS(0, -1)); +    if (counts == 0) { +        /* This is the termination of the last worker thread. We need to +         * synchronize the main thread that is waiting for all workers to +         * finish. */ +        gf_async_ctrl.sync_thread = worker->thread; + +        gf_async_sync_now(); +    } else { +        /* Force someone else to join this thread to release resources. */ +        gf_async(&worker->async, THIS, gf_async_join); +    } +} + +static void * +gf_async_worker(void *arg) +{ +    gf_async_worker_t *worker; + +    worker = (gf_async_worker_t *)arg; +    gf_async_current_worker = worker; + +    worker->running = true; +    do { +        /* This thread does nothing until someone enables it to become a +         * leader. */ +        gf_async_worker_wait(); + +        /* This thread is now a leader. It will process jobs from the queue +         * and, if necessary, enable another worker and transfer leadership +         * to it. */ +        gf_async_leader_run(); + +        /* This thread is not a leader anymore. It will continue processing +         * queued jobs until it becomes empty. */ +        gf_async_worker_run(); + +        /* Stop the current thread if there are too many threads running. */ +        gf_async_stop_check(worker); +    } while (worker->running); + +    gf_async_terminate(worker); + +    return NULL; +} + +static void +gf_async_cleanup(void) +{ +    /* We do some basic initialization of the global variable 'gf_async_ctrl' +     * so that it's put into a relatively consistent state. */ + +    gf_async_ctrl.enabled = false; + +    gf_async_ctrl.pid = 0; +    gf_async_sigmask_empty(&gf_async_ctrl.sigmask_ctrl); +    gf_async_sigmask_empty(&gf_async_ctrl.sigmask_queue); + +    /* This is used to later detect if the handler of these signals have been +     * changed or not. */ +    gf_async_ctrl.handler_ctrl.sa_handler = gf_async_signal_handler; +    gf_async_ctrl.handler_queue.sa_handler = gf_async_signal_handler; + +    gf_async_ctrl.table = NULL; +    gf_async_ctrl.max_threads = 0; +    gf_async_ctrl.counts = 0; +} + +void +gf_async_fini(void) +{ +    gf_async_t async; + +    if (uatomic_read(&gf_async_ctrl.counts) != 0) { +        /* We ensure that all threads will quit on the next check. */ +        gf_async_ctrl.max_threads = 0; + +        /* Send the stop request to the thread pool. This will cause the +         * execution of gf_async_stop_all() by one of the worker threads which, +         * eventually, will terminate all worker threads. 
*/ +        gf_async(&async, THIS, gf_async_stop_all); + +        /* We synchronize here with the last thread. */ +        gf_async_sync_now(); + +        /* We have just synchronized with the last thread. Now just wait for +         * it to terminate. */ +        gf_async_thread_wait(gf_async_ctrl.sync_thread); + +        gf_async_signal_flush(); +    } + +    gf_async_signal_restore(); + +    gf_async_cleanup(); +} + +void +gf_async_adjust_threads(int32_t threads) +{ +    if (threads == 0) { +        /* By default we allow a maximum of 2 * #cores worker threads. This +         * value tries to accommodate threads that will do some I/O. With more +         * threads than cores we can keep the CPU busy even if some threads are +         * blocked on I/O. In the most efficient case, we can have #cores +         * computing threads and #cores blocked threads on I/O. However this is +         * hard to achieve because we can end up with more than #cores computing +         * threads, which won't provide a real benefit and will increase +         * contention. +         * +         * TODO: implement a more intelligent dynamic maximum based on CPU +         *       usage and/or system load. */ +        threads = sysconf(_SC_NPROCESSORS_ONLN) * 2; +        if (threads < 0) { +            /* If we can't get the current number of processors, we fall back +             * to a reasonable default. */ +            threads = 16; +        } +    } +    if (threads > GF_ASYNC_MAX_THREADS) { +        threads = GF_ASYNC_MAX_THREADS; +    } +    uatomic_set(&gf_async_ctrl.max_threads, threads); +} + +int32_t +gf_async_init(glusterfs_ctx_t *ctx) +{ +    sigset_t set; +    gf_async_worker_t *worker; +    uint32_t i; +    int32_t ret; +    bool running; + +    gf_async_cleanup(); + +    if (!ctx->cmd_args.global_threading || +        (ctx->process_mode == GF_GLUSTERD_PROCESS)) { +        return 0; +    } + +    /* At init time, the maximum number of threads has not yet been +     * configured. We use a small starting value that will be later dynamically +     * adjusted when ctx->config.max_threads is updated. */ +    gf_async_adjust_threads(GF_ASYNC_SPARE_THREADS + 1); + +    gf_async_ctrl.pid = getpid(); + +    __cds_wfs_init(&gf_async_ctrl.available); +    cds_wfcq_init(&gf_async_ctrl.queue.head, &gf_async_ctrl.queue.tail); + +    gf_async_sync_init(); + +    /* TODO: it would be cleaner to use dynamic memory, but at this point some +     *       memory management resources are not yet initialized. */ +    gf_async_ctrl.table = gf_async_workers; + +    /* We keep all workers in a stack. It will be used when a new thread needs +     * to be created. */ +    for (i = GF_ASYNC_MAX_THREADS; i > 0; i--) { +        worker = &gf_async_ctrl.table[i - 1]; + +        worker->id = i - 1; +        cds_wfs_node_init(&worker->stack); +        cds_wfs_push(&gf_async_ctrl.available, &worker->stack); +    } + +    /* Prepare the signal mask for regular workers and the leader. */ +    gf_async_sigmask_add(&gf_async_ctrl.sigmask_ctrl, GF_ASYNC_SIGCTRL); +    gf_async_sigmask_add(&gf_async_ctrl.sigmask_queue, GF_ASYNC_SIGQUEUE); + +    /* TODO: this is needed to block our special signals in the current thread +     *       and all children that it starts. It would be cleaner to do it when +     *       signals are initialized, but there doesn't seem to be a unique +     *       place to do that, so for now we do it here. 
*/ +    gf_async_sigmask_empty(&set); +    gf_async_sigmask_add(&set, GF_ASYNC_SIGCTRL); +    gf_async_sigmask_add(&set, GF_ASYNC_SIGQUEUE); +    gf_async_sigmask_set(SIG_BLOCK, &set, NULL); + +    /* Configure the signal handlers. This is mostly for safety, not really +     * needed, but it doesn't hurt. Note that the caller must ensure that the +     * signals we need to run are already blocked in any thread already +     * started. Otherwise this won't work. */ +    gf_async_signal_setup(); + +    running = false; + +    /* We start the spare workers + 1 for the leader. */ +    for (i = 0; i < GF_ASYNC_SPARE_THREADS; i++) { +        ret = gf_async_worker_create(); +        if (caa_unlikely(ret < 0)) { +            /* This is the initial start up so we enforce that the spare +             * threads are created. If this fails at the beginning, it's very +             * unlikely that the async workers could do its job, so we abort +             * the initialization. */ +            goto out; +        } + +        /* Once the first thread is started, we can enable it to become the +         * initial leader. */ +        if ((ret == 0) && !running) { +            running = true; +            gf_async_worker_enable(); +        } +    } + +    if (caa_unlikely(!running)) { +        gf_async_fatal(-ENOMEM, "No worker thread has started"); +    } + +    gf_async_ctrl.enabled = true; + +    ret = 0; + +out: +    if (ret < 0) { +        gf_async_error(ret, "Unable to initialize the thread pool."); +        gf_async_fini(); +    } + +    return ret; +} diff --git a/libglusterfs/src/glusterfs/async.h b/libglusterfs/src/glusterfs/async.h new file mode 100644 index 00000000000..d1d70ae0bc7 --- /dev/null +++ b/libglusterfs/src/glusterfs/async.h @@ -0,0 +1,209 @@ +/* +  Copyright (c) 2019 Red Hat, Inc <https://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#ifndef __GLUSTERFS_ASYNC_H__ +#define __GLUSTERFS_ASYNC_H__ + +#define _LGPL_SOURCE + +#include <sys/types.h> +#include <signal.h> +#include <errno.h> + +#ifdef URCU_OLD + +/* TODO: Fix the include paths. Since this is a .h included from many places + *       it makes no sense to append a '-I$(CONTRIBDIR)/userspace-rcu/' to each + *       Makefile.am. I've also seen some problems with CI builders (they + *       failed to find the include files, but the same source on another setup + *       is working fine). */ +#include "wfcqueue.h" +#include "wfstack.h" + +#else /* !URCU_OLD */ + +#include <urcu/wfcqueue.h> +#include <urcu/wfstack.h> + +#endif /* URCU_OLD */ + +#include "glusterfs/xlator.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/list.h" +#include "glusterfs/libglusterfs-messages.h" + +/* This is the name prefix that all worker threads will have. A number will + * be added to differentiate them. */ +#define GF_ASYNC_THREAD_NAME "tpw" + +/* This value determines the maximum number of threads that are allowed. */ +#define GF_ASYNC_MAX_THREADS 128 + +/* This value determines how many additional threads will be started but will + * remain inactive until they are explicitly activated by the leader. This is + * useful to react faster to bursts of load, but at the same time we minimize + * contention if they are not really needed to handle current load. 
+ * + * TODO: Instead of a fixed number, it would probably be better to use a + *       percentage of the available cores. */ +#define GF_ASYNC_SPARE_THREADS 2 + +/* This value determines the signal used to wake the leader when new work has + * been added to the queue. To do so we reuse SIGALRM, since the most logical + * candidates (SIGUSR1/SIGUSR2) are already used. This signal must not be used + * by anything else in the process. */ +#define GF_ASYNC_SIGQUEUE SIGALRM + +/* This value determines the signal that will be used to transfer leader role + * to other workers. */ +#define GF_ASYNC_SIGCTRL SIGVTALRM + +#define gf_async_warning(_err, _msg, _args...)                                 \ +    gf_msg("async", GF_LOG_WARNING, -(_err), LG_MSG_ASYNC_WARNING, _msg,       \ +           ##_args) + +#define gf_async_error(_err, _msg, _args...)                                   \ +    gf_msg("async", GF_LOG_ERROR, -(_err), LG_MSG_ASYNC_FAILURE, _msg, ##_args) + +#define gf_async_fatal(_err, _msg, _args...)                                   \ +    do {                                                                       \ +        GF_ABORT("Critical error in async module. Unable to continue. (" _msg  \ +                 "). Error %d.",                                               \ +                 ##_args, -(_err));                                            \ +    } while (0) + +struct _gf_async; +typedef struct _gf_async gf_async_t; + +struct _gf_async_worker; +typedef struct _gf_async_worker gf_async_worker_t; + +struct _gf_async_queue; +typedef struct _gf_async_queue gf_async_queue_t; + +struct _gf_async_control; +typedef struct _gf_async_control gf_async_control_t; + +typedef void (*gf_async_callback_f)(xlator_t *xl, gf_async_t *async); + +struct _gf_async { +    /* TODO: remove dependency on xl/THIS. */ +    xlator_t *xl; +    gf_async_callback_f cbk; +    struct cds_wfcq_node queue; +}; + +struct _gf_async_worker { +    /* Used to send asynchronous jobs related to the worker. */ +    gf_async_t async; + +    /* Member of the available workers stack. */ +    struct cds_wfs_node stack; + +    /* Thread object of the current worker. */ +    pthread_t thread; + +    /* Unique identifier of this worker. */ +    int32_t id; + +    /* Indicates if this worker is enabled. */ +    bool running; +}; + +struct _gf_async_queue { +    /* Structures needed to manage a wait-free queue. For better performance +     * they are placed in two different cache lines, as recommended by URCU +     * documentation, even though in our case some threads will be producers +     * and consumers at the same time. */ +    struct cds_wfcq_head head __attribute__((aligned(64))); +    struct cds_wfcq_tail tail __attribute__((aligned(64))); +}; + +#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop)) +#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16) +#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535) + +struct _gf_async_control { +    gf_async_queue_t queue; + +    /* Stack of unused workers. */ +    struct __cds_wfs_stack available; + +    /* Array of preallocated worker structures. */ +    gf_async_worker_t *table; + +    /* Used to synchronize main thread with workers on termination. */ +    pthread_barrier_t sync; + +    /* The id of the last thread that will be used for synchronization. */ +    pthread_t sync_thread; + +    /* Signal mask to wait for control signals from leader. */ +    sigset_t sigmask_ctrl; + +    /* Signal mask to wait for queued items. 
*/ +    sigset_t sigmask_queue; + +    /* Saved signal handlers. */ +    struct sigaction handler_ctrl; +    struct sigaction handler_queue; + +    /* PID of the current process. */ +    pid_t pid; + +    /* Maximum number of allowed threads. */ +    uint32_t max_threads; + +    /* Current number of running and stopping workers. This value is split +     * into 2 16-bits fields to track both counters atomically at the same +     * time. */ +    uint32_t counts; + +    /* It's used to control whether the asynchronous infrastructure is used +     * or not. */ +    bool enabled; +}; + +extern gf_async_control_t gf_async_ctrl; + +int32_t +gf_async_init(glusterfs_ctx_t *ctx); + +void +gf_async_fini(void); + +void +gf_async_adjust_threads(int32_t threads); + +static inline void +gf_async(gf_async_t *async, xlator_t *xl, gf_async_callback_f cbk) +{ +    if (!gf_async_ctrl.enabled) { +        cbk(xl, async); +        return; +    } + +    async->xl = xl; +    async->cbk = cbk; +    cds_wfcq_node_init(&async->queue); +    if (caa_unlikely(!cds_wfcq_enqueue(&gf_async_ctrl.queue.head, +                                       &gf_async_ctrl.queue.tail, +                                       &async->queue))) { +        /* The queue was empty, so the leader could be sleeping. We need to +         * wake it so that the new item can be processed. If the queue was not +         * empty, we don't need to do anything special since the leader will +         * take care of it. */ +        if (caa_unlikely(kill(gf_async_ctrl.pid, GF_ASYNC_SIGQUEUE) < 0)) { +            gf_async_fatal(errno, "Unable to wake leader worker."); +        }; +    } +} + +#endif /* !__GLUSTERFS_ASYNC_H__ */ diff --git a/libglusterfs/src/glusterfs/common-utils.h b/libglusterfs/src/glusterfs/common-utils.h index f03d2c1049a..1418c6531c7 100644 --- a/libglusterfs/src/glusterfs/common-utils.h +++ b/libglusterfs/src/glusterfs/common-utils.h @@ -442,7 +442,7 @@ BIT_VALUE(unsigned char *array, unsigned int index)      } while (0)  #endif -#define GF_ABORT(msg)                                                          \ +#define GF_ABORT(msg...)                                                       
\      do {                                                                       \          gf_msg_callingfn("", GF_LOG_CRITICAL, 0, LG_MSG_ASSERTION_FAILED,      \                           "Assertion failed: " msg);                            \ diff --git a/libglusterfs/src/glusterfs/glusterfs.h b/libglusterfs/src/glusterfs/glusterfs.h index 7c6af090fd8..9d140c18ee3 100644 --- a/libglusterfs/src/glusterfs/glusterfs.h +++ b/libglusterfs/src/glusterfs/glusterfs.h @@ -575,6 +575,8 @@ struct _cmd_args {      int fuse_flush_handle_interrupt;      int fuse_auto_inval; + +    bool global_threading;  };  typedef struct _cmd_args cmd_args_t; diff --git a/libglusterfs/src/glusterfs/libglusterfs-messages.h b/libglusterfs/src/glusterfs/libglusterfs-messages.h index 1b72f6df5be..e17e33e06fb 100644 --- a/libglusterfs/src/glusterfs/libglusterfs-messages.h +++ b/libglusterfs/src/glusterfs/libglusterfs-messages.h @@ -109,6 +109,6 @@ GLFS_MSGID(      LG_MSG_PTHREAD_ATTR_INIT_FAILED, LG_MSG_INVALID_INODE_LIST,      LG_MSG_COMPACT_FAILED, LG_MSG_COMPACT_STATUS, LG_MSG_UTIMENSAT_FAILED,      LG_MSG_PTHREAD_NAMING_FAILED, LG_MSG_SYSCALL_RETURNS_WRONG, -    LG_MSG_XXH64_TO_GFID_FAILED); +    LG_MSG_XXH64_TO_GFID_FAILED, LG_MSG_ASYNC_WARNING, LG_MSG_ASYNC_FAILURE);  #endif /* !_LG_MESSAGES_H_ */ diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 2ac87c491ae..4466a5eee9a 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -553,6 +553,11 @@ get_xlator_by_name  get_xlator_by_type  gf_array_insertionsort  gf_asprintf +gf_async +gf_async_adjust_threads +gf_async_ctrl +gf_async_init +gf_async_fini  gf_backtrace_save  gf_bits_count  gf_bits_index diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index f9cbdf133c7..ce984426cbe 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -144,6 +144,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector,          goto out;      } +    msg->trans = this; +      if (count > 1) {          msg->vectored = 1;      } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 9e75d1a2bbb..6830279f07e 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -58,6 +58,7 @@ typedef struct rpc_transport rpc_transport_t;  #include <glusterfs/dict.h>  #include <glusterfs/compat.h> +#include <glusterfs/async.h>  #include "rpcsvc-common.h"  struct peer_info { @@ -155,16 +156,6 @@ struct rpc_request_info {  };  typedef struct rpc_request_info rpc_request_info_t; -struct rpc_transport_pollin { -    int count; -    void *private; -    struct iobref *iobref; -    struct iovec vector[MAX_IOVEC]; -    char is_reply; -    char vectored; -}; -typedef struct rpc_transport_pollin rpc_transport_pollin_t; -  typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata,                                        rpc_transport_event_t, void *data, ...); @@ -217,6 +208,18 @@ struct rpc_transport {      gf_atomic_t disconnect_progress;  }; +struct rpc_transport_pollin { +    struct rpc_transport *trans; +    int count; +    void *private; +    struct iobref *iobref; +    struct iovec vector[MAX_IOVEC]; +    char is_reply; +    char vectored; +    gf_async_t async; +}; +typedef struct rpc_transport_pollin rpc_transport_pollin_t; +  struct rpc_transport_ops {      /* no need of receive op, msg will be delivered through an event       * notification diff --git a/rpc/rpc-lib/src/rpcsvc.c 
b/rpc/rpc-lib/src/rpcsvc.c index 74373c44f91..c38a675b8c2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -2314,6 +2314,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,      newprog->alive = _gf_true; +    if (gf_async_ctrl.enabled) { +        newprog->ownthread = _gf_false; +        newprog->synctask = _gf_false; +    } +      /* make sure synctask gets priority over ownthread */      if (newprog->synctask)          newprog->ownthread = _gf_false; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index fa0e0f20901..26dbe0b706a 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2495,6 +2495,33 @@ out:      return ret;  } +static void +socket_event_poll_in_async(xlator_t *xl, gf_async_t *async) +{ +    rpc_transport_pollin_t *pollin; +    rpc_transport_t *this; +    socket_private_t *priv; + +    pollin = caa_container_of(async, rpc_transport_pollin_t, async); +    this = pollin->trans; +    priv = this->private; + +    rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + +    rpc_transport_unref(this); + +    rpc_transport_pollin_destroy(pollin); + +    pthread_mutex_lock(&priv->notify.lock); +    { +        --priv->notify.in_progress; + +        if (!priv->notify.in_progress) +            pthread_cond_signal(&priv->notify.cond); +    } +    pthread_mutex_unlock(&priv->notify.lock); +} +  static int  socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)  { @@ -2519,18 +2546,8 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)          event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen);      if (pollin) { -        rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); - -        rpc_transport_pollin_destroy(pollin); - -        pthread_mutex_lock(&priv->notify.lock); -        { -            --priv->notify.in_progress; - -            if (!priv->notify.in_progress) -                pthread_cond_signal(&priv->notify.cond); -        } -        pthread_mutex_unlock(&priv->notify.lock); +        rpc_transport_ref(this); +        gf_async(&pollin->async, THIS, socket_event_poll_in_async);      }      return ret; diff --git a/tests/basic/global-threading.t b/tests/basic/global-threading.t new file mode 100644 index 00000000000..f7d34044b09 --- /dev/null +++ b/tests/basic/global-threading.t @@ -0,0 +1,104 @@ +#!/bin/bash + +. $(dirname $0)/../include.rc +. $(dirname $0)/../volume.rc + +# Test if the given process has a number of threads of a given type between +# min and max. +function check_threads() { +    local pid="${1}" +    local pattern="${2}" +    local min="${3}" +    local max="${4-}" +    local count + +    count="$(ps hH -o comm ${pid} | grep "${pattern}" | wc -l)" +    if [[ ${min} -gt ${count} ]]; then +        return 1 +    fi +    if [[ ! 
-z "${max}" && ${max} -lt ${count} ]]; then +        return 1 +    fi + +    return 0 +} + +cleanup + +TEST glusterd + +# Glusterd shouldn't use any thread +TEST check_threads $(get_glusterd_pid) glfs_tpw 0 0 +TEST check_threads $(get_glusterd_pid) glfs_iotwr 0 0 + +TEST pkill -9 glusterd + +TEST glusterd --global-threading + +# Glusterd shouldn't use global threads, even if enabled +TEST check_threads $(get_glusterd_pid) glfs_tpw 0 0 +TEST check_threads $(get_glusterd_pid) glfs_iotwr 0 0 + +TEST $CLI volume create $V0 replica 2 $H0:$B0/b{0,1} + +# Normal configuration using io-threads on bricks +TEST $CLI volume set $V0 config.global-threading off +TEST $CLI volume set $V0 performance.iot-pass-through off +TEST $CLI volume set $V0 performance.client-io-threads off +TEST $CLI volume start $V0 + +# There shouldn't be global threads +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_tpw 0 0 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_tpw 0 0 + +# There should be at least 1 io-thread +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_iotwr 1 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_iotwr 1 + +# Self-heal should be using global threads +TEST check_threads $(get_shd_process_pid) glfs_tpw 1 +TEST check_threads $(get_shd_process_pid) glfs_iotwr 0 0 + +TEST $CLI volume stop $V0 + +# Configuration with global threads on bricks +TEST $CLI volume set $V0 config.global-threading on +TEST $CLI volume set $V0 performance.iot-pass-through on +TEST $CLI volume start $V0 + +# There should be at least 1 global thread +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_tpw 1 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_tpw 1 + +# There shouldn't be any io-thread worker threads +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b0) glfs_iotwr 0 0 +TEST check_threads $(get_brick_pid $V0 $H0 $B0/b1) glfs_iotwr 0 0 + +# Normal configuration using io-threads on clients +TEST $CLI volume set $V0 performance.iot-pass-through off +TEST $CLI volume set $V0 performance.client-io-threads on +TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 $M0 + +# There shouldn't be global threads +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_tpw 0 0 + +# There should be at least 1 io-thread +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_iotwr 1 + +EXPECT_WITHIN $UMOUNT_TIMEOUT "Y" force_umount $M0 + +# Configuration with global threads on clients +TEST $CLI volume set $V0 performance.client-io-threads off +TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 --global-threading $M0 + +# There should be at least 1 global thread +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_tpw 1 + +# There shouldn't be io-threads +TEST check_threads $(get_mount_process_pid $V0 $M0) glfs_iotwr 0 0 + +# Some basic volume access checks with global-threading enabled everywhere +TEST mkdir ${M0}/dir +TEST dd if=/dev/zero of=${M0}/dir/file bs=128k count=8 + +cleanup diff --git a/tests/volume.rc b/tests/volume.rc index 261c6554d46..6fd7c3cdbcd 100644 --- a/tests/volume.rc +++ b/tests/volume.rc @@ -281,6 +281,10 @@ function quotad_up_status {          gluster volume status | grep "Quota Daemon" | awk '{print $7}'  } +function get_glusterd_pid { +        pgrep '^glusterd$' | head -1 +} +  function get_brick_pidfile {          local vol=$1          local host=$2 diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c index f12191fb8df..101e403d39a 100644 --- a/xlators/debug/io-stats/src/io-stats.c +++ b/xlators/debug/io-stats/src/io-stats.c @@ 
-40,6 +40,7 @@  #include <pwd.h>  #include <grp.h>  #include <glusterfs/upcall-utils.h> +#include <glusterfs/async.h>  #define MAX_LIST_MEMBERS 100  #define DEFAULT_PWD_BUF_SZ 16384 @@ -3737,6 +3738,7 @@ reconfigure(xlator_t *this, dict_t *options)      uint32_t log_buf_size = 0;      uint32_t log_flush_timeout = 0;      int32_t old_dump_interval; +    int32_t threads;      if (!this || !this->private)          goto out; @@ -3809,6 +3811,9 @@ reconfigure(xlator_t *this, dict_t *options)                       out);      gf_log_set_log_flush_timeout(log_flush_timeout); +    GF_OPTION_RECONF("threads", threads, options, int32, out); +    gf_async_adjust_threads(threads); +      ret = 0;  out:      gf_log(this ? this->name : "io-stats", GF_LOG_DEBUG, @@ -3888,6 +3893,7 @@ init(xlator_t *this)      int ret = -1;      uint32_t log_buf_size = 0;      uint32_t log_flush_timeout = 0; +    int32_t threads;      if (!this)          return -1; @@ -3951,6 +3957,7 @@ init(xlator_t *this)          gf_log(this->name, GF_LOG_ERROR, "Out of memory.");          goto out;      } +    ret = -1;      GF_OPTION_INIT("ios-dnscache-ttl-sec", conf->ios_dnscache_ttl_sec, int32,                     out); @@ -3987,6 +3994,9 @@ init(xlator_t *this)      GF_OPTION_INIT("log-flush-timeout", log_flush_timeout, time, out);      gf_log_set_log_flush_timeout(log_flush_timeout); +    GF_OPTION_INIT("threads", threads, int32, out); +    gf_async_adjust_threads(threads); +      this->private = conf;      if (conf->ios_dump_interval > 0) {          conf->dump_thread_running = _gf_true; @@ -4430,8 +4440,37 @@ struct volume_options options[] = {       .type = GF_OPTION_TYPE_STR,       .default_value = "/no/such/path",       .description = "Unique ID for our files."}, +    {.key = {"global-threading"}, +     .type = GF_OPTION_TYPE_BOOL, +     .default_value = "off", +     .op_version = {GD_OP_VERSION_6_0}, +     .flags = OPT_FLAG_SETTABLE, +     .tags = {"io-stats", "threading"}, +     .description = "This option enables the global threading support for " +                    "bricks. 
If enabled, it's recommended to also enable " +                    "'performance.iot-pass-through'"}, +    {.key = {"threads"}, .type = GF_OPTION_TYPE_INT}, +    {.key = {"brick-threads"}, +     .type = GF_OPTION_TYPE_INT, +     .default_value = "16", +     .min = 0, +     .max = GF_ASYNC_MAX_THREADS, +     .op_version = {GD_OP_VERSION_6_0}, +     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, +     .tags = {"io-stats", "threading"}, +     .description = "When global threading is used, this value determines the " +                    "maximum amount of threads that can be created on bricks"}, +    {.key = {"client-threads"}, +     .type = GF_OPTION_TYPE_INT, +     .default_value = "16", +     .min = 0, +     .max = GF_ASYNC_MAX_THREADS, +     .op_version = {GD_OP_VERSION_6_0}, +     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, +     .tags = {"io-stats", "threading"}, +     .description = "When global threading is used, this value determines the " +                    "maximum amount of threads that can be created on clients"},      {.key = {NULL}}, -  };  xlator_api_t xlator_api = { diff --git a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c index 4cd4cea15e4..6325f60f94a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c +++ b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c @@ -213,6 +213,10 @@ glusterd_svc_start(glusterd_svc_t *svc, int flags, dict_t *cmdline)          runner_add_arg(&runner, daemon_log_level);      } +    if (this->ctx->cmd_args.global_threading) { +        runner_add_arg(&runner, "--global-threading"); +    } +      if (cmdline)          dict_foreach(cmdline, svc_add_args, (void *)&runner); diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 1aa6947fbba..85a7884b51a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -2045,6 +2045,8 @@ glusterd_volume_start_glusterfs(glusterd_volinfo_t *volinfo,      int32_t len = 0;      glusterd_brick_proc_t *brick_proc = NULL;      char *inet_family = NULL; +    char *global_threading = NULL; +    bool threading = false;      GF_ASSERT(volinfo);      GF_ASSERT(brickinfo); @@ -2203,6 +2205,15 @@ retry:                           volinfo->volname, rdma_port);      } +    if (dict_get_strn(volinfo->dict, VKEY_CONFIG_GLOBAL_THREADING, +                      SLEN(VKEY_CONFIG_GLOBAL_THREADING), +                      &global_threading) == 0) { +        if ((gf_string2boolean(global_threading, &threading) == 0) && +            threading) { +            runner_add_arg(&runner, "--global-threading"); +        } +    } +      runner_add_arg(&runner, "--xlator-option");      runner_argprintf(&runner, "%s-server.listen-port=%d", volinfo->volname,                       port); diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index b7c7bd9b638..448dd8669a1 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1215,6 +1215,26 @@ loglevel_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme,  }  static int +threads_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, +                       void *param) +{ +    char *role = param; +    struct volopt_map_entry vme2 = { +        0, +    }; + +    if ((strcmp(vme->option, "!client-threads") != 0 && +         strcmp(vme->option, "!brick-threads") != 0) || +        !strstr(vme->key, role)) + 
       return 0; + +    memcpy(&vme2, vme, sizeof(vme2)); +    vme2.option = "threads"; + +    return basic_option_handler(graph, &vme2, NULL); +} + +static int  server_check_changelog_off(volgen_graph_t *graph, struct volopt_map_entry *vme,                             glusterd_volinfo_t *volinfo)  { @@ -1506,6 +1526,9 @@ server_spec_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme,      if (!ret)          ret = log_localtime_logging_option_handler(graph, vme, "brick"); +    if (!ret) +        ret = threads_option_handler(graph, vme, "brick"); +      return ret;  } @@ -4085,6 +4108,14 @@ graph_set_generic_options(xlator_t *this, volgen_graph_t *graph,          gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL,                 "Failed to change "                 "log-localtime-logging option"); + +    ret = volgen_graph_set_options_generic(graph, set_dict, "client", +                                           &threads_option_handler); + +    if (ret) +        gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, +               "changing %s threads failed", identifier); +      return 0;  } diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.h b/xlators/mgmt/glusterd/src/glusterd-volgen.h index f9fc068931b..37eecc04bef 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.h +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.h @@ -38,6 +38,9 @@  #define VKEY_RDA_CACHE_LIMIT "performance.rda-cache-limit"  #define VKEY_RDA_REQUEST_SIZE "performance.rda-request-size"  #define VKEY_CONFIG_GFPROXY "config.gfproxyd" +#define VKEY_CONFIG_GLOBAL_THREADING "config.global-threading" +#define VKEY_CONFIG_CLIENT_THREADS "config.client-threads" +#define VKEY_CONFIG_BRICK_THREADS "config.brick-threads"  #define AUTH_ALLOW_MAP_KEY "auth.allow"  #define AUTH_REJECT_MAP_KEY "auth.reject" diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index b32b6ce0ec4..c8591cf0487 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -2961,4 +2961,19 @@ struct volopt_map_entry glusterd_volopt_map[] = {       .validate_fn = validate_boolean,       .description = "option to enforce mandatory lock on a file",       .flags = VOLOPT_FLAG_XLATOR_OPT}, +    {.key = VKEY_CONFIG_GLOBAL_THREADING, +     .voltype = "debug/io-stats", +     .option = "global-threading", +     .value = "off", +     .op_version = GD_OP_VERSION_6_0}, +    {.key = VKEY_CONFIG_CLIENT_THREADS, +     .voltype = "debug/io-stats", +     .option = "!client-threads", +     .value = "16", +     .op_version = GD_OP_VERSION_6_0}, +    {.key = VKEY_CONFIG_BRICK_THREADS, +     .voltype = "debug/io-stats", +     .option = "!brick-threads", +     .value = "16", +     .op_version = GD_OP_VERSION_6_0},      {.key = NULL}}; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 3479d40ceeb..bd8bc114a32 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -16,10 +16,17 @@  #include <glusterfs/glusterfs-acl.h>  #include <glusterfs/syscall.h>  #include <glusterfs/timespec.h> +#include <glusterfs/async.h>  #ifdef __NetBSD__  #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */  #endif +typedef struct _fuse_async { +    struct iobuf *iobuf; +    fuse_in_header_t *finh; +    void *msg; +    gf_async_t async; +} fuse_async_t;  static int gf_fuse_xattr_enotsup_log; @@ -5810,6 +5817,28 @@ fuse_get_mount_status(xlator_t 
*this)      return kid_status;  } +static void +fuse_dispatch(xlator_t *xl, gf_async_t *async) +{ +    fuse_async_t *fasync; +    fuse_private_t *priv; +    fuse_in_header_t *finh; +    struct iobuf *iobuf; + +    priv = xl->private; +    fasync = caa_container_of(async, fuse_async_t, async); +    finh = fasync->finh; +    iobuf = fasync->iobuf; + +    priv->fuse_ops[finh->opcode](xl, finh, fasync->msg, iobuf); + +    iobuf_unref(iobuf); +} + +/* We need 512 extra buffer size for BATCH_FORGET fop. By tests, it is + * found to be reduces 'REALLOC()' in the loop */ +#define FUSE_EXTRA_ALLOC 512 +  static void *  fuse_thread_proc(void *data)  { @@ -5821,24 +5850,20 @@ fuse_thread_proc(void *data)      fuse_in_header_t *finh = NULL;      struct iovec iov_in[2];      void *msg = NULL; -    /* we need 512 extra buffer size for BATCH_FORGET fop. By tests, it is -       found to be reduces 'REALLOC()' in the loop */ -    const size_t msg0_size = sizeof(*finh) + 512; -    fuse_handler_t **fuse_ops = NULL; +    size_t msg0_size = sizeof(*finh) + sizeof(struct fuse_write_in); +    fuse_async_t *fasync;      struct pollfd pfd[2] = {{          0,      }}; +    uint32_t psize;      this = data;      priv = this->private; -    fuse_ops = priv->fuse_ops;      THIS = this; -    iov_in[0].iov_len = sizeof(*finh) + sizeof(struct fuse_write_in); -    iov_in[1].iov_len = ((struct iobuf_pool *)this->ctx->iobuf_pool) -                            ->default_page_size; -    priv->msg0_len_p = &iov_in[0].iov_len; +    psize = ((struct iobuf_pool *)this->ctx->iobuf_pool)->default_page_size; +    priv->msg0_len_p = &msg0_size;      for (;;) {          /* THIS has to be reset here */ @@ -5895,14 +5920,15 @@ fuse_thread_proc(void *data)             changing this one too */          iobuf = iobuf_get(this->ctx->iobuf_pool); -        /* Add extra 128 byte to the first iov so that it can +        /* Add extra 512 byte to the first iov so that it can           * accommodate "ordinary" non-write requests. It's not           * guaranteed to be big enough, as SETXATTR and namespace           * operations with very long names may grow behind it,           * but it's good enough in most cases (and we can handle -         * rest via realloc). -         */ -        iov_in[0].iov_base = GF_CALLOC(1, msg0_size, gf_fuse_mt_iov_base); +         * rest via realloc). 
*/ +        iov_in[0].iov_base = GF_MALLOC( +            sizeof(fuse_async_t) + msg0_size + FUSE_EXTRA_ALLOC, +            gf_fuse_mt_iov_base);          if (!iobuf || !iov_in[0].iov_base) {              gf_log(this->name, GF_LOG_ERROR, "Out of memory"); @@ -5915,6 +5941,9 @@ fuse_thread_proc(void *data)          iov_in[1].iov_base = iobuf->ptr; +        iov_in[0].iov_len = msg0_size; +        iov_in[1].iov_len = psize; +          res = sys_readv(priv->fd, iov_in, 2);          if (res == -1) { @@ -5941,7 +5970,7 @@ fuse_thread_proc(void *data)              goto cont_err;          } -        if (res < sizeof(finh)) { +        if (res < sizeof(*finh)) {              gf_log("glusterfs-fuse", GF_LOG_WARNING, "short read on /dev/fuse");              fuse_log_eh(this,                          "glusterfs-fuse: short read on " @@ -5983,8 +6012,9 @@ fuse_thread_proc(void *data)          if (finh->opcode == FUSE_WRITE)              msg = iov_in[1].iov_base;          else { -            if (res > msg0_size) { -                void *b = GF_REALLOC(iov_in[0].iov_base, res); +            if (res > msg0_size + FUSE_EXTRA_ALLOC) { +                void *b = GF_REALLOC(iov_in[0].iov_base, +                                     sizeof(fuse_async_t) + res);                  if (b) {                      iov_in[0].iov_base = b;                      finh = (fuse_in_header_t *)iov_in[0].iov_base; @@ -5996,22 +6026,29 @@ fuse_thread_proc(void *data)                  }              } -            if (res > iov_in[0].iov_len) +            if (res > iov_in[0].iov_len) {                  memcpy(iov_in[0].iov_base + iov_in[0].iov_len,                         iov_in[1].iov_base, res - iov_in[0].iov_len); +                iov_in[0].iov_len = res; +            }              msg = finh + 1;          }          if (priv->uid_map_root && finh->uid == priv->uid_map_root)              finh->uid = 0; -        if (finh->opcode >= FUSE_OP_HIGH) +        if (finh->opcode >= FUSE_OP_HIGH) {              /* turn down MacFUSE specific messages */              fuse_enosys(this, finh, msg, NULL); -        else -            fuse_ops[finh->opcode](this, finh, msg, iobuf); +            iobuf_unref(iobuf); +        } else { +            fasync = iov_in[0].iov_base + iov_in[0].iov_len; +            fasync->finh = finh; +            fasync->msg = msg; +            fasync->iobuf = iobuf; +            gf_async(&fasync->async, this, fuse_dispatch); +        } -        iobuf_unref(iobuf);          continue;      cont_err: diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index 3f5d76d2e93..243c9c71af4 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -189,6 +189,10 @@ start_glusterfs ()          cmd_line=$(echo "$cmd_line --thin-client");      fi +    if [ -n "$global_threading" ]; then +        cmd_line=$(echo "$cmd_line --global-threading"); +    fi +  #options with values start here      if [ -n "$halo_max_latency" ]; then        cmd_line=$(echo "$cmd_line --xlator-option \ @@ -629,6 +633,9 @@ without_options()           # "mount -t glusterfs" sends this, but it's useless.          
"rw")              ;; +        "global-threading") +            global_threading=1 +            ;;           # TODO: not sure how to handle this yet          "async"|"sync"|"dirsync"|\          "mand"|"nomand"|\ diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index bf75015eda8..9a4c728ae02 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1255,12 +1255,14 @@ init(xlator_t *this)          INIT_LIST_HEAD(&conf->no_client[i].reqs);      } -    ret = iot_workers_scale(conf); +    if (!this->pass_through) { +        ret = iot_workers_scale(conf); -    if (ret == -1) { -        gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, -               "cannot initialize worker threads, exiting init"); -        goto out; +        if (ret == -1) { +            gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, +                   "cannot initialize worker threads, exiting init"); +            goto out; +        }      }      this->private = conf;  | 

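
As a side note on the counters used by gf_async_stop_check() and gf_async_terminate(): the running and stopping worker counts are packed into the single 32-bit word gf_async_ctrl.counts so that both can be adjusted in one atomic update. The standalone snippet below is not part of the patch; it only demonstrates the arithmetic of the GF_ASYNC_COUNTS* macros copied from async.h, with main() acting as an illustrative driver.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Copied from libglusterfs/src/glusterfs/async.h above. */
#define GF_ASYNC_COUNTS(_run, _stop) (((uint32_t)(_run) << 16) + (_stop))
#define GF_ASYNC_COUNT_RUNNING(_count) ((_count) >> 16)
#define GF_ASYNC_COUNT_STOPPING(_count) ((_count)&65535)

int
main(void)
{
    /* 3 workers running, none stopping. */
    uint32_t counts = GF_ASYNC_COUNTS(3, 0);

    /* One worker moves from 'running' to 'stopping' in a single update,
     * which is what gf_async_stop_check() does with uatomic_cmpxchg(). */
    counts += GF_ASYNC_COUNTS(-1, 1);

    /* Prints: running=2 stopping=1 */
    printf("running=%" PRIu32 " stopping=%" PRIu32 "\n",
           GF_ASYNC_COUNT_RUNNING(counts), GF_ASYNC_COUNT_STOPPING(counts));

    return 0;
}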