diff options
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 333 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 133 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 346 |
3 files changed, 800 insertions, 12 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 91c43a16c86..c3661b9b76c 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,15 @@ #include "changelog-encoders.h" #include <pthread.h> +static void +changelog_cleanup_free_mutex (void *arg_mutex) +{ + pthread_mutex_t *p_mutex = (pthread_mutex_t*) arg_mutex; + + if (p_mutex) + pthread_mutex_unlock(p_mutex); +} + void changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) { @@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this, char nfile[PATH_MAX] = {0,}; if (priv->changelog_fd != -1) { + ret = fsync (priv->changelog_fd); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "fsync failed (reason: %s)", + strerror (errno)); + } close (priv->changelog_fd); priv->changelog_fd = -1; } @@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this, gf_log (this->name, GF_LOG_ERROR, "Failed to send file name to notify thread" " (reason: %s)", strerror (errno)); + } else { + /* If this is explicit rollover initiated by snapshot, + * wakeup reconfigure thread waiting for changelog to + * rollover + */ + if (priv->explicit_rollover) { + priv->explicit_rollover = _gf_false; + ret = pthread_mutex_lock ( + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + ret = pthread_cond_signal ( + &priv->bn.bnotify_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_INFO, + "Changelog published and" + " signalled bnotify"); + } + ret = pthread_mutex_unlock ( + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } } } + out: return ret; } @@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, gf_boolean_t update_flag) { + changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; struct iobuf 
*iobuf = NULL; + priv = this->private; /** * We relax the presence of inode if @update_flag is true. * The caller (implmentation of the fop) needs to be careful to @@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this, return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); } +/* Wait till all the black fops are drained */ +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + /* clean up framework of pthread_mutex is required here as + * 'reconfigure' terminates the changelog_rollover thread + * on graph change. + */ + pthread_cleanup_push (changelog_cleanup_free_mutex, + &priv->dm.drain_black_mutex); + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + while (priv->dm.black_fop_cnt > 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Condtional wait on black fops: %ld", + priv->dm.black_fop_cnt); + priv->dm.drain_wait_black = _gf_true; + ret = pthread_cond_wait (&priv->dm.drain_black_cond, + &priv->dm.drain_black_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread" + " cond wait failed: Error:%d", ret); + } + priv->dm.drain_wait_black = _gf_false; + ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex); + pthread_cleanup_pop (0); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + gf_log (this->name, GF_LOG_DEBUG, + "Woke up: Conditional wait on black fops"); +} + +/* Wait till all the white fops are drained */ +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + /* clean up framework of pthread_mutex is required here as + * 'reconfigure' terminates the changelog_rollover thread + * on graph change. 
+ */ + pthread_cleanup_push (changelog_cleanup_free_mutex, + &priv->dm.drain_white_mutex); + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + while (priv->dm.white_fop_cnt > 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Condtional wait on white fops : %ld", + priv->dm.white_fop_cnt); + priv->dm.drain_wait_white = _gf_true; + ret = pthread_cond_wait (&priv->dm.drain_white_cond, + &priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread" + " cond wait failed: Error:%d", ret); + } + priv->dm.drain_wait_white = _gf_false; + ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + pthread_cleanup_pop (0); + gf_log (this->name, GF_LOG_DEBUG, + "Woke up: Conditional wait on white fops"); +} + /** * TODO: these threads have many thing in common (wake up after * a certain time etc..). move them into separate routine. @@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this, void * changelog_rollover (void *data) { - int ret = 0; - xlator_t *this = NULL; - struct timeval tv = {0,}; - changelog_log_data_t cld = {0,}; - changelog_time_slice_t *slice = NULL; - changelog_priv_t *priv = data; + int ret = 0; + xlator_t *this = NULL; + struct timeval tv = {0,}; + changelog_log_data_t cld = {0,}; + changelog_time_slice_t *slice = NULL; + changelog_priv_t *priv = data; + int max_fd = 0; + char buf[1] = {0}; + int len = 0; + + fd_set rset; this = priv->cr.this; slice = &priv->slice; @@ -398,10 +519,62 @@ changelog_rollover (void *data) while (1) { tv.tv_sec = priv->rollover_time; tv.tv_usec = 0; - - ret = select (0, NULL, NULL, NULL, &tv); - if (ret) + FD_ZERO(&rset); + FD_SET(priv->cr.rfd, &rset); + max_fd = priv->cr.rfd; + max_fd = max_fd + 1; + + /* It seems there is a race between actual rollover and explicit + * rollover. But it is handled. 
If actual rollover is being + * done and the explicit rollover event comes, the event is + * not missed. The next select will immediately wakeup to + * handle explicit wakeup. + */ + + ret = select (max_fd, &rset, NULL, NULL, &tv); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "select failed: %s", strerror(errno)); continue; + } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) { + gf_log (this->name, GF_LOG_INFO, + "Explicit wakeup of select on barrier notify"); + len = read(priv->cr.rfd, buf, 1); + if (len == 0) { + gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF" + " from reconfigure notification pipe"); + continue; + } + if (len < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to read wakeup data"); + continue; + } + /* Lock is not required as same thread is modifying.*/ + priv->explicit_rollover = _gf_true; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "select wokeup on timeout"); + } + + /* Reading current_color without lock is fine here + * as it is only modified here and is next to reading. + */ + if (priv->current_color == FOP_COLOR_BLACK) { + LOCK(&priv->lock); + priv->current_color = FOP_COLOR_WHITE; + UNLOCK(&priv->lock); + gf_log (this->name, GF_LOG_DEBUG, "Black fops" + " to be drained:%ld",priv->dm.black_fop_cnt); + changelog_drain_black_fops (this, priv); + } else { + LOCK(&priv->lock); + priv->current_color = FOP_COLOR_BLACK; + UNLOCK(&priv->lock); + gf_log (this->name, GF_LOG_DEBUG, "White fops" + " to be drained:%ld",priv->dm.white_fop_cnt); + changelog_drain_white_fops (this, priv); + } ret = changelog_fill_rollover_data (&cld, _gf_false); if (ret) { @@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv, return; } + +/* Begin: Geo-rep snapshot dependency changes */ + +/* changelog_color_fop_and_inc_cnt: Assign color and inc fop cnt. + * + * Assigning color and increment of corresponding fop count should happen + * in a lock (i.e., there should be no window between them).
If it does not, + * we might miss draining those fops which are colored but not yet incremented + * the count. Let's assume black fops are draining. If the black fop count + * reaches zero, we say draining is completed but we miss black fops which are + * not incremented fop count but color is assigned black. + */ + +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + if (!priv || !local) + return; + + LOCK (&priv->lock); + { + local->color = priv->current_color; + changelog_inc_fop_cnt (this, priv, local); + } + UNLOCK (&priv->lock); +} + +/* Increments the respective fop counter based on the fop color */ +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + int ret = 0; + + if (local) { + if (local->color == FOP_COLOR_BLACK) { + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.black_fop_cnt++; + } + ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } else { + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.white_fop_cnt++; + } + ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } + } + out: + return; +} + +/* Decrements the respective fop counter based on the fop color */ +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + int ret = 0; + + if (local) { + if (local->color == FOP_COLOR_BLACK) { + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.black_fop_cnt--; + if (priv->dm.black_fop_cnt == 0 && + priv->dm.drain_wait_black == _gf_true) { + ret = pthread_cond_signal ( + &priv->dm.drain_black_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, 
GF_LOG_DEBUG, + "Signalled draining of black"); + } + } + ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } else { + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.white_fop_cnt--; + if (priv->dm.white_fop_cnt == 0 && + priv->dm.drain_wait_white == _gf_true) { + ret = pthread_cond_signal ( + &priv->dm.drain_white_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_DEBUG, + "Signalled draining of white"); + } + } + ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } + } + out: + return; +} + +/* Write to a pipe setup between changelog main thread and changelog + * rollover thread to initiate explicit rollover of changelog journal. + */ +inline int +changelog_barrier_notify (changelog_priv_t *priv, char *buf) +{ + int ret = 0; + + LOCK(&priv->lock); + ret = changelog_write (priv->cr_wfd, buf, 1); + UNLOCK(&priv->lock); + return ret; +} + +/* Clean up flags set on barrier notification */ +/*TODO: Add changelog barrier stop code with changelog barrier patch*/ +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + LOCK (&priv->bflags.lock); + priv->bflags.barrier_ext = _gf_false; + UNLOCK (&priv->bflags.lock); + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + out: + return; +} +/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 53588f55efa..54577592c90 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -107,6 +107,9 @@ typedef 
struct changelog_rollover { pthread_t rollover_th; xlator_t *this; + + /* read end of pipe used as event from barrier on snapshot */ + int rfd; } changelog_rollover_t; typedef struct changelog_fsync { @@ -139,6 +142,57 @@ typedef struct changelog_notify { xlator_t *this; } changelog_notify_t; +/* Draining during changelog rollover (for geo-rep snapshot dependency): + * -------------------------------------------------------------------- + * The introduction of draining of in-transit fops during changelog rollover + * (both explicit/timeout triggered) requires coloring of fops. Basically the + * implementation requires two counters, one counter which keeps the count of + * current intransit fops which should end up in current changelog and the other + * counter to keep track of incoming fops which should be drained as part of + * next changelog rollover event. The fops are colored w.r.t these counters. + * The fops that are to be drained as part of current changelog rollover are + * given one color and the fops which keep incoming during this and not + * necessarily should end up in current changelog and should be drained as part + * of next changelog rollover are given other color. The color switching + * continues with each changelog rollover. Two colors (black and white) are + * chosen here and initially black is chosen as the default. + */ + +typedef enum chlog_fop_color { + FOP_COLOR_BLACK, + FOP_COLOR_WHITE +}chlog_fop_color_t; + +/* Barrier notify variable */ +typedef struct barrier_notify { + pthread_mutex_t bnotify_mutex; + pthread_cond_t bnotify_cond; + gf_boolean_t bnotify; +}barrier_notify_t; + +/* Two separate mutex and conditional variable set is used + * to drain white and black fops.
*/ + +typedef struct drain_mgmt { + pthread_mutex_t drain_black_mutex; + pthread_cond_t drain_black_cond; + pthread_mutex_t drain_white_mutex; + pthread_cond_t drain_white_cond; + /* Represents black fops count in-transit */ + unsigned long black_fop_cnt; + /* Represents white fops count in-transit */ + unsigned long white_fop_cnt; + gf_boolean_t drain_wait_black; + gf_boolean_t drain_wait_white; +}drain_mgmt_t; + +/* Internal and External barrier on/off indicating flags */ +typedef struct barrier_flags { + gf_lock_t lock; + gf_boolean_t barrier_ext; +}barrier_flags_t; + + struct changelog_priv { gf_boolean_t active; @@ -191,6 +245,26 @@ struct changelog_priv { /* encoder */ struct changelog_encoder *ce; + + /* snapshot dependency changes */ + + /* Draining of fops*/ + drain_mgmt_t dm; + + /* Represents the active color. Initially by default black */ + chlog_fop_color_t current_color; + + /* write end of pipe to do explicit rollover on barrier during snap */ + int cr_wfd; + + /* flag to determine explicit rollover is triggered */ + gf_boolean_t explicit_rollover; + + /* barrier notification variable protected by mutex */ + barrier_notify_t bn; + + /* barrier on/off indicating flags */ + barrier_flags_t bflags; }; struct changelog_local { @@ -206,6 +280,9 @@ struct changelog_local { * but we call it as ->prev_entry... 
ha ha ha */ struct changelog_local *prev_entry; + + /* snap dependency changes */ + chlog_fop_color_t color; }; typedef struct changelog_local changelog_local_t; @@ -311,6 +388,25 @@ changelog_fsync_thread (void *data); int changelog_forget (xlator_t *this, inode_t *inode); +/* Geo-Rep snapshot dependency changes */ +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local); +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local); +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local); +inline int +changelog_barrier_notify (changelog_priv_t *priv, char* buf); +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv); +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv); +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv); + /* macros */ #define CHANGELOG_STACK_UNWIND(fop, frame, params ...) 
do { \ @@ -404,4 +500,41 @@ changelog_forget (xlator_t *this, inode_t *inode); goto label; \ } while (0) +/* Begin: Geo-Rep snapshot dependency changes */ + +#define DICT_ERROR -1 +#define BARRIER_OFF 0 +#define BARRIER_ON 1 +#define DICT_DEFAULT 2 + +#define CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, label) do { \ + if (!priv->active) { \ + gf_log (this->name, GF_LOG_WARNING, \ + "Changelog is not active, return success"); \ + ret = 0; \ + goto label; \ + } \ + } while (0) + +/* Log pthread error and goto label */ +#define CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, label) do { \ + if (ret) { \ + gf_log (this->name, GF_LOG_ERROR, \ + "pthread error: Error: %d", ret); \ + ret = -1; \ + goto label; \ + } \ + } while (0) + +/* Log pthread error, set flag and goto label */ +#define CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, label, flag) do { \ + if (ret) { \ + gf_log (this->name, GF_LOG_ERROR, \ + "pthread error: Error: %d", ret); \ + ret = -1; \ + flag = _gf_true; \ + goto label; \ + } \ + } while (0) #endif /* _CHANGELOG_HELPERS_H */ +/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 3e40984f6de..0a491c5ac07 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -62,6 +62,7 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; @@ -94,6 +95,7 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_rmdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, loc, xflags, xdata); @@ -118,6 +120,7 @@ changelog_unlink_cbk (call_frame_t 
*frame, void *cookie, xlator_t *this, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; @@ -150,6 +153,7 @@ changelog_unlink (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_unlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, loc, xflags, xdata); @@ -177,6 +181,7 @@ changelog_rename_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno, buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata); @@ -216,6 +221,7 @@ changelog_rename (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 3); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_rename_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, oldloc, newloc, xdata); @@ -242,6 +248,7 @@ changelog_link_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -276,6 +283,7 @@ changelog_link (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_link_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->link, oldloc, newloc, xdata); @@ -302,6 +310,7 @@ changelog_mkdir_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND 
(mkdir, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -353,6 +362,7 @@ changelog_mkdir (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 5); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_mkdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir, loc, mode, umask, xdata); @@ -379,6 +389,7 @@ changelog_symlink_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -422,6 +433,7 @@ changelog_symlink (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_symlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink, linkname, loc, umask, xdata); @@ -448,6 +460,7 @@ changelog_mknod_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; @@ -500,6 +513,7 @@ changelog_mknod (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 5); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_mknod_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod, loc, mode, dev, umask, xdata); @@ -527,6 +541,7 @@ changelog_create_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (create, frame, op_ret, op_errno, fd, inode, buf, preparent, postparent, xdata); @@ -583,6 +598,7 @@ changelog_create (call_frame_t *frame, xlator_t *this, 
changelog_set_usable_record_and_length (frame->local, xtra_len, 5); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_create_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, flags, mode, umask, fd, xdata); @@ -615,6 +631,7 @@ changelog_fsetattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); @@ -649,6 +666,7 @@ changelog_fsetattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_fsetattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); @@ -674,6 +692,7 @@ changelog_setattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); @@ -706,6 +725,7 @@ changelog_setattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_setattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); @@ -730,6 +750,7 @@ changelog_fremovexattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata); return 0; @@ -758,6 +779,7 @@ changelog_fremovexattr (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_fremovexattr_cbk, FIRST_CHILD 
(this), FIRST_CHILD (this)->fops->fremovexattr, fd, name, xdata); @@ -780,6 +802,7 @@ changelog_removexattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata); return 0; @@ -808,6 +831,7 @@ changelog_removexattr (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_removexattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr, loc, name, xdata); @@ -832,6 +856,7 @@ changelog_setxattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); return 0; @@ -861,6 +886,7 @@ changelog_setxattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_setxattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr, loc, dict, flags, xdata); @@ -883,6 +909,7 @@ changelog_fsetxattr_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata); return 0; @@ -912,6 +939,7 @@ changelog_fsetxattr (call_frame_t *frame, changelog_set_usable_record_and_length (frame->local, xtra_len, 1); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_fsetxattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags, xdata); @@ -944,6 +972,7 @@ changelog_truncate_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); unwind: + changelog_dec_fop_cnt (this, 
priv, local); CHANGELOG_STACK_UNWIND (truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; @@ -962,6 +991,7 @@ changelog_truncate (call_frame_t *frame, loc->inode, loc->inode->gfid, 0); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_truncate_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate, loc, offset, xdata); @@ -985,6 +1015,7 @@ changelog_ftruncate_cbk (call_frame_t *frame, changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (ftruncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; @@ -1003,6 +1034,7 @@ changelog_ftruncate (call_frame_t *frame, fd->inode, fd->inode->gfid, 0); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_ftruncate_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate, fd, offset, xdata); @@ -1028,6 +1060,7 @@ changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); unwind: + changelog_dec_fop_cnt (this, priv, local); CHANGELOG_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; @@ -1048,6 +1081,7 @@ changelog_writev (call_frame_t *frame, fd->inode, fd->inode->gfid, 0); wind: + changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); @@ -1089,9 +1123,16 @@ changelog_assign_encoding (changelog_priv_t *priv, char *enc) static void changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) { + int ret = 0; + if (priv->cr.rollover_th) { changelog_thread_cleanup (this, priv->cr.rollover_th); priv->cr.rollover_th = 0; + ret = close (priv->cr_wfd); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "error closing write end of rollover pipe" + " (reason: %s)", 
strerror (errno)); } if (priv->cf.fsync_th) { @@ -1105,6 +1146,41 @@ static int changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) { int ret = 0; + int flags = 0; + int pipe_fd[2] = {0, 0}; + + /* Geo-Rep snapshot dependency: + * + * To implement explicit rollover of changelog journal on barrier + * notification, a pipe is created to communicate between + * 'changelog_rollover' thread and changelog main thread. The select + * call used to wait till roll-over time in changelog_rollover thread + * is modified to wait on read end of the pipe. When barrier + * notification comes (i.e., in 'reconfigure'), select in + * changelog_rollover thread is woken up explicitly by writing into + * the write end of the pipe in 'reconfigure'. + */ + + ret = pipe (pipe_fd); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "Cannot create pipe (reason: %s)", strerror (errno)); + goto out; + } + + /* writer is non-blocking */ + flags = fcntl (pipe_fd[1], F_GETFL); + flags |= O_NONBLOCK; + + ret = fcntl (pipe_fd[1], F_SETFL, flags); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set O_NONBLOCK flag"); + goto out; + } + + priv->cr_wfd = pipe_fd[1]; + priv->cr.rfd = pipe_fd[0]; priv->cr.this = this; ret = gf_thread_create (&priv->cr.rollover_th, @@ -1186,6 +1262,155 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) return ret; } +int +notify (xlator_t *this, int event, void *data, ...) +{ + changelog_priv_t *priv = NULL; + dict_t *dict = NULL; + char buf[1] = {1}; + int barrier = DICT_DEFAULT; + gf_boolean_t bclean_req = _gf_false; + int ret = 0; + + priv = this->private; + if (!priv) + goto out; + + if (event == GF_EVENT_TRANSLATOR_OP) { + + dict = data; + /*TODO: Also barrier option is persistent. Need to + * decide on the brick crash scenarios.
+ */ + barrier = dict_get_str_boolean (dict, "barrier", DICT_DEFAULT); + + switch (barrier) { + case DICT_ERROR: + gf_log (this->name, GF_LOG_ERROR, + "Barrier dict_get_str_boolean failed"); + ret = -1; + goto out; + + case BARRIER_OFF: + gf_log (this->name, GF_LOG_INFO, + "Barrier off notification"); + + CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); + + LOCK (&priv->bflags.lock); + { + if (priv->bflags.barrier_ext == _gf_false) + ret = -1; + } + UNLOCK (&priv->bflags.lock); + + if (ret == -1 ) { + gf_log (this->name, GF_LOG_ERROR, "Received" + " another barrier off notification" + " while already off"); + goto out; + } + + /*TODO: STOP CHANGELOG BARRIER */ + LOCK (&priv->bflags.lock); + { + priv->bflags.barrier_ext = _gf_false; + } + UNLOCK (&priv->bflags.lock); + + ret = 0; + goto out; + + case BARRIER_ON: + gf_log (this->name, GF_LOG_INFO, + "Barrier on notification"); + + CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); + + LOCK (&priv->bflags.lock); + { + if (priv->bflags.barrier_ext == _gf_true) + ret = -1; + else + priv->bflags.barrier_ext = _gf_true; + } + UNLOCK (&priv->bflags.lock); + + if (ret == -1 ) { + gf_log (this->name, GF_LOG_ERROR, "Received" + " another barrier on notification when" + " last one is not served yet"); + goto out; + } + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, + bclean_req); + { + priv->bn.bnotify = _gf_true; + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, + bclean_req); + + /*TODO: START CHANGELOG BARRIER */ + + ret = changelog_barrier_notify(priv, buf); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Explicit roll over: write failed"); + changelog_barrier_cleanup (this, priv); + ret = -1; + goto out; + } + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, + bclean_req); + { + /* The while condition check is required here to + * handle spurious wakeup of cond wait that 
can + * happen with pthreads. See man page */ + while (priv->bn.bnotify == _gf_true) { + ret = pthread_cond_wait ( + &priv->bn.bnotify_cond, + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, + out, + bclean_req); + } + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, bclean_req); + gf_log (this->name, GF_LOG_INFO, + "Woke up: bnotify conditional wait"); + + ret = 0; + goto out; + + case DICT_DEFAULT: + gf_log (this->name, GF_LOG_ERROR, + "barrier key not found"); + ret = -1; + goto out; + + default: + gf_log (this->name, GF_LOG_ERROR, + "Something went bad in dict_get_str_boolean"); + ret = -1; + goto out; + } + } else { + ret = default_notify (this, event, data); + } + + out: + if (bclean_req) + changelog_barrier_cleanup (this, priv); + + return ret; +} + int32_t mem_acct_init (xlator_t *this) { @@ -1261,6 +1486,98 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) return ret; } +/* Init all pthread condition variables and locks in changelog*/ +static int +changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) +{ + gf_boolean_t bn_mutex_init = _gf_false; + gf_boolean_t bn_cond_init = _gf_false; + gf_boolean_t dm_mutex_black_init = _gf_false; + gf_boolean_t dm_cond_black_init = _gf_false; + gf_boolean_t dm_mutex_white_init = _gf_false; + gf_boolean_t dm_cond_white_init = _gf_false; + int ret = 0; + + if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "bnotify pthread_mutex_init failed (%d)", ret); + ret = -1; + goto out; + } + bn_mutex_init = _gf_true; + + if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "bnotify pthread_cond_init failed (%d)", ret); + ret = -1; + goto out; + } + bn_cond_init = _gf_true; + + if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) + { + gf_log (this->name, GF_LOG_ERROR, + "drain_black pthread_mutex_init failed 
(%d)", ret); + ret = -1; + goto out; + } + dm_mutex_black_init = _gf_true; + + if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "drain_black pthread_cond_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_cond_black_init = _gf_true; + + if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) + { + gf_log (this->name, GF_LOG_ERROR, + "drain_white pthread_mutex_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_mutex_white_init = _gf_true; + + if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "drain_white pthread_cond_init failed (%d)", ret); + ret = -1; + goto out; + } + dm_cond_white_init = _gf_true; + out: + if (ret) { + if (bn_mutex_init) + pthread_mutex_destroy(&priv->bn.bnotify_mutex); + if (bn_cond_init) + pthread_cond_destroy (&priv->bn.bnotify_cond); + if (dm_mutex_black_init) + pthread_mutex_destroy(&priv->dm.drain_black_mutex); + if (dm_cond_black_init) + pthread_cond_destroy (&priv->dm.drain_black_cond); + if (dm_mutex_white_init) + pthread_mutex_destroy(&priv->dm.drain_white_mutex); + if (dm_cond_white_init) + pthread_cond_destroy (&priv->dm.drain_white_cond); + } + return ret; +} + +/* Destroy all pthread condition variables and locks in changelog */ +static inline void +changelog_pthread_destroy (changelog_priv_t *priv) +{ + pthread_mutex_destroy (&priv->bn.bnotify_mutex); + pthread_cond_destroy (&priv->bn.bnotify_cond); + pthread_mutex_destroy (&priv->dm.drain_black_mutex); + pthread_cond_destroy (&priv->dm.drain_black_cond); + pthread_mutex_destroy (&priv->dm.drain_white_mutex); + pthread_cond_destroy (&priv->dm.drain_white_cond); + LOCK_DESTROY (&priv->bflags.lock); +} + int reconfigure (xlator_t *this, dict_t *options) { @@ -1363,9 +1680,10 @@ reconfigure (xlator_t *this, dict_t *options) int32_t init (xlator_t *this) { - int ret = -1; - char *tmp = NULL; - changelog_priv_t *priv = NULL; + int ret = 
-1; + char *tmp = NULL; + changelog_priv_t *priv = NULL; + gf_boolean_t cond_lock_init = _gf_false; GF_VALIDATE_OR_GOTO ("changelog", this, out); @@ -1451,6 +1769,24 @@ init (xlator_t *this) goto out; priv->changelog_fd = -1; + + /* snap dependency changes */ + priv->dm.black_fop_cnt = 0; + priv->dm.white_fop_cnt = 0; + priv->dm.drain_wait_black = _gf_false; + priv->dm.drain_wait_white = _gf_false; + priv->current_color = FOP_COLOR_BLACK; + priv->explicit_rollover = _gf_false; + /* Mutex is not needed as threads are not spawned yet */ + priv->bn.bnotify = _gf_false; + ret = changelog_pthread_init (this, priv); + if (ret) + goto out; + + LOCK_INIT (&priv->bflags.lock); + cond_lock_init = _gf_true; + priv->bflags.barrier_ext = _gf_false; + ret = changelog_init (this, priv); if (ret) goto out; @@ -1469,6 +1805,9 @@ init (xlator_t *this) } GF_FREE (priv->changelog_brick); GF_FREE (priv->changelog_dir); + if (cond_lock_init) + changelog_pthread_destroy (priv); + GF_FREE (priv); this->private = NULL; } else @@ -1493,6 +1832,7 @@ fini (xlator_t *this) mem_pool_destroy (this->local_pool); GF_FREE (priv->changelog_brick); GF_FREE (priv->changelog_dir); + changelog_pthread_destroy (priv); GF_FREE (priv); } |