Diffstat (limited to 'libglusterfs')
-rw-r--r--  libglusterfs/src/Makefile.am |   6
-rw-r--r--  libglusterfs/src/list.h      |  23
-rw-r--r--  libglusterfs/src/mem-types.h |  23
-rw-r--r--  libglusterfs/src/rot-buffs.c | 498
-rw-r--r--  libglusterfs/src/rot-buffs.h | 121
5 files changed, 657 insertions(+), 14 deletions(-)
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am
index 73ee69f8630..2441023ad39 100644
--- a/libglusterfs/src/Makefile.am
+++ b/libglusterfs/src/Makefile.am
@@ -7,7 +7,7 @@ libglusterfs_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 \
-I$(CONTRIBDIR)/libexecinfo ${ARGP_STANDALONE_CPPFLAGS} \
-DSBIN_DIR=\"$(sbindir)\" -lm
-libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS)
+libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) $(MATH_LIB)
libglusterfs_la_LDFLAGS = -version-info $(LIBGLUSTERFS_LT_VERSION)
lib_LTLIBRARIES = libglusterfs.la
@@ -30,7 +30,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \
$(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c \
$(CONTRIBDIR)/stdlib/gf_mkostemp.c strfd.c parse-utils.c \
$(CONTRIBDIR)/mount/mntent.c $(CONTRIBDIR)/libexecinfo/execinfo.c\
- quota-common-utils.c
+ quota-common-utils.c rot-buffs.c
nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c
@@ -49,7 +49,7 @@ noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h timespec.
template-component-messages.h strfd.h syncop-utils.h parse-utils.h \
$(CONTRIBDIR)/mount/mntent_compat.h lvm-defaults.h \
$(CONTRIBDIR)/libexecinfo/execinfo_compat.h \
- unittest/unittest.h quota-common-utils.h
+ unittest/unittest.h quota-common-utils.h rot-buffs.h
EXTRA_DIST = graph.l graph.y
diff --git a/libglusterfs/src/list.h b/libglusterfs/src/list.h
index 04b4047129f..894fa3012cf 100644
--- a/libglusterfs/src/list.h
+++ b/libglusterfs/src/list.h
@@ -179,15 +179,36 @@ list_append_init (struct list_head *list, struct list_head *head)
INIT_LIST_HEAD (list);
}
+static inline int
+list_is_last (struct list_head *list, struct list_head *head)
+{
+ return (list->next == head);
+}
+
+static inline int
+list_is_singular (struct list_head *head)
+{
+ return !list_empty (head) && (head->next == head->prev);
+}
#define list_entry(ptr, type, member) \
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
+#define list_first_entry(ptr, type, member) \
+ list_entry((ptr)->next, type, member)
+
+#define list_last_entry(ptr, type, member) \
+ list_entry((ptr)->prev, type, member)
+
+#define list_next_entry(pos, member) \
+ list_entry((pos)->member.next, typeof(*(pos)), member)
+
+#define list_prev_entry(pos, member) \
+ list_entry((pos)->member.prev, typeof(*(pos)), member)
#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
-
#define list_for_each_entry(pos, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
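[Editor's note] The list.h additions mirror their Linux-kernel counterparts: list_is_last() and list_is_singular() test position and cardinality, while the *_entry macros recover the enclosing structure from an embedded list head. A minimal standalone sketch of how they compose; "struct item" and the reduced helpers below are illustrative, not part of this patch:

/* Sketch: exercising the new helpers against a reduced kernel-style
 * intrusive list. "struct item" is an illustrative payload type. */
#include <stdio.h>
#include <stddef.h>

struct list_head { struct list_head *next, *prev; };

#define INIT_LIST_HEAD(h) do { (h)->next = (h)->prev = (h); } while (0)

static inline void
list_add_tail (struct list_head *new, struct list_head *head)
{
        new->prev = head->prev;
        new->next = head;
        head->prev->next = new;
        head->prev = new;
}

static inline int
list_empty (struct list_head *head)
{
        return (head->next == head);
}

static inline int
list_is_last (struct list_head *list, struct list_head *head)
{
        return (list->next == head);
}

static inline int
list_is_singular (struct list_head *head)
{
        return !list_empty (head) && (head->next == head->prev);
}

/* offsetof() is the standard-C equivalent of the patch's
 * null-pointer offset trick */
#define list_entry(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof (type, member)))
#define list_first_entry(ptr, type, member) \
        list_entry ((ptr)->next, type, member)
#define list_next_entry(pos, member) \
        list_entry ((pos)->member.next, typeof (*(pos)), member)

struct item {
        int value;
        struct list_head list;
};

int
main (void)
{
        struct list_head head;
        struct item a = {1}, b = {2}, *pos = NULL;

        INIT_LIST_HEAD (&head);
        list_add_tail (&a.list, &head);
        printf ("singular: %d\n", list_is_singular (&head)); /* 1 */

        list_add_tail (&b.list, &head);
        pos = list_first_entry (&head, struct item, list);
        for (;;) {
                printf ("value: %d\n", pos->value); /* 1, then 2 */
                if (list_is_last (&pos->list, &head))
                        break;
                pos = list_next_entry (pos, list);
        }
        return 0;
}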
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
index 5f4e7bcd2c2..4dd59b002a5 100644
--- a/libglusterfs/src/mem-types.h
+++ b/libglusterfs/src/mem-types.h
@@ -131,17 +131,20 @@ enum gf_common_mem_types_ {
gf_common_mt_parser_t = 115,
gf_common_quota_meta_t = 116,
/*related to gfdb library*/
- gfdb_mt_time_t,
- gf_mt_sql_cbk_args_t,
- gf_mt_gfdb_query_record_t,
- gf_mt_gfdb_link_info_t,
- gf_mt_gfdb_db_operations_t,
- gf_mt_sql_connection_t,
- gf_mt_sql_conn_node_t,
- gf_mt_db_conn_node_t,
- gf_mt_db_connection_t,
- gfdb_mt_db_record_t,
+ gfdb_mt_time_t = 117,
+ gf_mt_sql_cbk_args_t = 118,
+ gf_mt_gfdb_query_record_t = 119,
+ gf_mt_gfdb_link_info_t = 120,
+ gf_mt_gfdb_db_operations_t = 121,
+ gf_mt_sql_connection_t = 122,
+ gf_mt_sql_conn_node_t = 123,
+ gf_mt_db_conn_node_t = 124,
+ gf_mt_db_connection_t = 125,
+ gfdb_mt_db_record_t = 126,
/*related to gfdb library*/
+ gf_common_mt_rbuf_t = 127,
+ gf_common_mt_rlist_t = 128,
+ gf_common_mt_rvec_t = 129,
gf_common_mt_end
};
#endif
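[Editor's note] Pinning explicit values on the gfdb entries guards against silent renumbering when entries are added mid-enum; the three new rot-buffs types then continue the sequence up to gf_common_mt_end, which translators conventionally use as the base for their own private memory types. A hedged sketch of that convention; the enum name and members here are illustrative:

/* Sketch: a translator anchoring its private memory types after the
 * common range. The enum name and members are illustrative. */
enum gf_example_mem_types_ {
        gf_example_mt_private_t = gf_common_mt_end + 1,
        gf_example_mt_ctx_t,
        gf_example_mt_end
};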
diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c
new file mode 100644
index 00000000000..19399b824f4
--- /dev/null
+++ b/libglusterfs/src/rot-buffs.c
@@ -0,0 +1,498 @@
+/*
+ Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <math.h>
+
+#include "mem-types.h"
+#include "mem-pool.h"
+
+#include "rot-buffs.h"
+
+/**
+ * Producer-consumer implementation on top of rotational buffers.
+ *
+ * This favours writers (producers) and keeps the critical section
+ * lightweight. A buffer switch happens when a consumer wants to
+ * consume data. This is the slow path and waits for pending
+ * writes to finish.
+ *
+ * TODO: do away with opaques (use arrays with indexing).
+ */
+
+#define ROT_BUFF_DEFAULT_COUNT 2
+#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */
+
+#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)
+
+/**
+ * The iovec list is not shrunk (deallocated) if the usage/total
+ * count falls within this range. This is the fast path and should
+ * satisfy most workloads. Beyond this range, the iovec list is
+ * shrunk generously.
+ */
+#define RVEC_LOW_WATERMARK_COUNT 1
+#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
+
+static inline rbuf_list_t *
+rbuf_current_buffer (rbuf_t *rbuf)
+{
+ return rbuf->current;
+}
+
+static inline void
+rlist_mark_waiting (rbuf_list_t *rlist)
+{
+ LOCK (&rlist->c_lock);
+ {
+ rlist->awaiting = _gf_true;
+ }
+ UNLOCK (&rlist->c_lock);
+}
+
+static inline int
+__rlist_has_waiter (rbuf_list_t *rlist)
+{
+ return (rlist->awaiting == _gf_true);
+}
+
+static inline void *
+rbuf_alloc_rvec (void)
+{
+ return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
+}
+
+static inline void
+rlist_reset_vector_usage (rbuf_list_t *rlist)
+{
+ rlist->used = 1;
+}
+
+static inline void
+rlist_increment_vector_usage (rbuf_list_t *rlist)
+{
+ rlist->used++;
+}
+
+static inline void
+rlist_increment_total_usage (rbuf_list_t *rlist)
+{
+ rlist->total++;
+}
+
+static inline int
+rvec_in_watermark_range (rbuf_list_t *rlist)
+{
+ return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT)
+ && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
+}
+
+static inline void
+rbuf_reset_rvec (rbuf_iovec_t *rvec)
+{
+ /* iov_base is _never_ modified */
+ rvec->iov.iov_len = 0;
+}
+
+/* TODO: alloc multiple rbuf_iovec_t */
+static inline int
+rlist_add_new_vec (rbuf_list_t *rlist)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ rvec = (rbuf_iovec_t *) rbuf_alloc_rvec ();
+ if (!rvec)
+ return -1;
+ INIT_LIST_HEAD (&rvec->list);
+ rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
+ rvec->iov.iov_len = 0;
+
+ list_add_tail (&rvec->list, &rlist->veclist);
+
+ rlist->rvec = rvec; /* cache the latest */
+
+ rlist_increment_vector_usage (rlist);
+ rlist_increment_total_usage (rlist);
+
+ return 0;
+}
+
+static inline void
+rlist_free_rvec (rbuf_iovec_t *rvec)
+{
+ if (!rvec)
+ return;
+ list_del (&rvec->list);
+ GF_FREE (rvec);
+}
+
+static inline void
+rlist_purge_all_rvec (rbuf_list_t *rlist)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ if (!rlist)
+ return;
+ while (!list_empty (&rlist->veclist)) {
+ rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rlist_free_rvec (rvec);
+ }
+}
+
+static inline void
+rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ while (!list_empty (&rlist->veclist) && (shrink-- > 0)) {
+ rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rlist_free_rvec (rvec);
+ }
+}
+
+static inline void
+rbuf_purge_rlist (rbuf_t *rbuf)
+{
+ rbuf_list_t *rlist = NULL;
+
+ while (!list_empty (&rbuf->freelist)) {
+ rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ list_del (&rlist->list);
+
+ rlist_purge_all_rvec (rlist);
+
+ LOCK_DESTROY (&rlist->c_lock);
+
+ (void) pthread_mutex_destroy (&rlist->b_lock);
+ (void) pthread_cond_destroy (&rlist->b_cond);
+
+ GF_FREE (rlist);
+ }
+}
+
+rbuf_t *
+rbuf_init (int bufcount)
+{
+ int j = 0;
+ int ret = 0;
+ rbuf_t *rbuf = NULL;
+ rbuf_list_t *rlist = NULL;
+
+ if (bufcount <= 0)
+ bufcount = ROT_BUFF_DEFAULT_COUNT;
+
+ rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t);
+ if (!rbuf)
+ goto error_return;
+
+ LOCK_INIT (&rbuf->lock);
+ INIT_LIST_HEAD (&rbuf->freelist);
+
+ /* this could have been one big calloc(), but it runs only once */
+ for (j = 0; j < bufcount; j++) {
+ rlist = GF_CALLOC (1,
+ sizeof (rbuf_list_t), gf_common_mt_rlist_t);
+ if (!rlist) {
+ ret = -1;
+ break;
+ }
+
+ INIT_LIST_HEAD (&rlist->list);
+ INIT_LIST_HEAD (&rlist->veclist);
+
+ rlist->pending = rlist->completed = 0;
+
+ ret = rlist_add_new_vec (rlist);
+ if (ret) {
+ GF_FREE (rlist);
+ break;
+ }
+
+ LOCK_INIT (&rlist->c_lock);
+
+ rlist->awaiting = _gf_false;
+ ret = pthread_mutex_init (&rlist->b_lock, 0);
+ if (ret != 0) {
+ LOCK_DESTROY (&rlist->c_lock);
+ rlist_purge_all_rvec (rlist);
+ GF_FREE (rlist);
+ break;
+ }
+
+ ret = pthread_cond_init (&rlist->b_cond, 0);
+ if (ret != 0) {
+ (void) pthread_mutex_destroy (&rlist->b_lock);
+ LOCK_DESTROY (&rlist->c_lock);
+ rlist_purge_all_rvec (rlist);
+ GF_FREE (rlist);
+ break;
+ }
+
+ list_add_tail (&rlist->list, &rbuf->freelist);
+ }
+
+ if (ret != 0)
+ goto dealloc_rlist;
+
+ /* cache currently used buffer: first in the list */
+ rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ return rbuf;
+
+ dealloc_rlist:
+ rbuf_purge_rlist (rbuf);
+ LOCK_DESTROY (&rbuf->lock);
+ GF_FREE (rbuf);
+ error_return:
+ return NULL;
+}
+
+void
+rbuf_dtor (rbuf_t *rbuf)
+{
+ if (!rbuf)
+ return;
+ rbuf->current = NULL;
+ rbuf_purge_rlist (rbuf);
+ LOCK_DESTROY (&rbuf->lock);
+
+ GF_FREE (rbuf);
+}
+
+static inline char *
+rbuf_adjust_write_area (struct iovec *iov, size_t bytes)
+{
+ char *wbuf = NULL;
+
+ wbuf = iov->iov_base + iov->iov_len;
+ iov->iov_len += bytes;
+ return wbuf;
+}
+
+static inline char *
+rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes)
+{
+ int ret = 0;
+ struct iovec *iov = NULL;
+
+ /* check for available space in _current_ IO buffer */
+ iov = &rlist->rvec->iov;
+ if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)
+ return rbuf_adjust_write_area (iov, bytes); /* fast path */
+
+ /* not enough space, try the next available buffer */
+ if (list_is_last (&rlist->rvec->list, &rlist->veclist)) {
+ /* OH! consumed all vector buffers */
+ GF_ASSERT (rlist->used == rlist->total);
+ ret = rlist_add_new_vec (rlist);
+ if (ret)
+ goto error_return;
+ } else {
+ /* not the end, have available rbuf_iovec's */
+ rlist->rvec = list_next_entry (rlist->rvec, list);
+ rlist->used++;
+ rbuf_reset_rvec (rlist->rvec);
+ }
+
+ iov = &rlist->rvec->iov;
+ return rbuf_adjust_write_area (iov, bytes);
+
+ error_return:
+ return NULL;
+}
+
+char *
+rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque)
+{
+ char *wbuf = NULL;
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !bytes || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)
+ return NULL;
+
+ LOCK (&rbuf->lock);
+ {
+ rlist = rbuf_current_buffer (rbuf);
+ wbuf = rbuf_alloc_write_area (rlist, bytes);
+ if (!wbuf)
+ goto unblock;
+ rlist->pending++;
+ }
+ unblock:
+ UNLOCK (&rbuf->lock);
+
+ if (wbuf)
+ *opaque = rlist;
+ return wbuf;
+}
+
+static inline void
+rbuf_notify_waiter (rbuf_list_t *rlist)
+{
+ pthread_mutex_lock (&rlist->b_lock);
+ {
+ pthread_cond_signal (&rlist->b_cond);
+ }
+ pthread_mutex_unlock (&rlist->b_lock);
+}
+
+int
+rbuf_write_complete (void *opaque)
+{
+ rbuf_list_t *rlist = NULL;
+ gf_boolean_t notify = _gf_false;
+
+ if (!opaque)
+ return -1;
+
+ rlist = opaque;
+
+ LOCK (&rlist->c_lock);
+ {
+ rlist->completed++;
+ /**
+ * it's safe to test ->pending without rbuf->lock *only* if
+ * there's a waiter as there can be no new incoming writes.
+ */
+ if (__rlist_has_waiter (rlist)
+ && (rlist->completed == rlist->pending))
+ notify = _gf_true;
+ }
+ UNLOCK (&rlist->c_lock);
+
+ if (notify)
+ rbuf_notify_waiter (rlist);
+
+ return 0;
+}
+
+int
+rbuf_get_buffer (rbuf_t *rbuf,
+ void **opaque, sequence_fn *seqfn, void *mydata)
+{
+ int retval = RBUF_CONSUMABLE;
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !opaque)
+ return -1;
+
+ LOCK (&rbuf->lock);
+ {
+ rlist = rbuf_current_buffer (rbuf);
+ if (!rlist->pending) {
+ retval = RBUF_EMPTY;
+ goto unblock;
+ }
+
+ if (list_is_singular (&rbuf->freelist)) {
+ /**
+ * removal would lead to writer starvation, disallow
+ * switching.
+ */
+ retval = RBUF_WOULD_STARVE;
+ goto unblock;
+ }
+
+ list_del_init (&rlist->list);
+ if (seqfn)
+ seqfn (rlist, mydata);
+ rbuf->current =
+ list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ }
+ unblock:
+ UNLOCK (&rbuf->lock);
+
+ if (retval == RBUF_CONSUMABLE)
+ *opaque = rlist; /* caller _owns_ the buffer */
+
+ return retval;
+}
+
+/**
+ * Wait for completion of pending writers and invoke dispatcher
+ * routine (for buffer consumption).
+ */
+
+static inline void
+__rbuf_wait_for_writers (rbuf_list_t *rlist)
+{
+ while (rlist->completed != rlist->pending)
+ pthread_cond_wait (&rlist->b_cond, &rlist->b_lock);
+}
+
+#ifndef M_E
+#define M_E 2.7182818284590452354
+#endif
+
+static inline void
+rlist_shrink_vector (rbuf_list_t *rlist)
+{
+ unsigned long long shrink = 0;
+
+ /**
+ * fast path: don't bother to deallocate if vectors are hardly
+ * used.
+ */
+ if (rvec_in_watermark_range (rlist))
+ return;
+
+ /**
+ * Calculate the shrink count based on total allocated vectors.
+ * Note that the calculation sticks to rlist->total irrespective
+ * of the actual usage count (rlist->used). Later, ->used could
+ * be used to apply slack to the calculation based on how much
+ * it lags from ->total. For now, let's stick to slow decay.
+ */
+ shrink = rlist->total - (rlist->total * pow (M_E, -0.2));
+
+ rlist_shrink_rvec (rlist, shrink);
+ rlist->total -= shrink;
+}
+
+int
+rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque,
+ void (*fn)(rbuf_list_t *, void *), void *arg)
+{
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !opaque)
+ return -1;
+
+ rlist = opaque;
+
+ pthread_mutex_lock (&rlist->b_lock);
+ {
+ rlist_mark_waiting (rlist);
+ __rbuf_wait_for_writers (rlist);
+ }
+ pthread_mutex_unlock (&rlist->b_lock);
+
+ /**
+ * from here on, no locking is needed until the rlist is
+ * put back into rotation.
+ */
+
+ fn (rlist, arg); /* invoke dispatcher */
+
+ rlist->awaiting = _gf_false;
+ rlist->pending = rlist->completed = 0;
+
+ rlist_shrink_vector (rlist);
+ rlist_reset_vector_usage (rlist);
+
+ rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rbuf_reset_rvec (rlist->rvec);
+
+ LOCK (&rbuf->lock);
+ {
+ list_add_tail (&rlist->list, &rbuf->freelist);
+ }
+ UNLOCK (&rbuf->lock);
+
+ return 0;
+}
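[Editor's note] Putting the producer and consumer halves together, the intended call pattern is roughly as follows, using the iteration macros from rot-buffs.h (shown next). This is a sketch only: record_t, producer_write(), consumer_drain() and dispatch() are illustrative names, and error handling is abbreviated.

#include <string.h>

#include "rot-buffs.h"

/* illustrative fixed-size record */
typedef struct record {
        int seq;
        char payload[60];
} record_t;

/* consumer-side dispatcher: walk every used iovec of the buffer
 * handed over by rbuf_wait_for_completion() */
static void
dispatch (rbuf_list_t *rlist, void *arg)
{
        rbuf_iovec_t *rvec = NULL;
        struct rlist_iter riter;

        (void) arg;

        rlist_iter_init (&riter, rlist);
        rvec_for_each_entry (rvec, &riter) {
                /* rvec->iov.iov_base holds iov_len bytes of packed
                 * record_t entries; consume them here */
        }
}

/* producer: reserve, fill, then mark the write complete */
static int
producer_write (rbuf_t *rbuf, record_t *rec)
{
        char *wbuf = NULL;
        void *opaque = NULL;

        wbuf = rbuf_reserve_write_area (rbuf, sizeof (*rec), &opaque);
        if (!wbuf)
                return -1;
        memcpy (wbuf, rec, sizeof (*rec));
        return rbuf_write_complete (opaque);
}

/* consumer: switch buffers, wait for in-flight writers, dispatch */
static int
consumer_drain (rbuf_t *rbuf)
{
        int ret = 0;
        void *opaque = NULL;

        ret = rbuf_get_buffer (rbuf, &opaque, NULL, NULL);
        if (ret != RBUF_CONSUMABLE)
                return ret; /* RBUF_EMPTY or RBUF_WOULD_STARVE */
        return rbuf_wait_for_completion (rbuf, opaque, dispatch, NULL);
}

The reserve/complete split is what keeps the writer critical section light: rbuf->lock is held only long enough to carve out the write area, while the memcpy happens outside it.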
diff --git a/libglusterfs/src/rot-buffs.h b/libglusterfs/src/rot-buffs.h
new file mode 100644
index 00000000000..aac24a4f571
--- /dev/null
+++ b/libglusterfs/src/rot-buffs.h
@@ -0,0 +1,121 @@
+/*
+ Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __ROT_BUFFS_H
+#define __ROT_BUFFS_H
+
+#include "list.h"
+#include "locking.h"
+#include "common-utils.h"
+
+typedef struct rbuf_iovec {
+ struct iovec iov;
+
+ struct list_head list;
+} rbuf_iovec_t;
+
+#define RBUF_IOVEC_SIZE (sizeof (rbuf_iovec_t))
+
+typedef struct rbuf_list {
+ gf_lock_t c_lock; /* protects ->awaiting and ->completed */
+
+ pthread_mutex_t b_lock; /* paired with ->b_cond */
+ pthread_cond_t b_cond; /* signalled on writer completion */
+
+ gf_boolean_t awaiting;
+
+ unsigned long long pending; /* pending writers */
+ unsigned long long completed; /* completed writers */
+
+ rbuf_iovec_t *rvec; /* currently used IO vector */
+
+ struct list_head veclist; /* list of attached rbuf_iov */
+
+ unsigned long long used; /* consumable entries
+ attached in ->veclist */
+ unsigned long long total; /* total entries in ->veclist (used
+ during deallocation) */
+
+ unsigned long seq[2]; /* if interested, this would store
+ the start sequence number and the
+ range */
+
+ struct list_head list; /* attachment to rbuf_t */
+} rbuf_list_t;
+
+struct rlist_iter {
+ struct list_head veclist;
+
+ unsigned long long iter;
+};
+
+#define RLIST_ENTRY_COUNT(rlist) ((rlist)->used)
+
+#define rlist_iter_init(riter, rlist) \
+ do { \
+ (riter)->iter = (rlist)->used; \
+ (riter)->veclist = (rlist)->veclist; \
+ } while (0)
+
+#define rvec_for_each_entry(pos, riter) \
+ for (pos = list_entry \
+ ((riter)->veclist.next, typeof(*pos), list); \
+ (riter)->iter > 0; \
+ pos = list_entry \
+ (pos->list.next, typeof(*pos), list), \
+ --((riter)->iter))
+
+/**
+ * The sequence number assignment routine is called during a
+ * buffer switch under the rbuf ->lock.
+ */
+typedef void (sequence_fn) (rbuf_list_t *, void *);
+
+#define RLIST_STORE_SEQ(rlist, start, range) \
+ do { \
+ (rlist)->seq[0] = (start); \
+ (rlist)->seq[1] = (range); \
+ } while (0)
+
+#define RLIST_GET_SEQ(rlist, start, range) \
+ do { \
+ (start) = (rlist)->seq[0]; \
+ (range) = (rlist)->seq[1]; \
+ } while (0)
+
+typedef struct rbuf {
+ gf_lock_t lock; /* protects "current" rlist */
+
+ rbuf_list_t *current; /* cached pointer to first free rlist */
+
+ struct list_head freelist;
+} rbuf_t;
+
+typedef enum {
+ RBUF_CONSUMABLE = 1,
+ RBUF_BUSY,
+ RBUF_EMPTY,
+ RBUF_WOULD_STARVE,
+} rlist_retval_t;
+
+/* Initialization/Destruction */
+rbuf_t *rbuf_init (int);
+void rbuf_dtor (rbuf_t *);
+
+/* Producer API */
+char *rbuf_reserve_write_area (rbuf_t *, size_t, void **);
+int rbuf_write_complete (void *);
+
+/* Consumer API */
+int rbuf_get_buffer (rbuf_t *, void **, sequence_fn *, void *);
+int rbuf_wait_for_completion (rbuf_t *, void *,
+ void (*)(rbuf_list_t *, void *), void *);
+
+#endif
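[Editor's note] The sequencing hooks round out the consumer API: a sequence_fn runs under rbuf->lock at switch time, so it can atomically tag the outgoing buffer. A sketch of the intended pairing; "next_seq" and the choice of what "range" means are illustrative, the header leaves both to the caller:

/* Sketch: tagging a buffer with a sequence range at switch time. */
static unsigned long next_seq;

static void
attach_seq (rbuf_list_t *rlist, void *data)
{
        /* runs under rbuf->lock during the buffer switch */
        unsigned long range = RLIST_ENTRY_COUNT (rlist);

        (void) data;

        RLIST_STORE_SEQ (rlist, next_seq, range);
        next_seq += range;
}

static void
consume_with_seq (rbuf_list_t *rlist)
{
        unsigned long start = 0, range = 0;

        RLIST_GET_SEQ (rlist, start, range);
        /* entries in this buffer cover [start, start + range) */
}

A caller wires this in by passing attach_seq as the sequence_fn argument to rbuf_get_buffer() and reading the range back, e.g. inside its dispatch routine.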