diff options
author | Venky Shankar <vshankar@redhat.com> | 2015-02-03 19:22:16 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-18 18:22:36 -0700 |
commit | 4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch) | |
tree | 9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/src/changelog-helpers.c | |
parent | 728fcd41eb39f66744d84b979dd8195fd47313ed (diff) |
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog
translator and libgfchangelog. It replaces the old pathetic stream
based interaction that existed earlier (due to time constraints :-/).
Changelog, upon initialization starts a RPC server (rpcsvc) allowing
clients to invoke a probe API as a bootup mechanism to request for
event notifications. During probe, clients can choose an event
filter specifying the type(s) of events they are interested in. As
of now there is no way to change the event notification set once
the probe RPC call is made, but that is easier to implement.
The actual event notifications is done on a separate RPC session.
The client (libgfchangelog) itself starts and RPC server which the
changelog translator "connects back" during probe. Notifications
are dispatched by a bunch of threads from the server (translator)
and the client optionally orders them if ordered notifications
are requried. FOPs fill in their respective event details in a
buffer (rot-buffs to be particular) and a bunch of threads
(consumers) swap the buffers out of roatation and dispatch them
via RPC. To avoid writer starvation, then number of dispatcher
threads is one less than the number of buffer list in rot-buffs.x
libgfchangelog becomes purely callback based -- upon event
notification from the server (and re-ordering them if required)
invoke a callback routine specified by consumer(s).
A major part of the patch is also aimed at providing backward
compatibility for geo-replication, which was one of the main
consumer of the stream based API. Also, this patch does not\
"turn on" event notifications for all fops, just a bunch which
is currently in requirement. Another pain point is that the
server does not filter events before dispatching it to the
clients. That load is taken up by the client itself (although
it's done at the library layer rather than making it hard on
the callback implementor). This needs improvement and care
needs to be taken to not load the server up with expensive
filtering mechanisms.
Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395
BUG: 1170075
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/9708
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 210 |
1 files changed, 171 insertions, 39 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 3af2938190d..5c755d76d69 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,7 @@ #include "changelog-mem-types.h" #include "changelog-encoders.h" +#include "changelog-rpc-common.h" #include <pthread.h> static inline void @@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex) pthread_mutex_unlock(p_mutex); } -void +int changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) { int ret = 0; @@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) /* send a cancel request to the thread */ ret = pthread_cancel (thr_id); - if (ret) { + if (ret != 0) { gf_log (this->name, GF_LOG_ERROR, "could not cancel thread (reason: %s)", strerror (errno)); @@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) } ret = pthread_join (thr_id, &retval); - if (ret || (retval != PTHREAD_CANCELED)) { + if ((ret != 0) || (retval != PTHREAD_CANCELED)) { gf_log (this->name, GF_LOG_ERROR, "cancel request not adhered as expected" " (reason: %s)", strerror (errno)); } out: - return; + return ret; } inline void * @@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local) return cld->cld_iobuf->ptr; } +static inline int +changelog_selector_index (unsigned int selector) +{ + return (ffs (selector) - 1); +} + +inline int +changelog_ev_selected (xlator_t *this, + changelog_ev_selector_t *selection, + unsigned int selector) +{ + int idx = 0; + + idx = changelog_selector_index (selector); + gf_log (this->name, GF_LOG_DEBUG, + "selector ref count for %d (idx: %d): %d", + selector, idx, selection->ref[idx]); + /* this can be lockless */ + return (idx < CHANGELOG_EV_SELECTION_RANGE + && (selection->ref[idx] > 0)); +} + +inline void +changelog_select_event (xlator_t *this, + changelog_ev_selector_t *selection, + unsigned int selector) +{ + int idx = 0; + + LOCK (&selection->reflock); + { + while (selector) { + idx = changelog_selector_index (selector); + if (idx < CHANGELOG_EV_SELECTION_RANGE) { + selection->ref[idx]++; + gf_log (this->name, GF_LOG_DEBUG, + "selecting event %d", idx); + } + selector &= ~(1 << idx); + } + } + UNLOCK (&selection->reflock); +} + +inline void +changelog_deselect_event (xlator_t *this, + changelog_ev_selector_t *selection, + unsigned int selector) +{ + int idx = 0; + + LOCK (&selection->reflock); + { + while (selector) { + idx = changelog_selector_index (selector); + if (idx < CHANGELOG_EV_SELECTION_RANGE) { + selection->ref[idx]--; + gf_log (this->name, GF_LOG_DEBUG, + "de-selecting event %d", idx); + } + selector &= ~(1 << idx); + } + } + UNLOCK (&selection->reflock); +} + +inline int +changelog_init_event_selection (xlator_t *this, + changelog_ev_selector_t *selection) +{ + int ret = 0; + int j = CHANGELOG_EV_SELECTION_RANGE; + + ret = LOCK_INIT (&selection->reflock); + if (ret != 0) + return -1; + + LOCK (&selection->reflock); + { + while (j--) { + selection->ref[j] = 0; + } + } + UNLOCK (&selection->reflock); + + return 0; +} + +inline int +changelog_cleanup_event_selection (xlator_t *this, + changelog_ev_selector_t *selection) +{ + int ret = 0; + int j = CHANGELOG_EV_SELECTION_RANGE; + + LOCK (&selection->reflock); + { + while (j--) { + if (selection->ref[j] > 0) + gf_log (this->name, GF_LOG_WARNING, + "changelog event selection cleaning up " + " on active references"); + } + } + UNLOCK (&selection->reflock); + + return LOCK_DESTROY (&selection->reflock); +} + +static inline void +changelog_perform_dispatch (xlator_t *this, + changelog_priv_t *priv, void *mem, size_t size) +{ + char *buf = NULL; + void *opaque = NULL; + + buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque); + if (!buf) { + gf_log_callingfn (this->name, + GF_LOG_WARNING, "failed to dispatch event"); + return; + } + + memcpy (buf, mem, size); + rbuf_write_complete (opaque); +} + +inline void +changelog_dispatch_event (xlator_t *this, + changelog_priv_t *priv, changelog_event_t *ev) +{ + changelog_ev_selector_t *selection = NULL; + + selection = &priv->ev_selection; + if (changelog_ev_selected (this, selection, ev->ev_type)) { + changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE); + } +} + inline void changelog_set_usable_record_and_length (changelog_local_t *local, size_t len, int xr) @@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this, { int ret = -1; int notify = 0; - char *bname = NULL; char ofile[PATH_MAX] = {0,}; char nfile[PATH_MAX] = {0,}; + changelog_event_t ev = {0,}; if (priv->changelog_fd != -1) { ret = fsync (priv->changelog_fd); @@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this, } if (notify) { - bname = basename (nfile); - gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); - ret = changelog_write (priv->wfd, bname, strlen (bname) + 1); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to send file name to notify thread" - " (reason: %s)", strerror (errno)); - } else { - /* If this is explicit rollover initiated by snapshot, - * wakeup reconfigure thread waiting for changelog to - * rollover - */ - if (priv->explicit_rollover) { - priv->explicit_rollover = _gf_false; - ret = pthread_mutex_lock ( - &priv->bn.bnotify_mutex); - CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); - { - priv->bn.bnotify = _gf_false; - ret = pthread_cond_signal ( - &priv->bn.bnotify_cond); - CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, - out); - gf_log (this->name, GF_LOG_INFO, - "Changelog published: %s and" - " signalled bnotify", bname); - } - ret = pthread_mutex_unlock ( - &priv->bn.bnotify_mutex); + ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL; + memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1); + changelog_dispatch_event (this, priv, &ev); + + /* If this is explicit rollover initiated by snapshot, + * wakeup reconfigure thread waiting for changelog to + * rollover + */ + if (priv->explicit_rollover) { + priv->explicit_rollover = _gf_false; + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + ret = pthread_cond_signal + (&priv->bn.bnotify_cond); CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + gf_log (this->name, GF_LOG_INFO, + "Changelog published: %s signalled" + " bnotify", nfile); } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); } } - out: return ret; } @@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this, } int -changelog_open (xlator_t *this, - changelog_priv_t *priv) +changelog_open_journal (xlator_t *this, + changelog_priv_t *priv) { int fd = 0; int ret = -1; @@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this, ret = changelog_rollover_changelog (this, priv, ts); if (!ret && !finale) - ret = changelog_open (this, priv); + ret = changelog_open_journal (this, priv); return ret; } @@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this, * one shot routine to get the address and the value of a inode version * for a particular type. */ -static changelog_inode_ctx_t * +changelog_inode_ctx_t * __changelog_inode_ctx_get (xlator_t *this, inode_t *inode, unsigned long **iver, unsigned long *version, changelog_log_type type) |