diff options
author | Kotresh H R <khiremat@redhat.com> | 2014-03-10 20:03:55 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2014-05-01 08:44:56 -0700 |
commit | c523a04a0bd3edce9cf8ed238b838ebd957f1066 (patch) | |
tree | 0b96d0993b56a0333b28de71b47598ac7c3eed07 /xlators/features/changelog/src | |
parent | 12f1fab930dc0f6f103bae03fab981409ed31b4e (diff) |
feature/changelog: Draining of in-transit fops in changelog.
This is required for Geo-rep to work with snapshots.
Following things are done in this patch.
1. Draining of in-transit fops during changelog rollover.
2. Explicit rollover of changelog when snapshot barrier
notification comes. During this, intransit fops are
drained and changelog is rolled over.
For more details on the purpose of the patch. Please
visit following link.
http://www.gluster.org/community/documentation/index.php/Changelog_Design_changes_for_snapshot
Change-Id: I22690131e19d3027f6d8957178bdc3431b9062f6
Signed-off-by: Kotresh H R <khiremat@redhat.com>
Reviewed-on: http://review.gluster.org/7216
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Varun Shastry <vshastry@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators/features/changelog/src')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 333 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 133 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 346 |
3 files changed, 800 insertions, 12 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 91c43a16c86..c3661b9b76c 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,15 @@ #include "changelog-encoders.h" #include <pthread.h> +static void +changelog_cleanup_free_mutex (void *arg_mutex) +{ + pthread_mutex_t *p_mutex = (pthread_mutex_t*) arg_mutex; + + if (p_mutex) + pthread_mutex_unlock(p_mutex); +} + void changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) { @@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this, char nfile[PATH_MAX] = {0,}; if (priv->changelog_fd != -1) { + ret = fsync (priv->changelog_fd); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "fsync failed (reason: %s)", + strerror (errno)); + } close (priv->changelog_fd); priv->changelog_fd = -1; } @@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this, gf_log (this->name, GF_LOG_ERROR, "Failed to send file name to notify thread" " (reason: %s)", strerror (errno)); + } else { + /* If this is explicit rollover initiated by snapshot, + * wakeup reconfigure thread waiting for changelog to + * rollover + */ + if (priv->explicit_rollover) { + priv->explicit_rollover = _gf_false; + ret = pthread_mutex_lock ( + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + ret = pthread_cond_signal ( + &priv->bn.bnotify_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_INFO, + "Changelog published and" + " signalled bnotify"); + } + ret = pthread_mutex_unlock ( + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } } } + out: return ret; } @@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, gf_boolean_t update_flag) { + changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; struct iobuf *iobuf = NULL; + priv = this->private; /** * We relax the presence of inode if @update_flag is true. * The caller (implmentation of the fop) needs to be careful to @@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this, return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); } +/* Wait till all the black fops are drained */ +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + /* clean up framework of pthread_mutex is required here as + * 'reconfigure' terminates the changelog_rollover thread + * on graph change. + */ + pthread_cleanup_push (changelog_cleanup_free_mutex, + &priv->dm.drain_black_mutex); + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + while (priv->dm.black_fop_cnt > 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Condtional wait on black fops: %ld", + priv->dm.black_fop_cnt); + priv->dm.drain_wait_black = _gf_true; + ret = pthread_cond_wait (&priv->dm.drain_black_cond, + &priv->dm.drain_black_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread" + " cond wait failed: Error:%d", ret); + } + priv->dm.drain_wait_black = _gf_false; + ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex); + pthread_cleanup_pop (0); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + gf_log (this->name, GF_LOG_DEBUG, + "Woke up: Conditional wait on black fops"); +} + +/* Wait till all the white fops are drained */ +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + /* clean up framework of pthread_mutex is required here as + * 'reconfigure' terminates the changelog_rollover thread + * on graph change. + */ + pthread_cleanup_push (changelog_cleanup_free_mutex, + &priv->dm.drain_white_mutex); + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + while (priv->dm.white_fop_cnt > 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Condtional wait on white fops : %ld", + priv->dm.white_fop_cnt); + priv->dm.drain_wait_white = _gf_true; + ret = pthread_cond_wait (&priv->dm.drain_white_cond, + &priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread" + " cond wait failed: Error:%d", ret); + } + priv->dm.drain_wait_white = _gf_false; + ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + pthread_cleanup_pop (0); + gf_log (this->name, GF_LOG_DEBUG, + "Woke up: Conditional wait on white fops"); +} + /** * TODO: these threads have many thing in common (wake up after * a certain time etc..). move them into separate routine. @@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this, void * changelog_rollover (void *data) { - int ret = 0; - xlator_t *this = NULL; - struct timeval tv = {0,}; - changelog_log_data_t cld = {0,}; - changelog_time_slice_t *slice = NULL; - changelog_priv_t *priv = data; + int ret = 0; + xlator_t *this = NULL; + struct timeval tv = {0,}; + changelog_log_data_t cld = {0,}; + changelog_time_slice_t *slice = NULL; + changelog_priv_t *priv = data; + int max_fd = 0; + char buf[1] = {0}; + int len = 0; + + fd_set rset; this = priv->cr.this; slice = &priv->slice; @@ -398,10 +519,62 @@ changelog_rollover (void *data) while (1) { tv.tv_sec = priv->rollover_time; tv.tv_usec = 0; - - ret = select (0, NULL, NULL, NULL, &tv); - if (ret) + FD_ZERO(&rset); + FD_SET(priv->cr.rfd, &rset); + max_fd = priv->cr.rfd; + max_fd = max_fd + 1; + + /* It seems there is a race between actual rollover and explicit + * rollover. But it is handled. If actual rollover is being + * done and the explicit rollover event comes, the event is + * not missed. The next select will immediately wakeup to + * handle explicit wakeup. + */ + + ret = select (max_fd, &rset, NULL, NULL, &tv); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "select failed: %s", strerror(errno)); continue; + } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) { + gf_log (this->name, GF_LOG_INFO, + "Explicit wakeup of select on barrier notify"); + len = read(priv->cr.rfd, buf, 1); + if (len == 0) { + gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF" + " from reconfigure notification pipe"); + continue; + } + if (len < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to read wakeup data"); + continue; + } + /* Lock is not required as same thread is modifying.*/ + priv->explicit_rollover = _gf_true; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "select wokeup on timeout"); + } + + /* Reading curent_color without lock is fine here + * as it is only modified here and is next to reading. + */ + if (priv->current_color == FOP_COLOR_BLACK) { + LOCK(&priv->lock); + priv->current_color = FOP_COLOR_WHITE; + UNLOCK(&priv->lock); + gf_log (this->name, GF_LOG_DEBUG, "Black fops" + " to be drained:%ld",priv->dm.black_fop_cnt); + changelog_drain_black_fops (this, priv); + } else { + LOCK(&priv->lock); + priv->current_color = FOP_COLOR_BLACK; + UNLOCK(&priv->lock); + gf_log (this->name, GF_LOG_DEBUG, "White fops" + " to be drained:%ld",priv->dm.white_fop_cnt); + changelog_drain_white_fops (this, priv); + } ret = changelog_fill_rollover_data (&cld, _gf_false); if (ret) { @@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv, return; } + +/* Begin: Geo-rep snapshot dependency changes */ + +/* changelog_color_fop_and_inc_cnt: Assign color and inc fop cnt. + * + * Assigning color and increment of corresponding fop count should happen + * in a lock (i.e., there should be no window between them). If it does not, + * we might miss draining those fops which are colored but not yet incremented + * the count. Let's assume black fops are draining. If the black fop count + * reaches zero, we say draining is completed but we miss black fops which are + * not incremented fop count but color is assigned black. + */ + +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + if (!priv || !local) + return; + + LOCK (&priv->lock); + { + local->color = priv->current_color; + changelog_inc_fop_cnt (this, priv, local); + } + UNLOCK (&priv->lock); +} + +/* Increments the respective fop counter based on the fop color */ +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + int ret = 0; + + if (local) { + if (local->color == FOP_COLOR_BLACK) { + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.black_fop_cnt++; + } + ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } else { + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.white_fop_cnt++; + } + ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } + } + out: + return; +} + +/* Decrements the respective fop counter based on the fop color */ +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + int ret = 0; + + if (local) { + if (local->color == FOP_COLOR_BLACK) { + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.black_fop_cnt--; + if (priv->dm.black_fop_cnt == 0 && + priv->dm.drain_wait_black == _gf_true) { + ret = pthread_cond_signal ( + &priv->dm.drain_black_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_DEBUG, + "Signalled draining of black"); + } + } + ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } else { + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.white_fop_cnt--; + if (priv->dm.white_fop_cnt == 0 && + priv->dm.drain_wait_white == _gf_true) { + ret = pthread_cond_signal ( + &priv->dm.drain_white_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_DEBUG, + "Signalled draining of white"); + } + } + ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } + } + out: + return; +} + +/* Write to a pipe setup between changelog main thread and changelog + * rollover thread to initiate explicit rollover of changelog journal. + */ +inline int +changelog_barrier_notify (changelog_priv_t *priv, char *buf) +{ + int ret = 0; + + LOCK(&priv->lock); + ret = changelog_write (priv->cr_wfd, buf, 1); + UNLOCK(&priv->lock); + return ret; +} + +/* Clean up flags set on barrier notification */ +/*TODO: Add changelog barrier stop code with changelog barrier patch*/ +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + LOCK (&priv->bflags.lock); + priv->bflags.barrier_ext = _gf_false; + UNLOCK (&priv->bflags.lock); + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + out: + return; +} +/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 53588f55efa..54577592c90 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -107,6 +107,9 @@ typedef struct changelog_rollover { pthread_t rollover_th; xlator_t *this; + + /* read end of pipe used as event from barrier on snapshot */ + int rfd; } changelog_rollover_t; typedef struct changelog_fsync { @@ -139,6 +142,57 @@ typedef struct changelog_notify { xlator_t *this; } changelog_notify_t; +/* Draining during changelog rollover (for geo-rep snapshot dependency): + * -------------------------------------------------------------------- + * The introduction of draining of in-transit fops during changelog rollover + * (both explicit/timeout triggered) requires coloring of fops. Basically the + * implementation requires two counters, one counter which keeps the count of + * current intransit fops which should end up in current changelog and the other + * counter to keep track of incoming fops which should be drained as part of + * next changelog rollover event. The fops are colored w.r.t these counters. + * The fops that are to be drained as part of current changelog rollover is + * given one color and the fops which keep incoming during this and not + * necessarily should end up in current changelog and should be drained as part + * of next changelog rollover are given other color. The color switching + * continues with each changelog rollover. Two colors(black and white) are + * chosen here and initially black is chosen is default. + */ + +typedef enum chlog_fop_color { + FOP_COLOR_BLACK, + FOP_COLOR_WHITE +}chlog_fop_color_t; + +/* Barrier notify variable */ +typedef struct barrier_notify { + pthread_mutex_t bnotify_mutex; + pthread_cond_t bnotify_cond; + gf_boolean_t bnotify; +}barrier_notify_t; + +/* Two separate mutex and conditional variable set is used + * to drain white and black fops. */ + +typedef struct drain_mgmt { + pthread_mutex_t drain_black_mutex; + pthread_cond_t drain_black_cond; + pthread_mutex_t drain_white_mutex; + pthread_cond_t drain_white_cond; + /* Represents black fops count in-transit */ + unsigned long black_fop_cnt; + /* Represents white fops count in-transit */ + unsigned long white_fop_cnt; + gf_boolean_t drain_wait_black; + gf_boolean_t drain_wait_white; +}drain_mgmt_t; + +/* Internal and External barrier on/off indicating flags */ +typedef struct barrier_flags { + gf_lock_t lock; + gf_boolean_t barrier_ext; +}barrier_flags_t; + + struct changelog_priv { gf_boolean_t active; @@ -191,6 +245,26 @@ struct changelog_priv { /* encoder */ struct changelog_encoder *ce; + + /* snapshot dependency changes */ + + /* Draining of fops*/ + drain_mgmt_t dm; + + /* Represents the active color. Initially by default black */ + chlog_fop_color_t current_color; + + /* write end of pipe to do explicit rollover on barrier during snap */ + int cr_wfd; + + /* flag to determine explicit rollover is triggered */ + gf_boolean_t explicit_rollover; + + /* barrier notification variable protected by mutex */ + barrier_notify_t bn; + + /* barrier on/off indicating flags */ + barrier_flags_t bflags; }; struct changelog_local { @@ -206,6 +280,9 @@ struct changelog_local { * but we call it as ->prev_entry... ha ha ha */ struct changelog_local *prev_entry; + + /* snap dependency changes */ + chlog_fop_color_t color; }; typedef struct changelog_local changelog_local_t; @@ -311,6 +388,25 @@ changelog_fsync_thread (void *data); int changelog_forget (xlator_t *this, inode_t *inode); +/* Geo-Rep snapshot dependency changes */ +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local); +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local); +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local); +inline int +changelog_barrier_notify (changelog_priv_t *priv, char* buf); +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv); +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv); +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv); + /* macros */ #define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \ @@ -404,4 +500,41 @@ changelog_forget (xlator_t *this, inode_t *inode); goto label; \ } while (0) +/* Begin: Geo-Rep snapshot dependency changes */ + +#define DICT_ERROR -1 +#define BARRIER_OFF 0 +#define BARRIER_ON 1 +#define DICT_DEFAULT 2 + +#define CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, label) do { \ + if (!priv->active) { \ + gf_log (this->name, GF_LOG_WARNING, \ + "Changelog is not active, return success"); \ + ret = 0; \ + goto label; \ + } \ + } while (0) + +/* Log pthread error and goto label */ +#define CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, label) do { \ + if (ret) { \ + gf_log (this->name, GF_LOG_ERROR, \ + "pthread error: Error: %d", ret); \ + ret = -1; \ + goto label; \ + } \ + } while (0) + +/* Log pthread error, set flag and goto label */ +#define CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, label, flag) do { \ + if (ret) { \ + gf_log (this->name, GF_LOG_ERROR, \ + "pthread error: Error: %d", ret); \ + ret = -1; \ + flag = _gf_true; \ + goto label; \ + } \ + } while (0) #endif /* _CHANGELOG_HELPERS_H */ +/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 3e40984f6de..0a491c5ac07 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -62,6 +62,7 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; @@ -94,6 +95,7 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_rmdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, loc, xflags, xdata); @@ -118,6 +120,7 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; @@ -150,6 +153,7 @@ changelog_unlink (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_unlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, loc, xflags, xdata); @@ -177,6 +181,7 @@ changelog_rename_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno, buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata); @@ -216,6 +221,7 @@ changelog_rename (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 3); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_rename_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, oldloc, newloc, xdata); @@ -242,6 +248,7 @@ changelog_link_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -276,6 +283,7 @@ changelog_link (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_link_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->link, oldloc, newloc, xdata); @@ -302,6 +310,7 @@ changelog_mkdir_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -353,6 +362,7 @@ changelog_mkdir (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 5); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_mkdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir, loc, mode, umask, xdata); @@ -379,6 +389,7 @@ changelog_symlink_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -422,6 +433,7 @@ changelog_symlink (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_symlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink, linkname, loc, umask, xdata); @@ -448,6 +460,7 @@ changelog_mknod_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -500,6 +513,7 @@ changelog_mknod (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 5); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_mknod_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod, loc, mode, dev, umask, xdata); @@ -527,6 +541,7 @@ changelog_create_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (create, frame, op_ret, op_errno, fd, inode, buf, preparent, postparent, xdata); @@ -583,6 +598,7 @@ changelog_create (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 5); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_create_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, flags, mode, umask, fd, xdata); @@ -615,6 +631,7 @@ changelog_fsetattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); @@ -649,6 +666,7 @@ changelog_fsetattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_fsetattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); @@ -674,6 +692,7 @@ changelog_setattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); @@ -706,6 +725,7 @@ changelog_setattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_setattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); @@ -730,6 +750,7 @@ changelog_fremovexattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata); return 0; @@ -758,6 +779,7 @@ changelog_fremovexattr (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_fremovexattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr, fd, name, xdata); @@ -780,6 +802,7 @@ changelog_removexattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata); return 0; @@ -808,6 +831,7 @@ changelog_removexattr (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_removexattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr, loc, name, xdata); @@ -832,6 +856,7 @@ changelog_setxattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); return 0; @@ -861,6 +886,7 @@ changelog_setxattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_setxattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr, loc, dict, flags, xdata); @@ -883,6 +909,7 @@ changelog_fsetxattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata); return 0; @@ -912,6 +939,7 @@ changelog_fsetxattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_fsetxattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags, xdata); @@ -944,6 +972,7 @@ changelog_truncate_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; @@ -962,6 +991,7 @@ changelog_truncate (call_frame_t *frame, loc->inode, loc->inode->gfid, 0); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_truncate_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate, loc, offset, xdata); @@ -985,6 +1015,7 @@ changelog_ftruncate_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (ftruncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; @@ -1003,6 +1034,7 @@ changelog_ftruncate (call_frame_t *frame, fd->inode, fd->inode->gfid, 0); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_ftruncate_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate, fd, offset, xdata); @@ -1028,6 +1060,7 @@ changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; @@ -1048,6 +1081,7 @@ changelog_writev (call_frame_t *frame, fd->inode, fd->inode->gfid, 0); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); @@ -1089,9 +1123,16 @@ changelog_assign_encoding (changelog_priv_t *priv, char *enc) static void changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) { + int ret = 0; + if (priv->cr.rollover_th) { changelog_thread_cleanup (this, priv->cr.rollover_th); priv->cr.rollover_th = 0; + ret = close (priv->cr_wfd); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "error closing write end of rollover pipe" + " (reason: %s)", strerror (errno)); } if (priv->cf.fsync_th) { @@ -1105,6 +1146,41 @@ static int changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) { int ret = 0; + int flags = 0; + int pipe_fd[2] = {0, 0}; + + /* Geo-Rep snapshot dependency: + * + * To implement explicit rollover of changlog journal on barrier + * notification, a pipe is created to communicate between + * 'changelog_rollover' thread and changelog main thread. The select + * call used to wait till roll-over time in changelog_rollover thread + * is modified to wait on read end of the pipe. When barrier + * notification comes (i.e, in 'reconfigure'), select in + * changelog_rollover thread is woken up explicitly by writing into + * the write end of the pipe in 'reconfigure'. + */ + + ret = pipe (pipe_fd); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "Cannot create pipe (reason: %s)", strerror (errno)); + goto out; + } + + /* writer is non-blocking */ + flags = fcntl (pipe_fd[1], F_GETFL); + flags |= O_NONBLOCK; + + ret = fcntl (pipe_fd[1], F_SETFL, flags); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set O_NONBLOCK flag"); + goto out; + } + + priv->cr_wfd = pipe_fd[1]; + priv->cr.rfd = pipe_fd[0]; priv->cr.this = this; ret = gf_thread_create (&priv->cr.rollover_th, @@ -1186,6 +1262,155 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) return ret; } +int +notify (xlator_t *this, int event, void *data, ...) +{ + changelog_priv_t *priv = NULL; + dict_t *dict = NULL; + char buf[1] = {1}; + int barrier = DICT_DEFAULT; + gf_boolean_t bclean_req = _gf_false; + int ret = 0; + + priv = this->private; + if (!priv) + goto out; + + if (event == GF_EVENT_TRANSLATOR_OP) { + + dict = data; + /*TODO: Also barrier option is persistent. Need to + * decide on the brick crash scenarios. + */ + barrier = dict_get_str_boolean (dict, "barrier", DICT_DEFAULT); + + switch (barrier) { + case DICT_ERROR: + gf_log (this->name, GF_LOG_ERROR, + "Barrier dict_get_str_boolean failed"); + ret = -1; + goto out; + + case BARRIER_OFF: + gf_log (this->name, GF_LOG_INFO, + "Barrier off notification"); + + CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); + + LOCK (&priv->bflags.lock); + { + if (priv->bflags.barrier_ext == _gf_false) + ret = -1; + } + UNLOCK (&priv->bflags.lock); + + if (ret == -1 ) { + gf_log (this->name, GF_LOG_ERROR, "Received" + " another barrier off notification" + " while already off"); + goto out; + } + + /*TODO: STOP CHANGELOG BARRIER */ + LOCK (&priv->bflags.lock); + { + priv->bflags.barrier_ext = _gf_false; + } + UNLOCK (&priv->bflags.lock); + + ret = 0; + goto out; + + case BARRIER_ON: + gf_log (this->name, GF_LOG_INFO, + "Barrier on notification"); + + CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); + + LOCK (&priv->bflags.lock); + { + if (priv->bflags.barrier_ext == _gf_true) + ret = -1; + else + priv->bflags.barrier_ext = _gf_true; + } + UNLOCK (&priv->bflags.lock); + + if (ret == -1 ) { + gf_log (this->name, GF_LOG_ERROR, "Received" + " another barrier on notification when" + " last one is not served yet"); + goto out; + } + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, + bclean_req); + { + priv->bn.bnotify = _gf_true; + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, + bclean_req); + + /*TODO: START CHANGELOG BARRIER */ + + ret = changelog_barrier_notify(priv, buf); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Explicit roll over: write failed"); + changelog_barrier_cleanup (this, priv); + ret = -1; + goto out; + } + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, + bclean_req); + { + /* The while condition check is required here to + * handle spurious wakeup of cond wait that can + * happen with pthreads. See man page */ + while (priv->bn.bnotify == _gf_true) { + ret = pthread_cond_wait ( + &priv->bn.bnotify_cond, + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, + out, + bclean_req); + } + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, bclean_req); + gf_log (this->name, GF_LOG_INFO, + "Woke up: bnotify conditional wait"); + + ret = 0; + goto out; + + case DICT_DEFAULT: + gf_log (this->name, GF_LOG_ERROR, + "barrier key not found"); + ret = -1; + goto out; + + default: + gf_log (this->name, GF_LOG_ERROR, + "Something went bad in dict_get_str_boolean"); + ret = -1; + goto out; + } + } else { + ret = default_notify (this, event, data); + } + + out: + if (bclean_req) + changelog_barrier_cleanup (this, priv); + + return ret; +} + int32_t mem_acct_init (xlator_t *this) { @@ -1261,6 +1486,98 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) return ret; } +/* Init all pthread condition variables and locks in changelog*/ +static int +changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) +{ + gf_boolean_t bn_mutex_init = _gf_false; + gf_boolean_t bn_cond_init = _gf_false; + gf_boolean_t dm_mutex_black_init = _gf_false; + gf_boolean_t dm_cond_black_init = _gf_false; + gf_boolean_t dm_mutex_white_init = _gf_false; + gf_boolean_t dm_cond_white_init = _gf_false; + int ret = 0; + + if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "bnotify pthread_mutex_init failed (%d)", ret); + ret = -1; + goto out; + } + bn_mutex_init = _gf_true; + + if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "bnotify pthread_cond_init failed (%d)", ret); + ret = -1; + goto out; + } + bn_cond_init = _gf_true; + + if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) + { + gf_log (this->name, GF_LOG_ERROR, + "drain_black pthread_mutex_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_mutex_black_init = _gf_true; + + if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "drain_black pthread_cond_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_cond_black_init = _gf_true; + + if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) + { + gf_log (this->name, GF_LOG_ERROR, + "drain_white pthread_mutex_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_mutex_white_init = _gf_true; + + if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "drain_white pthread_cond_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_cond_white_init = _gf_true; + out: + if (ret) { + if (bn_mutex_init) + pthread_mutex_destroy(&priv->bn.bnotify_mutex); + if (bn_cond_init) + pthread_cond_destroy (&priv->bn.bnotify_cond); + if (dm_mutex_black_init) + pthread_mutex_destroy(&priv->dm.drain_black_mutex); + if (dm_cond_black_init) + pthread_cond_destroy (&priv->dm.drain_black_cond); + if (dm_mutex_white_init) + pthread_mutex_destroy(&priv->dm.drain_white_mutex); + if (dm_cond_white_init) + pthread_cond_destroy (&priv->dm.drain_white_cond); + } + return ret; +} + +/* Destroy all pthread condition variables and locks in changelog */ +static inline void +changelog_pthread_destroy (changelog_priv_t *priv) +{ + pthread_mutex_destroy (&priv->bn.bnotify_mutex); + pthread_cond_destroy (&priv->bn.bnotify_cond); + pthread_mutex_destroy (&priv->dm.drain_black_mutex); + pthread_cond_destroy (&priv->dm.drain_black_cond); + pthread_mutex_destroy (&priv->dm.drain_white_mutex); + pthread_cond_destroy (&priv->dm.drain_white_cond); + LOCK_DESTROY (&priv->bflags.lock); +} + int reconfigure (xlator_t *this, dict_t *options) { @@ -1363,9 +1680,10 @@ reconfigure (xlator_t *this, dict_t *options) int32_t init (xlator_t *this) { - int ret = -1; - char *tmp = NULL; - changelog_priv_t *priv = NULL; + int ret = -1; + char *tmp = NULL; + changelog_priv_t *priv = NULL; + gf_boolean_t cond_lock_init = _gf_false; GF_VALIDATE_OR_GOTO ("changelog", this, out); @@ -1451,6 +1769,24 @@ init (xlator_t *this) goto out; priv->changelog_fd = -1; + + /* snap dependency changes */ + priv->dm.black_fop_cnt = 0; + priv->dm.white_fop_cnt = 0; + priv->dm.drain_wait_black = _gf_false; + priv->dm.drain_wait_white = _gf_false; + priv->current_color = FOP_COLOR_BLACK; + priv->explicit_rollover = _gf_false; + /* Mutex is not needed as threads are not spawned yet */ + priv->bn.bnotify = _gf_false; + ret = changelog_pthread_init (this, priv); + if (ret) + goto out; + + LOCK_INIT (&priv->bflags.lock); + cond_lock_init = _gf_true; + priv->bflags.barrier_ext = _gf_false; + ret = changelog_init (this, priv); if (ret) goto out; @@ -1469,6 +1805,9 @@ init (xlator_t *this) } GF_FREE (priv->changelog_brick); GF_FREE (priv->changelog_dir); + if (cond_lock_init) + changelog_pthread_destroy (priv); + GF_FREE (priv); this->private = NULL; } else @@ -1493,6 +1832,7 @@ fini (xlator_t *this) mem_pool_destroy (this->local_pool); GF_FREE (priv->changelog_brick); GF_FREE (priv->changelog_dir); + changelog_pthread_destroy (priv); GF_FREE (priv); } |