diff options
Diffstat (limited to 'libglusterfs')
-rw-r--r-- | libglusterfs/src/Makefile.am | 6 | ||||
-rw-r--r-- | libglusterfs/src/list.h | 23 | ||||
-rw-r--r-- | libglusterfs/src/mem-types.h | 23 | ||||
-rw-r--r-- | libglusterfs/src/rot-buffs.c | 491 | ||||
-rw-r--r-- | libglusterfs/src/rot-buffs.h | 121 |
5 files changed, 650 insertions, 14 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 73ee69f8630..2441023ad39 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -7,7 +7,7 @@ libglusterfs_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 \ -I$(CONTRIBDIR)/libexecinfo ${ARGP_STANDALONE_CPPFLAGS} \ -DSBIN_DIR=\"$(sbindir)\" -lm -libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) +libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) $(MATH_LIB) libglusterfs_la_LDFLAGS = -version-info $(LIBGLUSTERFS_LT_VERSION) lib_LTLIBRARIES = libglusterfs.la @@ -30,7 +30,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \ $(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c \ $(CONTRIBDIR)/stdlib/gf_mkostemp.c strfd.c parse-utils.c \ $(CONTRIBDIR)/mount/mntent.c $(CONTRIBDIR)/libexecinfo/execinfo.c\ - quota-common-utils.c + quota-common-utils.c rot-buffs.c nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c @@ -49,7 +49,7 @@ noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h timespec. template-component-messages.h strfd.h syncop-utils.h parse-utils.h \ $(CONTRIBDIR)/mount/mntent_compat.h lvm-defaults.h \ $(CONTRIBDIR)/libexecinfo/execinfo_compat.h \ - unittest/unittest.h quota-common-utils.h + unittest/unittest.h quota-common-utils.h rot-buffs.h EXTRA_DIST = graph.l graph.y diff --git a/libglusterfs/src/list.h b/libglusterfs/src/list.h index 04b4047129f..894fa3012cf 100644 --- a/libglusterfs/src/list.h +++ b/libglusterfs/src/list.h @@ -179,15 +179,36 @@ list_append_init (struct list_head *list, struct list_head *head) INIT_LIST_HEAD (list); } +static inline int +list_is_last (struct list_head *list, struct list_head *head) +{ + return (list->next == head); +} + +static inline int +list_is_singular(struct list_head *head) +{ + return !list_empty(head) && (head->next == head->prev); +} #define list_entry(ptr, type, member) \ ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +#define list_last_entry(ptr, type, member) \ + list_entry((ptr)->prev, type, member) + +#define list_next_entry(pos, member) \ + list_entry((pos)->member.next, typeof(*(pos)), member) + +#define list_prev_entry(pos, member) \ + list_entry((pos)->member.prev, typeof(*(pos)), member) #define list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) - #define list_for_each_entry(pos, head, member) \ for (pos = list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 5f4e7bcd2c2..4dd59b002a5 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -131,17 +131,20 @@ enum gf_common_mem_types_ { gf_common_mt_parser_t = 115, gf_common_quota_meta_t = 116, /*related to gfdb library*/ - gfdb_mt_time_t, - gf_mt_sql_cbk_args_t, - gf_mt_gfdb_query_record_t, - gf_mt_gfdb_link_info_t, - gf_mt_gfdb_db_operations_t, - gf_mt_sql_connection_t, - gf_mt_sql_conn_node_t, - gf_mt_db_conn_node_t, - gf_mt_db_connection_t, - gfdb_mt_db_record_t, + gfdb_mt_time_t = 117, + gf_mt_sql_cbk_args_t = 118, + gf_mt_gfdb_query_record_t = 119, + gf_mt_gfdb_link_info_t = 120, + gf_mt_gfdb_db_operations_t = 121, + gf_mt_sql_connection_t = 122, + gf_mt_sql_conn_node_t = 123, + gf_mt_db_conn_node_t = 124, + gf_mt_db_connection_t = 125, + gfdb_mt_db_record_t = 126, /*related to gfdb library*/ + gf_common_mt_rbuf_t = 127, + gf_common_mt_rlist_t = 128, + gf_common_mt_rvec_t = 129, gf_common_mt_end }; #endif diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c new file mode 100644 index 00000000000..19399b824f4 --- /dev/null +++ b/libglusterfs/src/rot-buffs.c @@ -0,0 +1,491 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <math.h> + +#include "mem-types.h" +#include "mem-pool.h" + +#include "rot-buffs.h" + +/** + * Producer-Consumer based on top of rotational buffers. + * + * This favours writers (producer) and keeps the critical section + * light weight. Buffer switch happens when a consumer wants to + * consume data. This is the slow path and waits for pending + * writes to finish. + * + * TODO: do away with opaques (use arrays with indexing). + */ + +#define ROT_BUFF_DEFAULT_COUNT 2 +#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */ + +#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE) + +/** + * iovec list is not shrinked (deallocated) if usage/total count + * falls in this range. this is the fast path and should satisfy + * most of the workloads. for the rest shrinking iovec list is + * generous. + */ +#define RVEC_LOW_WATERMARK_COUNT 1 +#define RVEC_HIGH_WATERMARK_COUNT (1 << 4) + +static inline +rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf) +{ + return rbuf->current; +} + +static inline void +rlist_mark_waiting (rbuf_list_t *rlist) +{ + LOCK (&rlist->c_lock); + { + rlist->awaiting = _gf_true; + } + UNLOCK (&rlist->c_lock); +} + +static inline int +__rlist_has_waiter (rbuf_list_t *rlist) +{ + return (rlist->awaiting == _gf_true); +} + +static inline void * +rbuf_alloc_rvec () +{ + return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t); +} + +static inline void +rlist_reset_vector_usage (rbuf_list_t *rlist) +{ + rlist->used = 1; +} + +static inline void +rlist_increment_vector_usage (rbuf_list_t *rlist) +{ + rlist->used++; +} + +static inline void +rlist_increment_total_usage (rbuf_list_t *rlist) +{ + rlist->total++; +} + +static inline int +rvec_in_watermark_range (rbuf_list_t *rlist) +{ + return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) + && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT)); +} + +static inline void +rbuf_reset_rvec (rbuf_iovec_t *rvec) +{ + /* iov_base is _never_ modified */ + rvec->iov.iov_len = 0; +} + +/* TODO: alloc multiple rbuf_iovec_t */ +static inline int +rlist_add_new_vec (rbuf_list_t *rlist) +{ + rbuf_iovec_t *rvec = NULL; + + rvec = (rbuf_iovec_t *) rbuf_alloc_rvec (); + if (!rvec) + return -1; + INIT_LIST_HEAD (&rvec->list); + rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE; + rvec->iov.iov_len = 0; + + list_add_tail (&rvec->list, &rlist->veclist); + + rlist->rvec = rvec; /* cache the latest */ + + rlist_increment_vector_usage (rlist); + rlist_increment_total_usage (rlist); + + return 0; +} + +static inline void +rlist_free_rvec (rbuf_iovec_t *rvec) +{ + if (!rvec) + return; + list_del (&rvec->list); + GF_FREE (rvec); +} + +static inline void +rlist_purge_all_rvec (rbuf_list_t *rlist) +{ + rbuf_iovec_t *rvec = NULL; + + if (!rlist) + return; + while (!list_empty (&rlist->veclist)) { + rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec (rvec); + } +} + +static inline void +rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink) +{ + rbuf_iovec_t *rvec = NULL; + + while (!list_empty (&rlist->veclist) && (shrink-- > 0)) { + rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec (rvec); + } +} + +static inline void +rbuf_purge_rlist (rbuf_t *rbuf) +{ + rbuf_list_t *rlist = NULL; + + while (!list_empty (&rbuf->freelist)) { + rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list); + list_del (&rlist->list); + + rlist_purge_all_rvec (rlist); + + LOCK_DESTROY (&rlist->c_lock); + + (void) pthread_mutex_destroy (&rlist->b_lock); + (void) pthread_cond_destroy (&rlist->b_cond); + + GF_FREE (rlist); + } +} + +rbuf_t * +rbuf_init (int bufcount) +{ + int j = 0; + int ret = 0; + rbuf_t *rbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (bufcount <= 0) + bufcount = ROT_BUFF_DEFAULT_COUNT; + + rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t); + if (!rbuf) + goto error_return; + + LOCK_INIT (&rbuf->lock); + INIT_LIST_HEAD (&rbuf->freelist); + + /* it could have been one big calloc() but this is just once.. */ + for (j = 0; j < bufcount; j++) { + rlist = GF_CALLOC (1, + sizeof (rbuf_list_t), gf_common_mt_rlist_t); + if (!rlist) { + ret = -1; + break; + } + + INIT_LIST_HEAD (&rlist->list); + INIT_LIST_HEAD (&rlist->veclist); + + rlist->pending = rlist->completed = 0; + + ret = rlist_add_new_vec (rlist); + if (ret) + break; + + LOCK_INIT (&rlist->c_lock); + + rlist->awaiting = _gf_false; + ret = pthread_mutex_init (&rlist->b_lock, 0); + if (ret != 0) { + GF_FREE (rlist); + break; + } + + ret = pthread_cond_init (&rlist->b_cond, 0); + if (ret != 0) { + GF_FREE (rlist); + break; + } + + list_add_tail (&rlist->list, &rbuf->freelist); + } + + if (ret != 0) + goto dealloc_rlist; + + /* cache currently used buffer: first in the list */ + rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list); + return rbuf; + + dealloc_rlist: + rbuf_purge_rlist (rbuf); + LOCK_DESTROY (&rbuf->lock); + GF_FREE (rbuf); + error_return: + return NULL; +} + +void +rbuf_dtor (rbuf_t *rbuf) +{ + if (!rbuf) + return; + rbuf->current = NULL; + rbuf_purge_rlist (rbuf); + LOCK_DESTROY (&rbuf->lock); + + GF_FREE (rbuf); +} + +static inline char * +rbuf_adjust_write_area (struct iovec *iov, size_t bytes) +{ + char *wbuf = NULL; + + wbuf = iov->iov_base + iov->iov_len; + iov->iov_len += bytes; + return wbuf; +} + +static inline char * +rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes) +{ + int ret = 0; + struct iovec *iov = NULL; + + /* check for available space in _current_ IO buffer */ + iov = &rlist->rvec->iov; + if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE) + return rbuf_adjust_write_area (iov, bytes); /* fast path */ + + /* not enough bytes, try next available buffers */ + if (list_is_last (&rlist->rvec->list, &rlist->veclist)) { + /* OH! consumed all vector buffers */ + GF_ASSERT (rlist->used == rlist->total); + ret = rlist_add_new_vec (rlist); + if (ret) + goto error_return; + } else { + /* not the end, have available rbuf_iovec's */ + rlist->rvec = list_next_entry (rlist->rvec, list); + rlist->used++; + rbuf_reset_rvec (rlist->rvec); + } + + iov = &rlist->rvec->iov; + return rbuf_adjust_write_area (iov, bytes); + + error_return: + return NULL; +} + +char * +rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque) +{ + char *wbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque) + return NULL; + + LOCK (&rbuf->lock); + { + rlist = rbuf_current_buffer (rbuf); + wbuf = rbuf_alloc_write_area (rlist, bytes); + if (!wbuf) + goto unblock; + rlist->pending++; + } + unblock: + UNLOCK (&rbuf->lock); + + if (wbuf) + *opaque = rlist; + return wbuf; +} + +static inline void +rbuf_notify_waiter (rbuf_list_t *rlist) +{ + pthread_mutex_lock (&rlist->b_lock); + { + pthread_cond_signal (&rlist->b_cond); + } + pthread_mutex_unlock (&rlist->b_lock); +} + +int +rbuf_write_complete (void *opaque) +{ + rbuf_list_t *rlist = NULL; + gf_boolean_t notify = _gf_false; + + if (!opaque) + return -1; + + rlist = opaque; + + LOCK (&rlist->c_lock); + { + rlist->completed++; + /** + * it's safe to test ->pending without rbuf->lock *only* if + * there's a waiter as there can be no new incoming writes. + */ + if (__rlist_has_waiter (rlist) + && (rlist->completed == rlist->pending)) + notify = _gf_true; + } + UNLOCK (&rlist->c_lock); + + if (notify) + rbuf_notify_waiter (rlist); + + return 0; +} + +int +rbuf_get_buffer (rbuf_t *rbuf, + void **opaque, sequence_fn *seqfn, void *mydata) +{ + int retval = RBUF_CONSUMABLE; + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + LOCK (&rbuf->lock); + { + rlist = rbuf_current_buffer (rbuf); + if (!rlist->pending) { + retval = RBUF_EMPTY; + goto unblock; + } + + if (list_is_singular (&rbuf->freelist)) { + /** + * removal would lead to writer starvation, disallow + * switching. + */ + retval = RBUF_WOULD_STARVE; + goto unblock; + } + + list_del_init (&rlist->list); + if (seqfn) + seqfn (rlist, mydata); + rbuf->current = + list_first_entry (&rbuf->freelist, rbuf_list_t, list); + } + unblock: + UNLOCK (&rbuf->lock); + + if (retval == RBUF_CONSUMABLE) + *opaque = rlist; /* caller _owns_ the buffer */ + + return retval; +} + +/** + * Wait for completion of pending writers and invoke dispatcher + * routine (for buffer consumption). + */ + +static inline void +__rbuf_wait_for_writers (rbuf_list_t *rlist) +{ + while (rlist->completed != rlist->pending) + pthread_cond_wait (&rlist->b_cond, &rlist->b_lock); +} + +#ifndef M_E +#define M_E 2.7 +#endif + +static inline void +rlist_shrink_vector (rbuf_list_t *rlist) +{ + unsigned long long shrink = 0; + + /** + * fast path: don't bother to deallocate if vectors are hardly + * used. + */ + if (rvec_in_watermark_range (rlist)) + return; + + /** + * Calculate the shrink count based on total allocated vectors. + * Note that the calculation sticks to rlist->total irrespective + * of the actual usage count (rlist->used). Later, ->used could + * be used to apply slack to the calculation based on how much + * it lags from ->total. For now, let's stick to slow decay. + */ + shrink = rlist->total - (rlist->total * pow (M_E, -0.2)); + + rlist_shrink_rvec (rlist, shrink); + rlist->total -= shrink; +} + +int +rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque, + void (*fn)(rbuf_list_t *, void *), void *arg) +{ + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + rlist = opaque; + + pthread_mutex_lock (&rlist->b_lock); + { + rlist_mark_waiting (rlist); + __rbuf_wait_for_writers (rlist); + } + pthread_mutex_unlock (&rlist->b_lock); + + /** + * from here on, no need of locking until the rlist is put + * back into rotation. + */ + + fn (rlist, arg); /* invoke dispatcher */ + + rlist->awaiting = _gf_false; + rlist->pending = rlist->completed = 0; + + rlist_shrink_vector (rlist); + rlist_reset_vector_usage (rlist); + + rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rbuf_reset_rvec (rlist->rvec); + + LOCK (&rbuf->lock); + { + list_add_tail (&rlist->list, &rbuf->freelist); + } + UNLOCK (&rbuf->lock); + + return 0; +} diff --git a/libglusterfs/src/rot-buffs.h b/libglusterfs/src/rot-buffs.h new file mode 100644 index 00000000000..aac24a4f571 --- /dev/null +++ b/libglusterfs/src/rot-buffs.h @@ -0,0 +1,121 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __ROT_BUFFS_H +#define __ROT_BUFFS_H + +#include "list.h" +#include "locking.h" +#include "common-utils.h" + +typedef struct rbuf_iovec { + struct iovec iov; + + struct list_head list; +} rbuf_iovec_t; + +#define RBUF_IOVEC_SIZE (sizeof (rbuf_iovec_t)) + +typedef struct rbuf_list { + gf_lock_t c_lock; + + pthread_mutex_t b_lock; /* protects this structure */ + pthread_cond_t b_cond; /* signal for writer completion */ + + gf_boolean_t awaiting; + + unsigned long long pending; /* pending writers */ + unsigned long long completed; /* completed writers */ + + rbuf_iovec_t *rvec; /* currently used IO vector */ + + struct list_head veclist; /* list of attached rbuf_iov */ + + unsigned long long used; /* consumable entries + attached in ->veclist */ + unsigned long long total; /* total entries in ->veclist (used + during deallocation) */ + + unsigned long seq[2]; /* if interested, this whould store + the start sequence number and the + range */ + + struct list_head list; /* attachment to rbuf_t */ +} rbuf_list_t; + +struct rlist_iter { + struct list_head veclist; + + unsigned long long iter; +}; + +#define RLIST_ENTRY_COUNT(rlist) rlist->used + +#define rlist_iter_init(riter, rlist) \ + do { \ + (riter)->iter = rlist->used; \ + (riter)->veclist = rlist->veclist; \ + } while (0) + +#define rvec_for_each_entry(pos, riter) \ + for (pos = list_entry \ + ((riter)->veclist.next, typeof(*pos), list); \ + (riter)->iter > 0; \ + pos = list_entry \ + (pos->list.next, typeof(*pos), list), \ + --((riter)->iter)) + +/** + * Sequence number assigment routine is called during buffer + * switch under rbuff ->lock. + */ +typedef void (sequence_fn) (rbuf_list_t *, void *); + +#define RLIST_STORE_SEQ(rlist, start, range) \ + do { \ + rlist->seq[0] = start; \ + rlist->seq[1] = range; \ + } while (0) + +#define RLIST_GET_SEQ(rlist, start, range) \ + do { \ + start = rlist->seq[0]; \ + range = rlist->seq[1]; \ + } while (0) + +typedef struct rbuf { + gf_lock_t lock; /* protects "current" rlist */ + + rbuf_list_t *current; /* cached pointer to first free rlist */ + + struct list_head freelist; +} rbuf_t; + +typedef enum { + RBUF_CONSUMABLE = 1, + RBUF_BUSY, + RBUF_EMPTY, + RBUF_WOULD_STARVE, +} rlist_retval_t; + +/* Initialization/Destruction */ +rbuf_t *rbuf_init (int); +void rbuf_dtor (rbuf_t *); + +/* Producer API */ +char *rbuf_reserve_write_area (rbuf_t *, size_t, void **); +int rbuf_write_complete (void *); + +/* Consumer API */ +int rbuf_get_buffer (rbuf_t *, void **, sequence_fn *, void *); +int rbuf_wait_for_completion (rbuf_t *, void *, + void (*)(rbuf_list_t *, void *), void *); + +#endif |