diff options
Diffstat (limited to 'libglusterfs/src/rot-buffs.c')
-rw-r--r-- | libglusterfs/src/rot-buffs.c | 616 |
1 files changed, 306 insertions, 310 deletions
diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c index cbded2b3f46..c5147321c60 100644 --- a/libglusterfs/src/rot-buffs.c +++ b/libglusterfs/src/rot-buffs.c @@ -26,10 +26,10 @@ * TODO: do away with opaques (use arrays with indexing). */ -#define ROT_BUFF_DEFAULT_COUNT 2 -#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */ +#define ROT_BUFF_DEFAULT_COUNT 2 +#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */ -#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE) +#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE) /** * iovec list is not shrunk (deallocated) if usage/total count @@ -37,373 +37,369 @@ * most of the workloads. for the rest shrinking iovec list is * generous. */ -#define RVEC_LOW_WATERMARK_COUNT 1 +#define RVEC_LOW_WATERMARK_COUNT 1 #define RVEC_HIGH_WATERMARK_COUNT (1 << 4) -static inline -rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf) +static inline rbuf_list_t * +rbuf_current_buffer(rbuf_t *rbuf) { - return rbuf->current; + return rbuf->current; } static void -rlist_mark_waiting (rbuf_list_t *rlist) +rlist_mark_waiting(rbuf_list_t *rlist) { - LOCK (&rlist->c_lock); - { - rlist->awaiting = _gf_true; - } - UNLOCK (&rlist->c_lock); + LOCK(&rlist->c_lock); + { + rlist->awaiting = _gf_true; + } + UNLOCK(&rlist->c_lock); } static int -__rlist_has_waiter (rbuf_list_t *rlist) +__rlist_has_waiter(rbuf_list_t *rlist) { - return (rlist->awaiting == _gf_true); + return (rlist->awaiting == _gf_true); } static void * -rbuf_alloc_rvec () +rbuf_alloc_rvec() { - return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t); + return GF_CALLOC(1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t); } static void -rlist_reset_vector_usage (rbuf_list_t *rlist) +rlist_reset_vector_usage(rbuf_list_t *rlist) { - rlist->used = 1; + rlist->used = 1; } static void -rlist_increment_vector_usage (rbuf_list_t *rlist) +rlist_increment_vector_usage(rbuf_list_t *rlist) { - rlist->used++; + rlist->used++; } static void -rlist_increment_total_usage (rbuf_list_t *rlist) +rlist_increment_total_usage(rbuf_list_t *rlist) { - rlist->total++; + rlist->total++; } static int -rvec_in_watermark_range (rbuf_list_t *rlist) +rvec_in_watermark_range(rbuf_list_t *rlist) { - return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) - && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT)); + return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) && + (rlist->total <= RVEC_HIGH_WATERMARK_COUNT)); } static void -rbuf_reset_rvec (rbuf_iovec_t *rvec) +rbuf_reset_rvec(rbuf_iovec_t *rvec) { - /* iov_base is _never_ modified */ - rvec->iov.iov_len = 0; + /* iov_base is _never_ modified */ + rvec->iov.iov_len = 0; } /* TODO: alloc multiple rbuf_iovec_t */ static int -rlist_add_new_vec (rbuf_list_t *rlist) +rlist_add_new_vec(rbuf_list_t *rlist) { - rbuf_iovec_t *rvec = NULL; + rbuf_iovec_t *rvec = NULL; - rvec = (rbuf_iovec_t *) rbuf_alloc_rvec (); - if (!rvec) - return -1; - INIT_LIST_HEAD (&rvec->list); - rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE; - rvec->iov.iov_len = 0; + rvec = (rbuf_iovec_t *)rbuf_alloc_rvec(); + if (!rvec) + return -1; + INIT_LIST_HEAD(&rvec->list); + rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE; + rvec->iov.iov_len = 0; - list_add_tail (&rvec->list, &rlist->veclist); + list_add_tail(&rvec->list, &rlist->veclist); - rlist->rvec = rvec; /* cache the latest */ + rlist->rvec = rvec; /* cache the latest */ - rlist_increment_vector_usage (rlist); - rlist_increment_total_usage (rlist); + rlist_increment_vector_usage(rlist); + rlist_increment_total_usage(rlist); - return 0; + return 0; } static void -rlist_free_rvec (rbuf_iovec_t *rvec) +rlist_free_rvec(rbuf_iovec_t *rvec) { - if (!rvec) - return; - list_del (&rvec->list); - GF_FREE (rvec); + if (!rvec) + return; + list_del(&rvec->list); + GF_FREE(rvec); } static void -rlist_purge_all_rvec (rbuf_list_t *rlist) +rlist_purge_all_rvec(rbuf_list_t *rlist) { - rbuf_iovec_t *rvec = NULL; - - if (!rlist) - return; - while (!list_empty (&rlist->veclist)) { - rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); - rlist_free_rvec (rvec); - } + rbuf_iovec_t *rvec = NULL; + + if (!rlist) + return; + while (!list_empty(&rlist->veclist)) { + rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec(rvec); + } } static void -rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink) +rlist_shrink_rvec(rbuf_list_t *rlist, unsigned long long shrink) { - rbuf_iovec_t *rvec = NULL; + rbuf_iovec_t *rvec = NULL; - while (!list_empty (&rlist->veclist) && (shrink-- > 0)) { - rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); - rlist_free_rvec (rvec); - } + while (!list_empty(&rlist->veclist) && (shrink-- > 0)) { + rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec(rvec); + } } static void -rbuf_purge_rlist (rbuf_t *rbuf) +rbuf_purge_rlist(rbuf_t *rbuf) { - rbuf_list_t *rlist = NULL; + rbuf_list_t *rlist = NULL; - while (!list_empty (&rbuf->freelist)) { - rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list); - list_del (&rlist->list); + while (!list_empty(&rbuf->freelist)) { + rlist = list_first_entry(&rbuf->freelist, rbuf_list_t, list); + list_del(&rlist->list); - rlist_purge_all_rvec (rlist); + rlist_purge_all_rvec(rlist); - LOCK_DESTROY (&rlist->c_lock); + LOCK_DESTROY(&rlist->c_lock); - (void) pthread_mutex_destroy (&rlist->b_lock); - (void) pthread_cond_destroy (&rlist->b_cond); + (void)pthread_mutex_destroy(&rlist->b_lock); + (void)pthread_cond_destroy(&rlist->b_cond); - GF_FREE (rlist); - } + GF_FREE(rlist); + } } rbuf_t * -rbuf_init (int bufcount) +rbuf_init(int bufcount) { - int j = 0; - int ret = 0; - rbuf_t *rbuf = NULL; - rbuf_list_t *rlist = NULL; - - if (bufcount <= 0) - bufcount = ROT_BUFF_DEFAULT_COUNT; - - rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t); - if (!rbuf) - goto error_return; - - LOCK_INIT (&rbuf->lock); - INIT_LIST_HEAD (&rbuf->freelist); - - /* it could have been one big calloc() but this is just once.. */ - for (j = 0; j < bufcount; j++) { - rlist = GF_CALLOC (1, - sizeof (rbuf_list_t), gf_common_mt_rlist_t); - if (!rlist) { - ret = -1; - break; - } - - INIT_LIST_HEAD (&rlist->list); - INIT_LIST_HEAD (&rlist->veclist); - - rlist->pending = rlist->completed = 0; - - ret = rlist_add_new_vec (rlist); - if (ret) - break; - - LOCK_INIT (&rlist->c_lock); - - rlist->awaiting = _gf_false; - ret = pthread_mutex_init (&rlist->b_lock, 0); - if (ret != 0) { - GF_FREE (rlist); - break; - } - - ret = pthread_cond_init (&rlist->b_cond, 0); - if (ret != 0) { - GF_FREE (rlist); - break; - } - - list_add_tail (&rlist->list, &rbuf->freelist); + int j = 0; + int ret = 0; + rbuf_t *rbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (bufcount <= 0) + bufcount = ROT_BUFF_DEFAULT_COUNT; + + rbuf = GF_CALLOC(1, sizeof(rbuf_t), gf_common_mt_rbuf_t); + if (!rbuf) + goto error_return; + + LOCK_INIT(&rbuf->lock); + INIT_LIST_HEAD(&rbuf->freelist); + + /* it could have been one big calloc() but this is just once.. */ + for (j = 0; j < bufcount; j++) { + rlist = GF_CALLOC(1, sizeof(rbuf_list_t), gf_common_mt_rlist_t); + if (!rlist) { + ret = -1; + break; } - if (ret != 0) - goto dealloc_rlist; + INIT_LIST_HEAD(&rlist->list); + INIT_LIST_HEAD(&rlist->veclist); - /* cache currently used buffer: first in the list */ - rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list); - return rbuf; + rlist->pending = rlist->completed = 0; - dealloc_rlist: - rbuf_purge_rlist (rbuf); - LOCK_DESTROY (&rbuf->lock); - GF_FREE (rbuf); - error_return: - return NULL; + ret = rlist_add_new_vec(rlist); + if (ret) + break; + + LOCK_INIT(&rlist->c_lock); + + rlist->awaiting = _gf_false; + ret = pthread_mutex_init(&rlist->b_lock, 0); + if (ret != 0) { + GF_FREE(rlist); + break; + } + + ret = pthread_cond_init(&rlist->b_cond, 0); + if (ret != 0) { + GF_FREE(rlist); + break; + } + + list_add_tail(&rlist->list, &rbuf->freelist); + } + + if (ret != 0) + goto dealloc_rlist; + + /* cache currently used buffer: first in the list */ + rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list); + return rbuf; + +dealloc_rlist: + rbuf_purge_rlist(rbuf); + LOCK_DESTROY(&rbuf->lock); + GF_FREE(rbuf); +error_return: + return NULL; } void -rbuf_dtor (rbuf_t *rbuf) +rbuf_dtor(rbuf_t *rbuf) { - if (!rbuf) - return; - rbuf->current = NULL; - rbuf_purge_rlist (rbuf); - LOCK_DESTROY (&rbuf->lock); + if (!rbuf) + return; + rbuf->current = NULL; + rbuf_purge_rlist(rbuf); + LOCK_DESTROY(&rbuf->lock); - GF_FREE (rbuf); + GF_FREE(rbuf); } static char * -rbuf_adjust_write_area (struct iovec *iov, size_t bytes) +rbuf_adjust_write_area(struct iovec *iov, size_t bytes) { - char *wbuf = NULL; + char *wbuf = NULL; - wbuf = iov->iov_base + iov->iov_len; - iov->iov_len += bytes; - return wbuf; + wbuf = iov->iov_base + iov->iov_len; + iov->iov_len += bytes; + return wbuf; } static char * -rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes) +rbuf_alloc_write_area(rbuf_list_t *rlist, size_t bytes) { - int ret = 0; - struct iovec *iov = NULL; - - /* check for available space in _current_ IO buffer */ - iov = &rlist->rvec->iov; - if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE) - return rbuf_adjust_write_area (iov, bytes); /* fast path */ - - /* not enough bytes, try next available buffers */ - if (list_is_last (&rlist->rvec->list, &rlist->veclist)) { - /* OH! consumed all vector buffers */ - GF_ASSERT (rlist->used == rlist->total); - ret = rlist_add_new_vec (rlist); - if (ret) - goto error_return; - } else { - /* not the end, have available rbuf_iovec's */ - rlist->rvec = list_next_entry (rlist->rvec, list); - rlist->used++; - rbuf_reset_rvec (rlist->rvec); - } + int ret = 0; + struct iovec *iov = NULL; + + /* check for available space in _current_ IO buffer */ + iov = &rlist->rvec->iov; + if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE) + return rbuf_adjust_write_area(iov, bytes); /* fast path */ + + /* not enough bytes, try next available buffers */ + if (list_is_last(&rlist->rvec->list, &rlist->veclist)) { + /* OH! consumed all vector buffers */ + GF_ASSERT(rlist->used == rlist->total); + ret = rlist_add_new_vec(rlist); + if (ret) + goto error_return; + } else { + /* not the end, have available rbuf_iovec's */ + rlist->rvec = list_next_entry(rlist->rvec, list); + rlist->used++; + rbuf_reset_rvec(rlist->rvec); + } - iov = &rlist->rvec->iov; - return rbuf_adjust_write_area (iov, bytes); + iov = &rlist->rvec->iov; + return rbuf_adjust_write_area(iov, bytes); - error_return: - return NULL; +error_return: + return NULL; } char * -rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque) +rbuf_reserve_write_area(rbuf_t *rbuf, size_t bytes, void **opaque) { - char *wbuf = NULL; - rbuf_list_t *rlist = NULL; - - if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque) - return NULL; - - LOCK (&rbuf->lock); - { - rlist = rbuf_current_buffer (rbuf); - wbuf = rbuf_alloc_write_area (rlist, bytes); - if (!wbuf) - goto unblock; - rlist->pending++; - } - unblock: - UNLOCK (&rbuf->lock); + char *wbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque) + return NULL; - if (wbuf) - *opaque = rlist; - return wbuf; + LOCK(&rbuf->lock); + { + rlist = rbuf_current_buffer(rbuf); + wbuf = rbuf_alloc_write_area(rlist, bytes); + if (!wbuf) + goto unblock; + rlist->pending++; + } +unblock: + UNLOCK(&rbuf->lock); + + if (wbuf) + *opaque = rlist; + return wbuf; } static void -rbuf_notify_waiter (rbuf_list_t *rlist) +rbuf_notify_waiter(rbuf_list_t *rlist) { - pthread_mutex_lock (&rlist->b_lock); - { - pthread_cond_signal (&rlist->b_cond); - } - pthread_mutex_unlock (&rlist->b_lock); + pthread_mutex_lock(&rlist->b_lock); + { + pthread_cond_signal(&rlist->b_cond); + } + pthread_mutex_unlock(&rlist->b_lock); } int -rbuf_write_complete (void *opaque) +rbuf_write_complete(void *opaque) { - rbuf_list_t *rlist = NULL; - gf_boolean_t notify = _gf_false; - - if (!opaque) - return -1; - - rlist = opaque; - - LOCK (&rlist->c_lock); - { - rlist->completed++; - /** - * it's safe to test ->pending without rbuf->lock *only* if - * there's a waiter as there can be no new incoming writes. - */ - if (__rlist_has_waiter (rlist) - && (rlist->completed == rlist->pending)) - notify = _gf_true; - } - UNLOCK (&rlist->c_lock); + rbuf_list_t *rlist = NULL; + gf_boolean_t notify = _gf_false; + + if (!opaque) + return -1; + + rlist = opaque; + + LOCK(&rlist->c_lock); + { + rlist->completed++; + /** + * it's safe to test ->pending without rbuf->lock *only* if + * there's a waiter as there can be no new incoming writes. + */ + if (__rlist_has_waiter(rlist) && (rlist->completed == rlist->pending)) + notify = _gf_true; + } + UNLOCK(&rlist->c_lock); - if (notify) - rbuf_notify_waiter (rlist); + if (notify) + rbuf_notify_waiter(rlist); - return 0; + return 0; } int -rbuf_get_buffer (rbuf_t *rbuf, - void **opaque, sequence_fn *seqfn, void *mydata) +rbuf_get_buffer(rbuf_t *rbuf, void **opaque, sequence_fn *seqfn, void *mydata) { - int retval = RBUF_CONSUMABLE; - rbuf_list_t *rlist = NULL; - - if (!rbuf || !opaque) - return -1; - - LOCK (&rbuf->lock); - { - rlist = rbuf_current_buffer (rbuf); - if (!rlist->pending) { - retval = RBUF_EMPTY; - goto unblock; - } - - if (list_is_singular (&rbuf->freelist)) { - /** - * removal would lead to writer starvation, disallow - * switching. - */ - retval = RBUF_WOULD_STARVE; - goto unblock; - } - - list_del_init (&rlist->list); - if (seqfn) - seqfn (rlist, mydata); - rbuf->current = - list_first_entry (&rbuf->freelist, rbuf_list_t, list); + int retval = RBUF_CONSUMABLE; + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + LOCK(&rbuf->lock); + { + rlist = rbuf_current_buffer(rbuf); + if (!rlist->pending) { + retval = RBUF_EMPTY; + goto unblock; + } + + if (list_is_singular(&rbuf->freelist)) { + /** + * removal would lead to writer starvation, disallow + * switching. + */ + retval = RBUF_WOULD_STARVE; + goto unblock; } - unblock: - UNLOCK (&rbuf->lock); - if (retval == RBUF_CONSUMABLE) - *opaque = rlist; /* caller _owns_ the buffer */ + list_del_init(&rlist->list); + if (seqfn) + seqfn(rlist, mydata); + rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list); + } +unblock: + UNLOCK(&rbuf->lock); - return retval; + if (retval == RBUF_CONSUMABLE) + *opaque = rlist; /* caller _owns_ the buffer */ + + return retval; } /** @@ -412,10 +408,10 @@ rbuf_get_buffer (rbuf_t *rbuf, */ static void -__rbuf_wait_for_writers (rbuf_list_t *rlist) +__rbuf_wait_for_writers(rbuf_list_t *rlist) { - while (rlist->completed != rlist->pending) - pthread_cond_wait (&rlist->b_cond, &rlist->b_lock); + while (rlist->completed != rlist->pending) + pthread_cond_wait(&rlist->b_cond, &rlist->b_lock); } #ifndef M_E @@ -423,69 +419,69 @@ __rbuf_wait_for_writers (rbuf_list_t *rlist) #endif static void -rlist_shrink_vector (rbuf_list_t *rlist) +rlist_shrink_vector(rbuf_list_t *rlist) { - unsigned long long shrink = 0; - - /** - * fast path: don't bother to deallocate if vectors are hardly - * used. - */ - if (rvec_in_watermark_range (rlist)) - return; - - /** - * Calculate the shrink count based on total allocated vectors. - * Note that the calculation sticks to rlist->total irrespective - * of the actual usage count (rlist->used). Later, ->used could - * be used to apply slack to the calculation based on how much - * it lags from ->total. For now, let's stick to slow decay. - */ - shrink = rlist->total - (rlist->total * pow (M_E, -0.2)); - - rlist_shrink_rvec (rlist, shrink); - rlist->total -= shrink; + unsigned long long shrink = 0; + + /** + * fast path: don't bother to deallocate if vectors are hardly + * used. + */ + if (rvec_in_watermark_range(rlist)) + return; + + /** + * Calculate the shrink count based on total allocated vectors. + * Note that the calculation sticks to rlist->total irrespective + * of the actual usage count (rlist->used). Later, ->used could + * be used to apply slack to the calculation based on how much + * it lags from ->total. For now, let's stick to slow decay. + */ + shrink = rlist->total - (rlist->total * pow(M_E, -0.2)); + + rlist_shrink_rvec(rlist, shrink); + rlist->total -= shrink; } int -rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque, - void (*fn)(rbuf_list_t *, void *), void *arg) +rbuf_wait_for_completion(rbuf_t *rbuf, void *opaque, + void (*fn)(rbuf_list_t *, void *), void *arg) { - rbuf_list_t *rlist = NULL; + rbuf_list_t *rlist = NULL; - if (!rbuf || !opaque) - return -1; + if (!rbuf || !opaque) + return -1; - rlist = opaque; + rlist = opaque; - pthread_mutex_lock (&rlist->b_lock); - { - rlist_mark_waiting (rlist); - __rbuf_wait_for_writers (rlist); - } - pthread_mutex_unlock (&rlist->b_lock); + pthread_mutex_lock(&rlist->b_lock); + { + rlist_mark_waiting(rlist); + __rbuf_wait_for_writers(rlist); + } + pthread_mutex_unlock(&rlist->b_lock); - /** - * from here on, no need of locking until the rlist is put - * back into rotation. - */ + /** + * from here on, no need of locking until the rlist is put + * back into rotation. + */ - fn (rlist, arg); /* invoke dispatcher */ + fn(rlist, arg); /* invoke dispatcher */ - rlist->awaiting = _gf_false; - rlist->pending = rlist->completed = 0; + rlist->awaiting = _gf_false; + rlist->pending = rlist->completed = 0; - rlist_shrink_vector (rlist); - rlist_reset_vector_usage (rlist); + rlist_shrink_vector(rlist); + rlist_reset_vector_usage(rlist); - rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); - rbuf_reset_rvec (rlist->rvec); + rlist->rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list); + rbuf_reset_rvec(rlist->rvec); - LOCK (&rbuf->lock); - { - list_add_tail (&rlist->list, &rbuf->freelist); - } - UNLOCK (&rbuf->lock); + LOCK(&rbuf->lock); + { + list_add_tail(&rlist->list, &rbuf->freelist); + } + UNLOCK(&rbuf->lock); - return 0; + return 0; } |