diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 333 |
1 files changed, 324 insertions, 9 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 91c43a16c86..c3661b9b76c 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,15 @@ #include "changelog-encoders.h" #include <pthread.h> +static void +changelog_cleanup_free_mutex (void *arg_mutex) +{ + pthread_mutex_t *p_mutex = (pthread_mutex_t*) arg_mutex; + + if (p_mutex) + pthread_mutex_unlock(p_mutex); +} + void changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) { @@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this, char nfile[PATH_MAX] = {0,}; if (priv->changelog_fd != -1) { + ret = fsync (priv->changelog_fd); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "fsync failed (reason: %s)", + strerror (errno)); + } close (priv->changelog_fd); priv->changelog_fd = -1; } @@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this, gf_log (this->name, GF_LOG_ERROR, "Failed to send file name to notify thread" " (reason: %s)", strerror (errno)); + } else { + /* If this is explicit rollover initiated by snapshot, + * wakeup reconfigure thread waiting for changelog to + * rollover + */ + if (priv->explicit_rollover) { + priv->explicit_rollover = _gf_false; + ret = pthread_mutex_lock ( + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + ret = pthread_cond_signal ( + &priv->bn.bnotify_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_INFO, + "Changelog published and" + " signalled bnotify"); + } + ret = pthread_mutex_unlock ( + &priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } } } + out: return ret; } @@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, gf_boolean_t update_flag) { + changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; struct iobuf *iobuf = NULL; + priv = this->private; /** * We relax the presence of inode if @update_flag is true. * The caller (implmentation of the fop) needs to be careful to @@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this, return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); } +/* Wait till all the black fops are drained */ +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + /* clean up framework of pthread_mutex is required here as + * 'reconfigure' terminates the changelog_rollover thread + * on graph change. + */ + pthread_cleanup_push (changelog_cleanup_free_mutex, + &priv->dm.drain_black_mutex); + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + while (priv->dm.black_fop_cnt > 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Condtional wait on black fops: %ld", + priv->dm.black_fop_cnt); + priv->dm.drain_wait_black = _gf_true; + ret = pthread_cond_wait (&priv->dm.drain_black_cond, + &priv->dm.drain_black_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread" + " cond wait failed: Error:%d", ret); + } + priv->dm.drain_wait_black = _gf_false; + ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex); + pthread_cleanup_pop (0); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + gf_log (this->name, GF_LOG_DEBUG, + "Woke up: Conditional wait on black fops"); +} + +/* Wait till all the white fops are drained */ +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + /* clean up framework of pthread_mutex is required here as + * 'reconfigure' terminates the changelog_rollover thread + * on graph change. + */ + pthread_cleanup_push (changelog_cleanup_free_mutex, + &priv->dm.drain_white_mutex); + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + while (priv->dm.white_fop_cnt > 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Condtional wait on white fops : %ld", + priv->dm.white_fop_cnt); + priv->dm.drain_wait_white = _gf_true; + ret = pthread_cond_wait (&priv->dm.drain_white_cond, + &priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread" + " cond wait failed: Error:%d", ret); + } + priv->dm.drain_wait_white = _gf_false; + ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "pthread error:" + " Error:%d", ret); + pthread_cleanup_pop (0); + gf_log (this->name, GF_LOG_DEBUG, + "Woke up: Conditional wait on white fops"); +} + /** * TODO: these threads have many thing in common (wake up after * a certain time etc..). move them into separate routine. @@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this, void * changelog_rollover (void *data) { - int ret = 0; - xlator_t *this = NULL; - struct timeval tv = {0,}; - changelog_log_data_t cld = {0,}; - changelog_time_slice_t *slice = NULL; - changelog_priv_t *priv = data; + int ret = 0; + xlator_t *this = NULL; + struct timeval tv = {0,}; + changelog_log_data_t cld = {0,}; + changelog_time_slice_t *slice = NULL; + changelog_priv_t *priv = data; + int max_fd = 0; + char buf[1] = {0}; + int len = 0; + + fd_set rset; this = priv->cr.this; slice = &priv->slice; @@ -398,10 +519,62 @@ changelog_rollover (void *data) while (1) { tv.tv_sec = priv->rollover_time; tv.tv_usec = 0; - - ret = select (0, NULL, NULL, NULL, &tv); - if (ret) + FD_ZERO(&rset); + FD_SET(priv->cr.rfd, &rset); + max_fd = priv->cr.rfd; + max_fd = max_fd + 1; + + /* It seems there is a race between actual rollover and explicit + * rollover. But it is handled. If actual rollover is being + * done and the explicit rollover event comes, the event is + * not missed. The next select will immediately wakeup to + * handle explicit wakeup. + */ + + ret = select (max_fd, &rset, NULL, NULL, &tv); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "select failed: %s", strerror(errno)); continue; + } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) { + gf_log (this->name, GF_LOG_INFO, + "Explicit wakeup of select on barrier notify"); + len = read(priv->cr.rfd, buf, 1); + if (len == 0) { + gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF" + " from reconfigure notification pipe"); + continue; + } + if (len < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to read wakeup data"); + continue; + } + /* Lock is not required as same thread is modifying.*/ + priv->explicit_rollover = _gf_true; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "select wokeup on timeout"); + } + + /* Reading curent_color without lock is fine here + * as it is only modified here and is next to reading. + */ + if (priv->current_color == FOP_COLOR_BLACK) { + LOCK(&priv->lock); + priv->current_color = FOP_COLOR_WHITE; + UNLOCK(&priv->lock); + gf_log (this->name, GF_LOG_DEBUG, "Black fops" + " to be drained:%ld",priv->dm.black_fop_cnt); + changelog_drain_black_fops (this, priv); + } else { + LOCK(&priv->lock); + priv->current_color = FOP_COLOR_BLACK; + UNLOCK(&priv->lock); + gf_log (this->name, GF_LOG_DEBUG, "White fops" + " to be drained:%ld",priv->dm.white_fop_cnt); + changelog_drain_white_fops (this, priv); + } ret = changelog_fill_rollover_data (&cld, _gf_false); if (ret) { @@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv, return; } + +/* Begin: Geo-rep snapshot dependency changes */ + +/* changelog_color_fop_and_inc_cnt: Assign color and inc fop cnt. + * + * Assigning color and increment of corresponding fop count should happen + * in a lock (i.e., there should be no window between them). If it does not, + * we might miss draining those fops which are colored but not yet incremented + * the count. Let's assume black fops are draining. If the black fop count + * reaches zero, we say draining is completed but we miss black fops which are + * not incremented fop count but color is assigned black. + */ + +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + if (!priv || !local) + return; + + LOCK (&priv->lock); + { + local->color = priv->current_color; + changelog_inc_fop_cnt (this, priv, local); + } + UNLOCK (&priv->lock); +} + +/* Increments the respective fop counter based on the fop color */ +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + int ret = 0; + + if (local) { + if (local->color == FOP_COLOR_BLACK) { + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.black_fop_cnt++; + } + ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } else { + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.white_fop_cnt++; + } + ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } + } + out: + return; +} + +/* Decrements the respective fop counter based on the fop color */ +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local) +{ + int ret = 0; + + if (local) { + if (local->color == FOP_COLOR_BLACK) { + ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.black_fop_cnt--; + if (priv->dm.black_fop_cnt == 0 && + priv->dm.drain_wait_black == _gf_true) { + ret = pthread_cond_signal ( + &priv->dm.drain_black_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_DEBUG, + "Signalled draining of black"); + } + } + ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } else { + ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->dm.white_fop_cnt--; + if (priv->dm.white_fop_cnt == 0 && + priv->dm.drain_wait_white == _gf_true) { + ret = pthread_cond_signal ( + &priv->dm.drain_white_cond); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, + out); + gf_log (this->name, GF_LOG_DEBUG, + "Signalled draining of white"); + } + } + ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + } + } + out: + return; +} + +/* Write to a pipe setup between changelog main thread and changelog + * rollover thread to initiate explicit rollover of changelog journal. + */ +inline int +changelog_barrier_notify (changelog_priv_t *priv, char *buf) +{ + int ret = 0; + + LOCK(&priv->lock); + ret = changelog_write (priv->cr_wfd, buf, 1); + UNLOCK(&priv->lock); + return ret; +} + +/* Clean up flags set on barrier notification */ +/*TODO: Add changelog barrier stop code with changelog barrier patch*/ +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + + LOCK (&priv->bflags.lock); + priv->bflags.barrier_ext = _gf_false; + UNLOCK (&priv->bflags.lock); + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + out: + return; +} +/* End: Geo-Rep snapshot dependency changes */ |