diff options
author | Venky Shankar <vshankar@redhat.com> | 2015-02-03 19:03:30 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-18 21:22:33 -0700 |
commit | 87a9d23627586a5d5cd8a502a08ecbdb6f0f7bb2 (patch) | |
tree | 7e627c0810e151177364f66a9442529d1e95826f /libglusterfs/src | |
parent | d236b01a8bf68b83c76ea1cfa8351833e19695f7 (diff) |
libglusterfs/rot-buffs: rotational buffers
This patch introduces rotational buffers aiming at the classic
multiple producer and multiple consumer problem. A fixed set
of buffer list is allocated during initialization, where each
list consist of a list of buffers. Each buffer is an iovec
pointing to a memory region of fixed allocation size. Multiple
producers write data to these buffers. A buffer list starts with
a single buffer (iovec) and allocates more when required (although
this can be preallocatd in multiples of k).
rot-buffs allow multiple producers to write data parallely with
a bit of extra cost of taking locks. Therefore, it's much suited
for large writes. Multiple producers are allowed to write in the
buffer parallely by "reserving" write space for selected number
of bytes and returning pointer to the start of the reserved area.
The write size is selected by the producer before it starts the
write (which is often known). Therefore, the write itself need not
be serialized -- just the space reservation needs to be done safely.
The other part is when a consumer kicks in to consume what has
been produced. At this point, a buffer list switch is performed.
The "current" buffer list pointer is safely pointed to the next
available buffer list. New writes are now directed to the just
switched buffer list (the old buffer list is now considered out
of rotation). Note that the old buffer still may have producers
in progress (pending writes), so the consumer has to wait till
the writers are drained. Currently this is the slow path for
producers (write completion) and needs to be improved.
Currently, there is special handling for cases where the number
of consumers match (or exceed) the number of producers, which
could result in writer starvation. In this scenario, when a
consumers requests a buffer list for consumption, a check is
performed for writer starvation and consumption is denied
until at least another buffer list is ready of the producer
for writes, i.e., one (or more) consumer(s) completed, thereby
putting the buffer list back in rotation.
[
NOTE:
I've not performance tested this producer-consumer model
yet. It's being used in changelog for event notification.
The list of buffers (iovecs) are directly passed to RPC
layer.
]
Change-Id: I88d235522b05ab82509aba861374a2312bff57f2
BUG: 1170075
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/9706
Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'libglusterfs/src')
-rw-r--r-- | libglusterfs/src/Makefile.am | 6 | ||||
-rw-r--r-- | libglusterfs/src/list.h | 23 | ||||
-rw-r--r-- | libglusterfs/src/mem-types.h | 23 | ||||
-rw-r--r-- | libglusterfs/src/rot-buffs.c | 491 | ||||
-rw-r--r-- | libglusterfs/src/rot-buffs.h | 121 |
5 files changed, 650 insertions, 14 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 73ee69f8630..2441023ad39 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -7,7 +7,7 @@ libglusterfs_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 \ -I$(CONTRIBDIR)/libexecinfo ${ARGP_STANDALONE_CPPFLAGS} \ -DSBIN_DIR=\"$(sbindir)\" -lm -libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) +libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) $(MATH_LIB) libglusterfs_la_LDFLAGS = -version-info $(LIBGLUSTERFS_LT_VERSION) lib_LTLIBRARIES = libglusterfs.la @@ -30,7 +30,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \ $(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c \ $(CONTRIBDIR)/stdlib/gf_mkostemp.c strfd.c parse-utils.c \ $(CONTRIBDIR)/mount/mntent.c $(CONTRIBDIR)/libexecinfo/execinfo.c\ - quota-common-utils.c + quota-common-utils.c rot-buffs.c nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c @@ -49,7 +49,7 @@ noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h timespec. template-component-messages.h strfd.h syncop-utils.h parse-utils.h \ $(CONTRIBDIR)/mount/mntent_compat.h lvm-defaults.h \ $(CONTRIBDIR)/libexecinfo/execinfo_compat.h \ - unittest/unittest.h quota-common-utils.h + unittest/unittest.h quota-common-utils.h rot-buffs.h EXTRA_DIST = graph.l graph.y diff --git a/libglusterfs/src/list.h b/libglusterfs/src/list.h index 04b4047129f..894fa3012cf 100644 --- a/libglusterfs/src/list.h +++ b/libglusterfs/src/list.h @@ -179,15 +179,36 @@ list_append_init (struct list_head *list, struct list_head *head) INIT_LIST_HEAD (list); } +static inline int +list_is_last (struct list_head *list, struct list_head *head) +{ + return (list->next == head); +} + +static inline int +list_is_singular(struct list_head *head) +{ + return !list_empty(head) && (head->next == head->prev); +} #define list_entry(ptr, type, member) \ ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +#define list_last_entry(ptr, type, member) \ + list_entry((ptr)->prev, type, member) + +#define list_next_entry(pos, member) \ + list_entry((pos)->member.next, typeof(*(pos)), member) + +#define list_prev_entry(pos, member) \ + list_entry((pos)->member.prev, typeof(*(pos)), member) #define list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) - #define list_for_each_entry(pos, head, member) \ for (pos = list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 5f4e7bcd2c2..4dd59b002a5 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -131,17 +131,20 @@ enum gf_common_mem_types_ { gf_common_mt_parser_t = 115, gf_common_quota_meta_t = 116, /*related to gfdb library*/ - gfdb_mt_time_t, - gf_mt_sql_cbk_args_t, - gf_mt_gfdb_query_record_t, - gf_mt_gfdb_link_info_t, - gf_mt_gfdb_db_operations_t, - gf_mt_sql_connection_t, - gf_mt_sql_conn_node_t, - gf_mt_db_conn_node_t, - gf_mt_db_connection_t, - gfdb_mt_db_record_t, + gfdb_mt_time_t = 117, + gf_mt_sql_cbk_args_t = 118, + gf_mt_gfdb_query_record_t = 119, + gf_mt_gfdb_link_info_t = 120, + gf_mt_gfdb_db_operations_t = 121, + gf_mt_sql_connection_t = 122, + gf_mt_sql_conn_node_t = 123, + gf_mt_db_conn_node_t = 124, + gf_mt_db_connection_t = 125, + gfdb_mt_db_record_t = 126, /*related to gfdb library*/ + gf_common_mt_rbuf_t = 127, + gf_common_mt_rlist_t = 128, + gf_common_mt_rvec_t = 129, gf_common_mt_end }; #endif diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c new file mode 100644 index 00000000000..19399b824f4 --- /dev/null +++ b/libglusterfs/src/rot-buffs.c @@ -0,0 +1,491 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <math.h> + +#include "mem-types.h" +#include "mem-pool.h" + +#include "rot-buffs.h" + +/** + * Producer-Consumer based on top of rotational buffers. + * + * This favours writers (producer) and keeps the critical section + * light weight. Buffer switch happens when a consumer wants to + * consume data. This is the slow path and waits for pending + * writes to finish. + * + * TODO: do away with opaques (use arrays with indexing). + */ + +#define ROT_BUFF_DEFAULT_COUNT 2 +#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */ + +#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE) + +/** + * iovec list is not shrinked (deallocated) if usage/total count + * falls in this range. this is the fast path and should satisfy + * most of the workloads. for the rest shrinking iovec list is + * generous. + */ +#define RVEC_LOW_WATERMARK_COUNT 1 +#define RVEC_HIGH_WATERMARK_COUNT (1 << 4) + +static inline +rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf) +{ + return rbuf->current; +} + +static inline void +rlist_mark_waiting (rbuf_list_t *rlist) +{ + LOCK (&rlist->c_lock); + { + rlist->awaiting = _gf_true; + } + UNLOCK (&rlist->c_lock); +} + +static inline int +__rlist_has_waiter (rbuf_list_t *rlist) +{ + return (rlist->awaiting == _gf_true); +} + +static inline void * +rbuf_alloc_rvec () +{ + return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t); +} + +static inline void +rlist_reset_vector_usage (rbuf_list_t *rlist) +{ + rlist->used = 1; +} + +static inline void +rlist_increment_vector_usage (rbuf_list_t *rlist) +{ + rlist->used++; +} + +static inline void +rlist_increment_total_usage (rbuf_list_t *rlist) +{ + rlist->total++; +} + +static inline int +rvec_in_watermark_range (rbuf_list_t *rlist) +{ + return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) + && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT)); +} + +static inline void +rbuf_reset_rvec (rbuf_iovec_t *rvec) +{ + /* iov_base is _never_ modified */ + rvec->iov.iov_len = 0; +} + +/* TODO: alloc multiple rbuf_iovec_t */ +static inline int +rlist_add_new_vec (rbuf_list_t *rlist) +{ + rbuf_iovec_t *rvec = NULL; + + rvec = (rbuf_iovec_t *) rbuf_alloc_rvec (); + if (!rvec) + return -1; + INIT_LIST_HEAD (&rvec->list); + rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE; + rvec->iov.iov_len = 0; + + list_add_tail (&rvec->list, &rlist->veclist); + + rlist->rvec = rvec; /* cache the latest */ + + rlist_increment_vector_usage (rlist); + rlist_increment_total_usage (rlist); + + return 0; +} + +static inline void +rlist_free_rvec (rbuf_iovec_t *rvec) +{ + if (!rvec) + return; + list_del (&rvec->list); + GF_FREE (rvec); +} + +static inline void +rlist_purge_all_rvec (rbuf_list_t *rlist) +{ + rbuf_iovec_t *rvec = NULL; + + if (!rlist) + return; + while (!list_empty (&rlist->veclist)) { + rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec (rvec); + } +} + +static inline void +rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink) +{ + rbuf_iovec_t *rvec = NULL; + + while (!list_empty (&rlist->veclist) && (shrink-- > 0)) { + rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rlist_free_rvec (rvec); + } +} + +static inline void +rbuf_purge_rlist (rbuf_t *rbuf) +{ + rbuf_list_t *rlist = NULL; + + while (!list_empty (&rbuf->freelist)) { + rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list); + list_del (&rlist->list); + + rlist_purge_all_rvec (rlist); + + LOCK_DESTROY (&rlist->c_lock); + + (void) pthread_mutex_destroy (&rlist->b_lock); + (void) pthread_cond_destroy (&rlist->b_cond); + + GF_FREE (rlist); + } +} + +rbuf_t * +rbuf_init (int bufcount) +{ + int j = 0; + int ret = 0; + rbuf_t *rbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (bufcount <= 0) + bufcount = ROT_BUFF_DEFAULT_COUNT; + + rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t); + if (!rbuf) + goto error_return; + + LOCK_INIT (&rbuf->lock); + INIT_LIST_HEAD (&rbuf->freelist); + + /* it could have been one big calloc() but this is just once.. */ + for (j = 0; j < bufcount; j++) { + rlist = GF_CALLOC (1, + sizeof (rbuf_list_t), gf_common_mt_rlist_t); + if (!rlist) { + ret = -1; + break; + } + + INIT_LIST_HEAD (&rlist->list); + INIT_LIST_HEAD (&rlist->veclist); + + rlist->pending = rlist->completed = 0; + + ret = rlist_add_new_vec (rlist); + if (ret) + break; + + LOCK_INIT (&rlist->c_lock); + + rlist->awaiting = _gf_false; + ret = pthread_mutex_init (&rlist->b_lock, 0); + if (ret != 0) { + GF_FREE (rlist); + break; + } + + ret = pthread_cond_init (&rlist->b_cond, 0); + if (ret != 0) { + GF_FREE (rlist); + break; + } + + list_add_tail (&rlist->list, &rbuf->freelist); + } + + if (ret != 0) + goto dealloc_rlist; + + /* cache currently used buffer: first in the list */ + rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list); + return rbuf; + + dealloc_rlist: + rbuf_purge_rlist (rbuf); + LOCK_DESTROY (&rbuf->lock); + GF_FREE (rbuf); + error_return: + return NULL; +} + +void +rbuf_dtor (rbuf_t *rbuf) +{ + if (!rbuf) + return; + rbuf->current = NULL; + rbuf_purge_rlist (rbuf); + LOCK_DESTROY (&rbuf->lock); + + GF_FREE (rbuf); +} + +static inline char * +rbuf_adjust_write_area (struct iovec *iov, size_t bytes) +{ + char *wbuf = NULL; + + wbuf = iov->iov_base + iov->iov_len; + iov->iov_len += bytes; + return wbuf; +} + +static inline char * +rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes) +{ + int ret = 0; + struct iovec *iov = NULL; + + /* check for available space in _current_ IO buffer */ + iov = &rlist->rvec->iov; + if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE) + return rbuf_adjust_write_area (iov, bytes); /* fast path */ + + /* not enough bytes, try next available buffers */ + if (list_is_last (&rlist->rvec->list, &rlist->veclist)) { + /* OH! consumed all vector buffers */ + GF_ASSERT (rlist->used == rlist->total); + ret = rlist_add_new_vec (rlist); + if (ret) + goto error_return; + } else { + /* not the end, have available rbuf_iovec's */ + rlist->rvec = list_next_entry (rlist->rvec, list); + rlist->used++; + rbuf_reset_rvec (rlist->rvec); + } + + iov = &rlist->rvec->iov; + return rbuf_adjust_write_area (iov, bytes); + + error_return: + return NULL; +} + +char * +rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque) +{ + char *wbuf = NULL; + rbuf_list_t *rlist = NULL; + + if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque) + return NULL; + + LOCK (&rbuf->lock); + { + rlist = rbuf_current_buffer (rbuf); + wbuf = rbuf_alloc_write_area (rlist, bytes); + if (!wbuf) + goto unblock; + rlist->pending++; + } + unblock: + UNLOCK (&rbuf->lock); + + if (wbuf) + *opaque = rlist; + return wbuf; +} + +static inline void +rbuf_notify_waiter (rbuf_list_t *rlist) +{ + pthread_mutex_lock (&rlist->b_lock); + { + pthread_cond_signal (&rlist->b_cond); + } + pthread_mutex_unlock (&rlist->b_lock); +} + +int +rbuf_write_complete (void *opaque) +{ + rbuf_list_t *rlist = NULL; + gf_boolean_t notify = _gf_false; + + if (!opaque) + return -1; + + rlist = opaque; + + LOCK (&rlist->c_lock); + { + rlist->completed++; + /** + * it's safe to test ->pending without rbuf->lock *only* if + * there's a waiter as there can be no new incoming writes. + */ + if (__rlist_has_waiter (rlist) + && (rlist->completed == rlist->pending)) + notify = _gf_true; + } + UNLOCK (&rlist->c_lock); + + if (notify) + rbuf_notify_waiter (rlist); + + return 0; +} + +int +rbuf_get_buffer (rbuf_t *rbuf, + void **opaque, sequence_fn *seqfn, void *mydata) +{ + int retval = RBUF_CONSUMABLE; + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + LOCK (&rbuf->lock); + { + rlist = rbuf_current_buffer (rbuf); + if (!rlist->pending) { + retval = RBUF_EMPTY; + goto unblock; + } + + if (list_is_singular (&rbuf->freelist)) { + /** + * removal would lead to writer starvation, disallow + * switching. + */ + retval = RBUF_WOULD_STARVE; + goto unblock; + } + + list_del_init (&rlist->list); + if (seqfn) + seqfn (rlist, mydata); + rbuf->current = + list_first_entry (&rbuf->freelist, rbuf_list_t, list); + } + unblock: + UNLOCK (&rbuf->lock); + + if (retval == RBUF_CONSUMABLE) + *opaque = rlist; /* caller _owns_ the buffer */ + + return retval; +} + +/** + * Wait for completion of pending writers and invoke dispatcher + * routine (for buffer consumption). + */ + +static inline void +__rbuf_wait_for_writers (rbuf_list_t *rlist) +{ + while (rlist->completed != rlist->pending) + pthread_cond_wait (&rlist->b_cond, &rlist->b_lock); +} + +#ifndef M_E +#define M_E 2.7 +#endif + +static inline void +rlist_shrink_vector (rbuf_list_t *rlist) +{ + unsigned long long shrink = 0; + + /** + * fast path: don't bother to deallocate if vectors are hardly + * used. + */ + if (rvec_in_watermark_range (rlist)) + return; + + /** + * Calculate the shrink count based on total allocated vectors. + * Note that the calculation sticks to rlist->total irrespective + * of the actual usage count (rlist->used). Later, ->used could + * be used to apply slack to the calculation based on how much + * it lags from ->total. For now, let's stick to slow decay. + */ + shrink = rlist->total - (rlist->total * pow (M_E, -0.2)); + + rlist_shrink_rvec (rlist, shrink); + rlist->total -= shrink; +} + +int +rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque, + void (*fn)(rbuf_list_t *, void *), void *arg) +{ + rbuf_list_t *rlist = NULL; + + if (!rbuf || !opaque) + return -1; + + rlist = opaque; + + pthread_mutex_lock (&rlist->b_lock); + { + rlist_mark_waiting (rlist); + __rbuf_wait_for_writers (rlist); + } + pthread_mutex_unlock (&rlist->b_lock); + + /** + * from here on, no need of locking until the rlist is put + * back into rotation. + */ + + fn (rlist, arg); /* invoke dispatcher */ + + rlist->awaiting = _gf_false; + rlist->pending = rlist->completed = 0; + + rlist_shrink_vector (rlist); + rlist_reset_vector_usage (rlist); + + rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); + rbuf_reset_rvec (rlist->rvec); + + LOCK (&rbuf->lock); + { + list_add_tail (&rlist->list, &rbuf->freelist); + } + UNLOCK (&rbuf->lock); + + return 0; +} diff --git a/libglusterfs/src/rot-buffs.h b/libglusterfs/src/rot-buffs.h new file mode 100644 index 00000000000..aac24a4f571 --- /dev/null +++ b/libglusterfs/src/rot-buffs.h @@ -0,0 +1,121 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __ROT_BUFFS_H +#define __ROT_BUFFS_H + +#include "list.h" +#include "locking.h" +#include "common-utils.h" + +typedef struct rbuf_iovec { + struct iovec iov; + + struct list_head list; +} rbuf_iovec_t; + +#define RBUF_IOVEC_SIZE (sizeof (rbuf_iovec_t)) + +typedef struct rbuf_list { + gf_lock_t c_lock; + + pthread_mutex_t b_lock; /* protects this structure */ + pthread_cond_t b_cond; /* signal for writer completion */ + + gf_boolean_t awaiting; + + unsigned long long pending; /* pending writers */ + unsigned long long completed; /* completed writers */ + + rbuf_iovec_t *rvec; /* currently used IO vector */ + + struct list_head veclist; /* list of attached rbuf_iov */ + + unsigned long long used; /* consumable entries + attached in ->veclist */ + unsigned long long total; /* total entries in ->veclist (used + during deallocation) */ + + unsigned long seq[2]; /* if interested, this whould store + the start sequence number and the + range */ + + struct list_head list; /* attachment to rbuf_t */ +} rbuf_list_t; + +struct rlist_iter { + struct list_head veclist; + + unsigned long long iter; +}; + +#define RLIST_ENTRY_COUNT(rlist) rlist->used + +#define rlist_iter_init(riter, rlist) \ + do { \ + (riter)->iter = rlist->used; \ + (riter)->veclist = rlist->veclist; \ + } while (0) + +#define rvec_for_each_entry(pos, riter) \ + for (pos = list_entry \ + ((riter)->veclist.next, typeof(*pos), list); \ + (riter)->iter > 0; \ + pos = list_entry \ + (pos->list.next, typeof(*pos), list), \ + --((riter)->iter)) + +/** + * Sequence number assigment routine is called during buffer + * switch under rbuff ->lock. + */ +typedef void (sequence_fn) (rbuf_list_t *, void *); + +#define RLIST_STORE_SEQ(rlist, start, range) \ + do { \ + rlist->seq[0] = start; \ + rlist->seq[1] = range; \ + } while (0) + +#define RLIST_GET_SEQ(rlist, start, range) \ + do { \ + start = rlist->seq[0]; \ + range = rlist->seq[1]; \ + } while (0) + +typedef struct rbuf { + gf_lock_t lock; /* protects "current" rlist */ + + rbuf_list_t *current; /* cached pointer to first free rlist */ + + struct list_head freelist; +} rbuf_t; + +typedef enum { + RBUF_CONSUMABLE = 1, + RBUF_BUSY, + RBUF_EMPTY, + RBUF_WOULD_STARVE, +} rlist_retval_t; + +/* Initialization/Destruction */ +rbuf_t *rbuf_init (int); +void rbuf_dtor (rbuf_t *); + +/* Producer API */ +char *rbuf_reserve_write_area (rbuf_t *, size_t, void **); +int rbuf_write_complete (void *); + +/* Consumer API */ +int rbuf_get_buffer (rbuf_t *, void **, sequence_fn *, void *); +int rbuf_wait_for_completion (rbuf_t *, void *, + void (*)(rbuf_list_t *, void *), void *); + +#endif |