diff options
Diffstat (limited to 'libglusterfs/src/rot-buffs.c')
-rw-r--r-- | libglusterfs/src/rot-buffs.c | 491 |
1 files changed, 491 insertions, 0 deletions
diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c new file mode 100644 index 00000000000..19399b824f4 --- /dev/null +++ b/libglusterfs/src/rot-buffs.c @@ -0,0 +1,491 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <math.h> + +#include "mem-types.h" +#include "mem-pool.h" + +#include "rot-buffs.h" + +/** + * Producer-Consumer based on top of rotational buffers. + * + * This favours writers (producer) and keeps the critical section + * light weight. Buffer switch happens when a consumer wants to + * consume data. This is the slow path and waits for pending + * writes to finish. + * + * TODO: do away with opaques (use arrays with indexing). + */ + +#define ROT_BUFF_DEFAULT_COUNT 2 +#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */ + +#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE) + +/** + * iovec list is not shrinked (deallocated) if usage/total count + * falls in this range. this is the fast path and should satisfy + * most of the workloads. for the rest shrinking iovec list is + * generous. + */ +#define RVEC_LOW_WATERMARK_COUNT 1 +#define RVEC_HIGH_WATERMARK_COUNT (1 << 4) + +static inline +rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf) +{ + return rbuf->current; +} + +static inline void +rlist_mark_waiting (rbuf_list_t *rlist) +{ + LOCK (&rlist->c_lock); + { + rlist->awaiting = _gf_true; + } + UNLOCK (&rlist->c_lock); +} + +static inline int +__rlist_has_waiter (rbuf_list_t *rlist) +{ + return (rlist->awaiting == _gf_true); +} + +static inline void * +rbuf_alloc_rvec () +{ + return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t); +} + +static inline void +rlist_reset_vector_usage (rbuf_list_t *rlist) +{ + rlist->used = 1; +} + +static inline void +rlist_increment_vector_usage (rbuf_list_t *rlist) +{ + rlist->used++; +} + +static inline void +rlist_increment_total_usage (rbuf_list_t *rlist) +{ + rlist->total++; +} + +static inline int +rvec_in_watermark_range (rbuf_list_t *rlist) +{ + return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) + && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT)); +} + +static inline void +rbuf_reset_rvec (rbuf_iovec_t *rvec) +{ + /* iov_base is _never_ modified */ + rvec->iov.iov_len = 0; +} + +/* TODO: alloc multiple rbuf_iovec_t */ +static inline int +rlist_add_new_vec (rbuf_list_t *rlist) +{ + rbuf_iovec_t *rvec = NULL; + + rvec = (rbuf_iovec_t *) rbuf_alloc_rvec (); + if (!rvec) + return -1; + INIT_LIST_HEAD (&rvec->list); + rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE; + rvec->iov.iov_len = 0; + + list_add_tail (&rvec->list, &rlist->veclist); + + rlist->rvec = rvec; /* cache the latest */ + + rlist_increment_vector_usage (rlist); + rlist_increment_total_usage (rlist); + + return 0; +} + +static inline void +rlist_free_rvec (rbuf_iovec_t *rvec) +{ + if (!rvec) + return; + list_del (&rvec->list); + GF_FREE (rvec); +} + +static inline void +rlist_purge_all_rvec (rbuf_list_t *rlist) +{ + rbuf_iovec_t *rvec = NULL; + + if (!rlist) + return; + while (!list_empty (&rlist->veclist)) { + rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec (rvec); + } +} + +static inline void +rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink) +{ + rbuf_iovec_t *rvec = NULL; + + while (!list_empty (&rlist->veclist) && (shrink-- > 0)) { + rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec (rvec); + } +} + +static inline void +rbuf_purge_rlist (rbuf_t *rbuf) +{ + rbuf_list_t *rlist = NULL; + + while (!list_empty (&rbuf->freelist)) { + rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list); + list_del (&rlist->list); + + rlist_purge_all_rvec (rlist); + + LOCK_DESTROY (&rlist->c_lock); + + (void) pthread_mutex_destroy (&rlist->b_lock); + (void) pthread_cond_destroy (&rlist->b_cond); + + GF_FREE (rlist); + } +} + +rbuf_t * +rbuf_init (int bufcount) +{ + int j = 0; + int ret = 0; + rbuf_t *rbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (bufcount <= 0) + bufcount = ROT_BUFF_DEFAULT_COUNT; + + rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t); + if (!rbuf) + goto error_return; + + LOCK_INIT (&rbuf->lock); + INIT_LIST_HEAD (&rbuf->freelist); + + /* it could have been one big calloc() but this is just once.. */ + for (j = 0; j < bufcount; j++) { + rlist = GF_CALLOC (1, + sizeof (rbuf_list_t), gf_common_mt_rlist_t); + if (!rlist) { + ret = -1; + break; + } + + INIT_LIST_HEAD (&rlist->list); + INIT_LIST_HEAD (&rlist->veclist); + + rlist->pending = rlist->completed = 0; + + ret = rlist_add_new_vec (rlist); + if (ret) + break; + + LOCK_INIT (&rlist->c_lock); + + rlist->awaiting = _gf_false; + ret = pthread_mutex_init (&rlist->b_lock, 0); + if (ret != 0) { + GF_FREE (rlist); + break; + } + + ret = pthread_cond_init (&rlist->b_cond, 0); + if (ret != 0) { + GF_FREE (rlist); + break; + } + + list_add_tail (&rlist->list, &rbuf->freelist); + } + + if (ret != 0) + goto dealloc_rlist; + + /* cache currently used buffer: first in the list */ + rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list); + return rbuf; + + dealloc_rlist: + rbuf_purge_rlist (rbuf); + LOCK_DESTROY (&rbuf->lock); + GF_FREE (rbuf); + error_return: + return NULL; +} + +void +rbuf_dtor (rbuf_t *rbuf) +{ + if (!rbuf) + return; + rbuf->current = NULL; + rbuf_purge_rlist (rbuf); + LOCK_DESTROY (&rbuf->lock); + + GF_FREE (rbuf); +} + +static inline char * +rbuf_adjust_write_area (struct iovec *iov, size_t bytes) +{ + char *wbuf = NULL; + + wbuf = iov->iov_base + iov->iov_len; + iov->iov_len += bytes; + return wbuf; +} + +static inline char * +rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes) +{ + int ret = 0; + struct iovec *iov = NULL; + + /* check for available space in _current_ IO buffer */ + iov = &rlist->rvec->iov; + if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE) + return rbuf_adjust_write_area (iov, bytes); /* fast path */ + + /* not enough bytes, try next available buffers */ + if (list_is_last (&rlist->rvec->list, &rlist->veclist)) { + /* OH! consumed all vector buffers */ + GF_ASSERT (rlist->used == rlist->total); + ret = rlist_add_new_vec (rlist); + if (ret) + goto error_return; + } else { + /* not the end, have available rbuf_iovec's */ + rlist->rvec = list_next_entry (rlist->rvec, list); + rlist->used++; + rbuf_reset_rvec (rlist->rvec); + } + + iov = &rlist->rvec->iov; + return rbuf_adjust_write_area (iov, bytes); + + error_return: + return NULL; +} + +char * +rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque) +{ + char *wbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque) + return NULL; + + LOCK (&rbuf->lock); + { + rlist = rbuf_current_buffer (rbuf); + wbuf = rbuf_alloc_write_area (rlist, bytes); + if (!wbuf) + goto unblock; + rlist->pending++; + } + unblock: + UNLOCK (&rbuf->lock); + + if (wbuf) + *opaque = rlist; + return wbuf; +} + +static inline void +rbuf_notify_waiter (rbuf_list_t *rlist) +{ + pthread_mutex_lock (&rlist->b_lock); + { + pthread_cond_signal (&rlist->b_cond); + } + pthread_mutex_unlock (&rlist->b_lock); +} + +int +rbuf_write_complete (void *opaque) +{ + rbuf_list_t *rlist = NULL; + gf_boolean_t notify = _gf_false; + + if (!opaque) + return -1; + + rlist = opaque; + + LOCK (&rlist->c_lock); + { + rlist->completed++; + /** + * it's safe to test ->pending without rbuf->lock *only* if + * there's a waiter as there can be no new incoming writes. + */ + if (__rlist_has_waiter (rlist) + && (rlist->completed == rlist->pending)) + notify = _gf_true; + } + UNLOCK (&rlist->c_lock); + + if (notify) + rbuf_notify_waiter (rlist); + + return 0; +} + +int +rbuf_get_buffer (rbuf_t *rbuf, + void **opaque, sequence_fn *seqfn, void *mydata) +{ + int retval = RBUF_CONSUMABLE; + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + LOCK (&rbuf->lock); + { + rlist = rbuf_current_buffer (rbuf); + if (!rlist->pending) { + retval = RBUF_EMPTY; + goto unblock; + } + + if (list_is_singular (&rbuf->freelist)) { + /** + * removal would lead to writer starvation, disallow + * switching. + */ + retval = RBUF_WOULD_STARVE; + goto unblock; + } + + list_del_init (&rlist->list); + if (seqfn) + seqfn (rlist, mydata); + rbuf->current = + list_first_entry (&rbuf->freelist, rbuf_list_t, list); + } + unblock: + UNLOCK (&rbuf->lock); + + if (retval == RBUF_CONSUMABLE) + *opaque = rlist; /* caller _owns_ the buffer */ + + return retval; +} + +/** + * Wait for completion of pending writers and invoke dispatcher + * routine (for buffer consumption). + */ + +static inline void +__rbuf_wait_for_writers (rbuf_list_t *rlist) +{ + while (rlist->completed != rlist->pending) + pthread_cond_wait (&rlist->b_cond, &rlist->b_lock); +} + +#ifndef M_E +#define M_E 2.7 +#endif + +static inline void +rlist_shrink_vector (rbuf_list_t *rlist) +{ + unsigned long long shrink = 0; + + /** + * fast path: don't bother to deallocate if vectors are hardly + * used. + */ + if (rvec_in_watermark_range (rlist)) + return; + + /** + * Calculate the shrink count based on total allocated vectors. + * Note that the calculation sticks to rlist->total irrespective + * of the actual usage count (rlist->used). Later, ->used could + * be used to apply slack to the calculation based on how much + * it lags from ->total. For now, let's stick to slow decay. + */ + shrink = rlist->total - (rlist->total * pow (M_E, -0.2)); + + rlist_shrink_rvec (rlist, shrink); + rlist->total -= shrink; +} + +int +rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque, + void (*fn)(rbuf_list_t *, void *), void *arg) +{ + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + rlist = opaque; + + pthread_mutex_lock (&rlist->b_lock); + { + rlist_mark_waiting (rlist); + __rbuf_wait_for_writers (rlist); + } + pthread_mutex_unlock (&rlist->b_lock); + + /** + * from here on, no need of locking until the rlist is put + * back into rotation. + */ + + fn (rlist, arg); /* invoke dispatcher */ + + rlist->awaiting = _gf_false; + rlist->pending = rlist->completed = 0; + + rlist_shrink_vector (rlist); + rlist_reset_vector_usage (rlist); + + rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rbuf_reset_rvec (rlist->rvec); + + LOCK (&rbuf->lock); + { + list_add_tail (&rlist->list, &rbuf->freelist); + } + UNLOCK (&rbuf->lock); + + return 0; +} |