diff options
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 333 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 133 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog.c | 346 | 
3 files changed, 800 insertions, 12 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 91c43a16c86..c3661b9b76c 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,15 @@  #include "changelog-encoders.h"  #include <pthread.h> +static void +changelog_cleanup_free_mutex (void *arg_mutex) +{ +    pthread_mutex_t *p_mutex = (pthread_mutex_t*) arg_mutex; + +    if (p_mutex) +            pthread_mutex_unlock(p_mutex); +} +  void  changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)  { @@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this,          char nfile[PATH_MAX] = {0,};          if (priv->changelog_fd != -1) { +                ret = fsync (priv->changelog_fd); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "fsync failed (reason: %s)", +                                strerror (errno)); +                }                  close (priv->changelog_fd);                  priv->changelog_fd = -1;          } @@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this,                          gf_log (this->name, GF_LOG_ERROR,                                  "Failed to send file name to notify thread"                                  " (reason: %s)", strerror (errno)); +                } else { +                        /* If this is explicit rollover initiated by snapshot, +                         * wakeup reconfigure thread waiting for changelog to +                         * rollover +                         */ +                        if (priv->explicit_rollover) { +                                priv->explicit_rollover = _gf_false; +                                ret = pthread_mutex_lock ( +                                                   &priv->bn.bnotify_mutex); +                                CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +       
                         { +                                         priv->bn.bnotify = _gf_false; +                                         ret = pthread_cond_signal ( +                                                        &priv->bn.bnotify_cond); +                                         CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, +                                                                           out); +                                         gf_log (this->name, GF_LOG_INFO, +                                                 "Changelog published and" +                                                 " signalled bnotify"); +                                } +                                ret = pthread_mutex_unlock ( +                                                       &priv->bn.bnotify_mutex); +                                CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        }                  }          } + out:          return ret;  } @@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode,                        uuid_t gfid, int xtra_records,                        gf_boolean_t update_flag)  { +        changelog_priv_t  *priv  = NULL;          changelog_local_t *local = NULL;          struct iobuf      *iobuf = NULL; +        priv = this->private;          /**           * We relax the presence of inode if @update_flag is true.           * The caller (implmentation of the fop) needs to be careful to @@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this,          return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL);  } +/* Wait till all the black fops are drained */ +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        /* clean up framework of pthread_mutex is required here as +         * 'reconfigure' terminates the changelog_rollover thread +         * on graph change. 
+         */ +        pthread_cleanup_push (changelog_cleanup_free_mutex, +                                        &priv->dm.drain_black_mutex); +        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); +        if (ret) +                gf_log (this->name, GF_LOG_ERROR, "pthread error:" +                        " Error:%d", ret); +        while (priv->dm.black_fop_cnt > 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "Condtional wait on black fops: %ld", +                        priv->dm.black_fop_cnt); +                priv->dm.drain_wait_black = _gf_true; +                ret = pthread_cond_wait (&priv->dm.drain_black_cond, +                                         &priv->dm.drain_black_mutex); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, "pthread" +                                " cond wait failed: Error:%d", ret); +        } +        priv->dm.drain_wait_black = _gf_false; +        ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex); +        pthread_cleanup_pop (0); +        if (ret) +                gf_log (this->name, GF_LOG_ERROR, "pthread error:" +                        " Error:%d", ret); +        gf_log (this->name, GF_LOG_DEBUG, +                "Woke up: Conditional wait on black fops"); +} + +/* Wait till all the white  fops are drained */ +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        /* clean up framework of pthread_mutex is required here as +         * 'reconfigure' terminates the changelog_rollover thread +         * on graph change. 
+         */ +        pthread_cleanup_push (changelog_cleanup_free_mutex, +                                        &priv->dm.drain_white_mutex); +        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); +        if (ret) +                gf_log (this->name, GF_LOG_ERROR, "pthread error:" +                        " Error:%d", ret); +        while (priv->dm.white_fop_cnt > 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "Condtional wait on white fops : %ld", +                        priv->dm.white_fop_cnt); +                priv->dm.drain_wait_white = _gf_true; +                ret = pthread_cond_wait (&priv->dm.drain_white_cond, +                                         &priv->dm.drain_white_mutex); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, "pthread" +                                " cond wait failed: Error:%d", ret); +        } +        priv->dm.drain_wait_white = _gf_false; +        ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex); +        if (ret) +                gf_log (this->name, GF_LOG_ERROR, "pthread error:" +                        " Error:%d", ret); +        pthread_cleanup_pop (0); +        gf_log (this->name, GF_LOG_DEBUG, +                "Woke up: Conditional wait on white fops"); +} +  /**   * TODO: these threads have many thing in common (wake up after   * a certain time etc..). move them into separate routine. 
@@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this,  void *  changelog_rollover (void *data)  { -        int                     ret   = 0; -        xlator_t               *this  = NULL; -        struct timeval          tv    = {0,}; -        changelog_log_data_t    cld   = {0,}; -        changelog_time_slice_t *slice = NULL; -        changelog_priv_t       *priv  = data; +        int                     ret             = 0; +        xlator_t               *this            = NULL; +        struct timeval          tv              = {0,}; +        changelog_log_data_t    cld             = {0,}; +        changelog_time_slice_t *slice           = NULL; +        changelog_priv_t       *priv            = data; +        int                     max_fd          = 0; +        char                    buf[1]          = {0}; +        int                     len             = 0; + +        fd_set                  rset;          this = priv->cr.this;          slice = &priv->slice; @@ -398,10 +519,62 @@ changelog_rollover (void *data)          while (1) {                  tv.tv_sec  = priv->rollover_time;                  tv.tv_usec = 0; - -                ret = select (0, NULL, NULL, NULL, &tv); -                if (ret) +                FD_ZERO(&rset); +                FD_SET(priv->cr.rfd, &rset); +                max_fd = priv->cr.rfd; +                max_fd = max_fd + 1; + +               /* It seems there is a race between actual rollover and explicit +                * rollover. But it is handled. If actual rollover is being +                * done and the explicit rollover event comes, the event is +                * not missed. The next select will immediately wakeup to +                * handle explicit wakeup. 
+                */ + +                ret = select (max_fd, &rset, NULL, NULL, &tv); +                if (ret == -1) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "select failed: %s", strerror(errno));                          continue; +                } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) { +                        gf_log (this->name, GF_LOG_INFO, +                                "Explicit wakeup of select on barrier notify"); +                        len = read(priv->cr.rfd, buf, 1); +                        if (len == 0) { +                                gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF" +                                        " from reconfigure notification pipe"); +                                continue; +                        } +                        if (len < 0) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "Failed to read wakeup data"); +                                continue; +                        } +                        /* Lock is not required as same thread is modifying.*/ +                        priv->explicit_rollover = _gf_true; +                } else { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "select wokeup on timeout"); +                } + +               /* Reading curent_color without lock is fine here +                * as it is only modified here and is next to reading. 
+                */ +                if (priv->current_color == FOP_COLOR_BLACK) { +                        LOCK(&priv->lock); +                                priv->current_color = FOP_COLOR_WHITE; +                        UNLOCK(&priv->lock); +                        gf_log (this->name, GF_LOG_DEBUG, "Black fops" +                                " to be drained:%ld",priv->dm.black_fop_cnt); +                        changelog_drain_black_fops (this, priv); +                } else { +                        LOCK(&priv->lock); +                                priv->current_color = FOP_COLOR_BLACK; +                        UNLOCK(&priv->lock); +                        gf_log (this->name, GF_LOG_DEBUG, "White fops" +                                " to be drained:%ld",priv->dm.white_fop_cnt); +                        changelog_drain_white_fops (this, priv); +                }                  ret = changelog_fill_rollover_data (&cld, _gf_false);                  if (ret) { @@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv,          return;  } + +/* Begin: Geo-rep snapshot dependency changes */ + +/* changelog_color_fop_and_inc_cnt: Assign color and inc fop cnt. + * + * Assigning color and increment of corresponding fop count should happen + * in a lock (i.e., there should be no window between them). If it does not, + * we might miss draining those fops which are colored but not yet incremented + * the count. Let's assume black fops are draining. If the black fop count + * reaches zero, we say draining is completed but we miss black fops which are + * not incremented fop count but color is assigned black. 
+ */ + +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, +                                                 changelog_local_t *local) +{ +        if (!priv || !local) +                return; + +        LOCK (&priv->lock); +        { +                local->color = priv->current_color; +                changelog_inc_fop_cnt (this, priv, local); +        } +        UNLOCK (&priv->lock); +} + +/* Increments the respective fop counter based on the fop color */ +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, +                                       changelog_local_t *local) +{ +        int ret = 0; + +        if (local) { +                if (local->color == FOP_COLOR_BLACK) { +                        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        { +                                priv->dm.black_fop_cnt++; +                        } +                        ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                } else { +                        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        { +                                priv->dm.white_fop_cnt++; +                        } +                        ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                } +        } + out: +        return; +} + +/* Decrements the respective fop counter based on the fop color */ +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, +                                       changelog_local_t *local) +{ +        int ret = 0; + +        if (local) { +                if (local->color == FOP_COLOR_BLACK) { +                    
    ret = pthread_mutex_lock (&priv->dm.drain_black_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        { +                                priv->dm.black_fop_cnt--; +                                if (priv->dm.black_fop_cnt == 0 && +                                    priv->dm.drain_wait_black == _gf_true) { +                                        ret = pthread_cond_signal ( +                                                    &priv->dm.drain_black_cond); +                                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, +                                                                           out); +                                        gf_log (this->name, GF_LOG_DEBUG, +                                                "Signalled draining of black"); +                                } +                        } +                        ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                } else { +                        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        { +                                priv->dm.white_fop_cnt--; +                                if (priv->dm.white_fop_cnt == 0 && +                                    priv->dm.drain_wait_white == _gf_true) { +                                        ret = pthread_cond_signal ( +                                                    &priv->dm.drain_white_cond); +                                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, +                                                                           out); +                                        gf_log (this->name, GF_LOG_DEBUG, +                                                "Signalled draining of white"); +                                } +                        } +              
          ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                } +        } + out: +        return; +} + +/* Write to a pipe setup between changelog main thread and changelog + * rollover thread to initiate explicit rollover of changelog journal. + */ +inline int +changelog_barrier_notify (changelog_priv_t *priv, char *buf) +{ +        int ret = 0; + +        LOCK(&priv->lock); +                ret = changelog_write (priv->cr_wfd, buf, 1); +        UNLOCK(&priv->lock); +        return ret; +} + +/* Clean up flags set on barrier notification */ +/*TODO: Add changelog barrier stop code with changelog barrier patch*/ +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        LOCK (&priv->bflags.lock); +                priv->bflags.barrier_ext = _gf_false; +        UNLOCK (&priv->bflags.lock); + +        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); +        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +        { +                priv->bn.bnotify = _gf_false; +        } +        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); +        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + out: +        return; +} +/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 53588f55efa..54577592c90 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -107,6 +107,9 @@ typedef struct changelog_rollover {          pthread_t rollover_th;          xlator_t *this; + +        /* read end of pipe used as event from barrier on snapshot */ +        int rfd;  } changelog_rollover_t;  typedef struct changelog_fsync { @@ -139,6 +142,57 @@ typedef struct changelog_notify {          xlator_t *this;  } changelog_notify_t; +/* Draining during changelog 
rollover (for geo-rep snapshot dependency): + * -------------------------------------------------------------------- + * The introduction of draining of in-transit fops during changelog rollover + * (both explicit/timeout triggered) requires coloring of fops. Basically the + * implementation requires two counters: one counter which keeps the count of + * current in-transit fops which should end up in the current changelog, and + * another counter to keep track of incoming fops which should be drained as + * part of the next changelog rollover event. The fops are colored w.r.t. these + * counters. The fops that are to be drained as part of the current changelog + * rollover are given one color; the fops that keep arriving during this + * rollover, which need not end up in the current changelog and are drained as + * part of the next changelog rollover, are given the other color. The color + * switching continues with each changelog rollover. Two colors (black and + * white) are used here; initially black is the default. + */ + +typedef enum chlog_fop_color { +         FOP_COLOR_BLACK, +         FOP_COLOR_WHITE +}chlog_fop_color_t; + +/* Barrier notify variable */ +typedef struct barrier_notify { +         pthread_mutex_t        bnotify_mutex; +         pthread_cond_t         bnotify_cond; +         gf_boolean_t           bnotify; +}barrier_notify_t; + +/* Two separate mutex and condition variable sets are used + * to drain white and black fops. 
*/ + +typedef struct drain_mgmt { +         pthread_mutex_t        drain_black_mutex; +         pthread_cond_t         drain_black_cond; +         pthread_mutex_t        drain_white_mutex; +         pthread_cond_t         drain_white_cond; +         /* Represents black fops count in-transit */ +         unsigned long          black_fop_cnt; +         /* Represents white fops count in-transit */ +         unsigned long          white_fop_cnt; +         gf_boolean_t           drain_wait_black; +         gf_boolean_t           drain_wait_white; +}drain_mgmt_t; + +/* Internal and External barrier on/off indicating flags */ +typedef struct barrier_flags { +        gf_lock_t lock; +        gf_boolean_t barrier_ext; +}barrier_flags_t; + +  struct changelog_priv {          gf_boolean_t active; @@ -191,6 +245,26 @@ struct changelog_priv {          /* encoder */          struct changelog_encoder *ce; + +        /* snapshot dependency changes */ + +        /* Draining of fops*/ +        drain_mgmt_t dm; + +        /* Represents the active color. Initially by default black */ +        chlog_fop_color_t current_color; + +        /* write end of pipe to do explicit rollover on barrier during snap */ +        int cr_wfd; + +        /* flag to determine explicit rollover is triggered */ +        gf_boolean_t explicit_rollover; + +        /* barrier notification variable protected by mutex */ +        barrier_notify_t bn; + +        /* barrier on/off indicating flags */ +        barrier_flags_t bflags;  };  struct changelog_local { @@ -206,6 +280,9 @@ struct changelog_local {           * but we call it as ->prev_entry... 
ha ha ha           */          struct changelog_local *prev_entry; + +        /* snap dependency changes */ +        chlog_fop_color_t color;  };  typedef struct changelog_local changelog_local_t; @@ -311,6 +388,25 @@ changelog_fsync_thread (void *data);  int  changelog_forget (xlator_t *this, inode_t *inode); +/* Geo-Rep snapshot dependency changes */ +inline void +changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv, +                                                 changelog_local_t *local); +inline void +changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv, +                                       changelog_local_t *local); +inline void +changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv, +                                       changelog_local_t *local); +inline int +changelog_barrier_notify (changelog_priv_t *priv, char* buf); +inline void +changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv); +void +changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv); +void +changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv); +  /* macros */  #define CHANGELOG_STACK_UNWIND(fop, frame, params ...) 
do {             \ @@ -404,4 +500,41 @@ changelog_forget (xlator_t *this, inode_t *inode);                          goto label;                                    \          } while (0) +/* Begin: Geo-Rep snapshot dependency changes */ + +#define DICT_ERROR         -1 +#define BARRIER_OFF         0 +#define BARRIER_ON          1 +#define DICT_DEFAULT        2 + +#define CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, label) do {                      \ +                if (!priv->active) {                                           \ +                        gf_log (this->name, GF_LOG_WARNING,                    \ +                                "Changelog is not active, return success");    \ +                        ret = 0;                                               \ +                        goto label;                                            \ +                }                                                              \ +        } while (0) + +/* Log pthread error and goto label */ +#define CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, label) do {                      \ +                if (ret) {                                                     \ +                        gf_log (this->name, GF_LOG_ERROR,                      \ +                                "pthread error: Error: %d", ret);              \ +                        ret = -1;                                              \ +                        goto label;                                            \ +                }                                                              \ +        } while (0) + +/* Log pthread error, set flag and goto label */ +#define CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, label, flag) do {                \ +                if (ret) {                                                     \ +                        gf_log (this->name, GF_LOG_ERROR,                      \ +                                "pthread error: Error: %d", ret);              \ +                       
 ret = -1;                                              \ +                        flag = _gf_true;                                       \ +                        goto label;                                            \ +                }                                                              \ +        } while (0)  #endif /* _CHANGELOG_HELPERS_H */ +/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 3e40984f6de..0a491c5ac07 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -62,6 +62,7 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno,                                  preparent, postparent, xdata);          return 0; @@ -94,6 +95,7 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 2);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_rmdir_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,                      loc, xflags, xdata); @@ -118,6 +120,7 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno,                                  preparent, postparent, xdata);          return 0; @@ -150,6 +153,7 @@ changelog_unlink (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 2);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, 
frame->local);          STACK_WIND (frame, changelog_unlink_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,                      loc, xflags, xdata); @@ -177,6 +181,7 @@ changelog_rename_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno,                                  buf, preoldparent, postoldparent,                                  prenewparent, postnewparent, xdata); @@ -216,6 +221,7 @@ changelog_rename (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 3);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_rename_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,                      oldloc, newloc, xdata); @@ -242,6 +248,7 @@ changelog_link_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno,                                  inode, buf, preparent, postparent, xdata);          return 0; @@ -276,6 +283,7 @@ changelog_link (call_frame_t *frame,          changelog_set_usable_record_and_length (frame->local, xtra_len, 2);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_link_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->link,                      oldloc, newloc, xdata); @@ -302,6 +310,7 @@ changelog_mkdir_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno,                                  
inode, buf, preparent, postparent, xdata);          return 0; @@ -353,6 +362,7 @@ changelog_mkdir (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 5);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_mkdir_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir,                      loc, mode, umask, xdata); @@ -379,6 +389,7 @@ changelog_symlink_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno,                                  inode, buf, preparent, postparent, xdata);          return 0; @@ -422,6 +433,7 @@ changelog_symlink (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 2);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_symlink_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink,                      linkname, loc, umask, xdata); @@ -448,6 +460,7 @@ changelog_mknod_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno,                                  inode, buf, preparent, postparent, xdata);          return 0; @@ -500,6 +513,7 @@ changelog_mknod (call_frame_t *frame,          changelog_set_usable_record_and_length (frame->local, xtra_len, 5);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_mknod_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod,                      loc, mode, dev, umask, xdata); @@ -527,6 +541,7 @@ 
changelog_create_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (create, frame,                                  op_ret, op_errno, fd, inode,                                  buf, preparent, postparent, xdata); @@ -583,6 +598,7 @@ changelog_create (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 5);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_create_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->create,                      loc, flags, mode, umask, fd, xdata); @@ -615,6 +631,7 @@ changelog_fsetattr_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno,                                  preop_stbuf, postop_stbuf, xdata); @@ -649,6 +666,7 @@ changelog_fsetattr (call_frame_t *frame,          changelog_set_usable_record_and_length (frame->local, xtra_len, 1);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_fsetattr_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr,                      fd, stbuf, valid, xdata); @@ -674,6 +692,7 @@ changelog_setattr_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno,                                  preop_stbuf, postop_stbuf, xdata); @@ -706,6 +725,7 @@ changelog_setattr (call_frame_t *frame,          changelog_set_usable_record_and_length (frame->local, xtra_len, 1);   wind: +        
changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_setattr_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr,                      loc, stbuf, valid, xdata); @@ -730,6 +750,7 @@ changelog_fremovexattr_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata);          return 0; @@ -758,6 +779,7 @@ changelog_fremovexattr (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 1);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_fremovexattr_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr,                      fd, name, xdata); @@ -780,6 +802,7 @@ changelog_removexattr_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata);          return 0; @@ -808,6 +831,7 @@ changelog_removexattr (call_frame_t *frame, xlator_t *this,          changelog_set_usable_record_and_length (frame->local, xtra_len, 1);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_removexattr_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr,                      loc, name, xdata); @@ -832,6 +856,7 @@ changelog_setxattr_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata);          return 0; @@ -861,6 +886,7 @@ changelog_setxattr 
(call_frame_t *frame,          changelog_set_usable_record_and_length (frame->local, xtra_len, 1);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_setxattr_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr,                      loc, dict, flags, xdata); @@ -883,6 +909,7 @@ changelog_fsetxattr_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata);          return 0; @@ -912,6 +939,7 @@ changelog_fsetxattr (call_frame_t *frame,          changelog_set_usable_record_and_length (frame->local, xtra_len, 1);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_fsetxattr_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr,                      fd, dict, flags, xdata); @@ -944,6 +972,7 @@ changelog_truncate_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (truncate, frame,                                  op_ret, op_errno, prebuf, postbuf, xdata);          return 0; @@ -962,6 +991,7 @@ changelog_truncate (call_frame_t *frame,                          loc->inode, loc->inode->gfid, 0);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_truncate_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate,                      loc, offset, xdata); @@ -985,6 +1015,7 @@ changelog_ftruncate_cbk (call_frame_t *frame,          changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (ftruncate, frame,    
                              op_ret, op_errno, prebuf, postbuf, xdata);          return 0; @@ -1003,6 +1034,7 @@ changelog_ftruncate (call_frame_t *frame,                          fd->inode, fd->inode->gfid, 0);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_ftruncate_cbk,                      FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate,                      fd, offset, xdata); @@ -1028,6 +1060,7 @@ changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);   unwind: +        changelog_dec_fop_cnt (this, priv, local);          CHANGELOG_STACK_UNWIND (writev, frame,                                  op_ret, op_errno, prebuf, postbuf, xdata);          return 0; @@ -1048,6 +1081,7 @@ changelog_writev (call_frame_t *frame,                          fd->inode, fd->inode->gfid, 0);   wind: +        changelog_color_fop_and_inc_cnt (this, priv, frame->local);          STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this),                      FIRST_CHILD (this)->fops->writev, fd, vector,                      count, offset, flags, iobref, xdata); @@ -1089,9 +1123,16 @@ changelog_assign_encoding (changelog_priv_t *priv, char *enc)  static void  changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)  { +        int ret = 0; +          if (priv->cr.rollover_th) {                  changelog_thread_cleanup (this, priv->cr.rollover_th);                  priv->cr.rollover_th = 0; +                ret = close (priv->cr_wfd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error closing write end of rollover pipe" +                                " (reason: %s)", strerror (errno));          }          if (priv->cf.fsync_th) { @@ -1105,6 +1146,41 @@ static int  changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) 
 {          int ret = 0; +        int flags = 0; +        int pipe_fd[2] = {0, 0}; + +        /* Geo-Rep snapshot dependency: +         * +         * To implement explicit rollover of changelog journal on barrier +         * notification, a pipe is created to communicate between +         * 'changelog_rollover' thread and changelog main thread. The select +         * call used to wait till roll-over time in changelog_rollover thread +         * is modified to wait on read end of the pipe. When barrier +         * notification comes (i.e, in 'reconfigure'), select in +         * changelog_rollover thread is woken up explicitly by writing into +         * the write end of the pipe in 'reconfigure'. +         */ + +        ret = pipe (pipe_fd); +        if (ret == -1) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Cannot create pipe (reason: %s)", strerror (errno)); +                goto out; +        } + +        /* writer is non-blocking */ +        flags = fcntl (pipe_fd[1], F_GETFL); +        flags |= O_NONBLOCK; + +        ret = fcntl (pipe_fd[1], F_SETFL, flags); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to set O_NONBLOCK flag"); +                goto out; +        } + +        priv->cr_wfd = pipe_fd[1]; +        priv->cr.rfd = pipe_fd[0];          priv->cr.this = this;          ret = gf_thread_create (&priv->cr.rollover_th, @@ -1186,6 +1262,155 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)          return ret;  } +int +notify (xlator_t *this, int event, void *data, ...) 
+{ +        changelog_priv_t       *priv           = NULL; +        dict_t                 *dict           = NULL; +        char                    buf[1]         = {1}; +        int                     barrier        = DICT_DEFAULT; +        gf_boolean_t            bclean_req     = _gf_false; +        int                     ret            = 0; + +        priv = this->private; +        if (!priv) +                goto out; + +        if (event == GF_EVENT_TRANSLATOR_OP) { + +                dict = data; +                /*TODO: Also barrier option is persistent. Need to +                 *      decide on the brick crash scenarios. +                 */ +                barrier = dict_get_str_boolean (dict, "barrier", DICT_DEFAULT); + +                switch (barrier) { +                case DICT_ERROR: +                        gf_log (this->name, GF_LOG_ERROR, +                                "Barrier dict_get_str_boolean failed"); +                        ret = -1; +                        goto out; + +                case BARRIER_OFF: +                        gf_log (this->name, GF_LOG_INFO, +                                "Barrier off notification"); + +                        CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); + +                        LOCK (&priv->bflags.lock); +                        { +                                if (priv->bflags.barrier_ext == _gf_false) +                                        ret = -1; +                        } +                        UNLOCK (&priv->bflags.lock); + +                        if (ret == -1 ) { +                                gf_log (this->name, GF_LOG_ERROR, "Received" +                                        " another barrier off notification" +                                        " while already off"); +                                goto out; +                        } + +                        /*TODO: STOP CHANGELOG BARRIER */ +                        LOCK (&priv->bflags.lock); +                   
     { +                                priv->bflags.barrier_ext = _gf_false; +                        } +                        UNLOCK (&priv->bflags.lock); + +                        ret = 0; +                        goto out; + +                case BARRIER_ON: +                        gf_log (this->name, GF_LOG_INFO, +                                "Barrier on notification"); + +                        CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); + +                        LOCK (&priv->bflags.lock); +                        { +                                if (priv->bflags.barrier_ext == _gf_true) +                                        ret = -1; +                                else +                                        priv->bflags.barrier_ext = _gf_true; +                        } +                        UNLOCK (&priv->bflags.lock); + +                        if (ret == -1 ) { +                                gf_log (this->name, GF_LOG_ERROR, "Received" +                                        " another barrier on notification when" +                                        " last one is not served yet"); +                                goto out; +                        } + +                        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, +                                                                    bclean_req); +                        { +                                priv->bn.bnotify = _gf_true; +                        } +                        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, +                                                                    bclean_req); + +                        /*TODO: START CHANGELOG BARRIER */ + +                        ret = changelog_barrier_notify(priv, buf); +                        if (ret) { +                                gf_log 
(this->name, GF_LOG_ERROR, +                                        "Explicit roll over: write failed"); +                                changelog_barrier_cleanup (this, priv); +                                ret = -1; +                                goto out; +                        } + +                        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, +                                                                    bclean_req); +                        { +                                /* The while condition check is required here to +                                 * handle spurious wakeup of cond wait that can +                                 * happen with pthreads. See man page */ +                                while (priv->bn.bnotify == _gf_true) { +                                        ret = pthread_cond_wait ( +                                                       &priv->bn.bnotify_cond, +                                                       &priv->bn.bnotify_mutex); +                                        CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, +                                                                          out, +                                                                    bclean_req); +                                } +                        } +                        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, bclean_req); +                        gf_log (this->name, GF_LOG_INFO, +                                "Woke up: bnotify conditional wait"); + +                        ret = 0; +                        goto out; + +                case DICT_DEFAULT: +                        gf_log (this->name, GF_LOG_ERROR, +                                "barrier key not found"); +                        ret = -1; +                        goto out; + +                
default: +                        gf_log (this->name, GF_LOG_ERROR, +                                "Something went bad in dict_get_str_boolean"); +                        ret = -1; +                        goto out; +                } +        } else { +                ret = default_notify (this, event, data); +        } + + out: +        if (bclean_req) +                changelog_barrier_cleanup (this, priv); + +        return ret; +} +  int32_t  mem_acct_init (xlator_t *this)  { @@ -1261,6 +1486,98 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)          return ret;  } +/* Init all pthread condition variables and locks in changelog*/ +static int +changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) +{ +        gf_boolean_t    bn_mutex_init         = _gf_false; +        gf_boolean_t    bn_cond_init          = _gf_false; +        gf_boolean_t    dm_mutex_black_init   = _gf_false; +        gf_boolean_t    dm_cond_black_init    = _gf_false; +        gf_boolean_t    dm_mutex_white_init   = _gf_false; +        gf_boolean_t    dm_cond_white_init    = _gf_false; +        int             ret                   = 0; + +        if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "bnotify pthread_mutex_init failed (%d)", ret); +                ret = -1; +                goto out; +        } +        bn_mutex_init = _gf_true; + +        if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "bnotify pthread_cond_init failed (%d)", ret); +                ret = -1; +                goto out; +        } +        bn_cond_init = _gf_true; + +        if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) +        { +                gf_log (this->name, GF_LOG_ERROR, +                        "drain_black pthread_mutex_init failed (%d)", ret); +                ret = 
-1; +                goto out; +        } +        dm_mutex_black_init = _gf_true; + +        if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "drain_black pthread_cond_init failed (%d)", ret); +                ret = -1; +                goto out; +        } +        dm_cond_black_init = _gf_true; + +        if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) +        { +                gf_log (this->name, GF_LOG_ERROR, +                        "drain_white pthread_mutex_init failed (%d)", ret); +                ret = -1; +                goto out; +        } +        dm_mutex_white_init = _gf_true; + +        if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "drain_white pthread_cond_init failed (%d)", ret); +                ret = -1; +                goto out; +        } +        dm_cond_white_init = _gf_true; + out: +        if (ret) { +                if (bn_mutex_init) +                        pthread_mutex_destroy(&priv->bn.bnotify_mutex); +                if (bn_cond_init) +                        pthread_cond_destroy (&priv->bn.bnotify_cond); +                if (dm_mutex_black_init) +                        pthread_mutex_destroy(&priv->dm.drain_black_mutex); +                if (dm_cond_black_init) +                        pthread_cond_destroy (&priv->dm.drain_black_cond); +                if (dm_mutex_white_init) +                        pthread_mutex_destroy(&priv->dm.drain_white_mutex); +                if (dm_cond_white_init) +                        pthread_cond_destroy (&priv->dm.drain_white_cond); +        } +        return ret; +} + +/* Destroy all pthread condition variables and locks in changelog */ +static inline void +changelog_pthread_destroy (changelog_priv_t *priv) +{ +        pthread_mutex_destroy (&priv->bn.bnotify_mutex); +   
     pthread_cond_destroy (&priv->bn.bnotify_cond); +        pthread_mutex_destroy (&priv->dm.drain_black_mutex); +        pthread_cond_destroy (&priv->dm.drain_black_cond); +        pthread_mutex_destroy (&priv->dm.drain_white_mutex); +        pthread_cond_destroy (&priv->dm.drain_white_cond); +        LOCK_DESTROY (&priv->bflags.lock); +} +  int  reconfigure (xlator_t *this, dict_t *options)  { @@ -1363,9 +1680,10 @@ reconfigure (xlator_t *this, dict_t *options)  int32_t  init (xlator_t *this)  { -        int               ret  = -1; -        char             *tmp  = NULL; -        changelog_priv_t *priv = NULL; +        int                     ret             = -1; +        char                    *tmp            = NULL; +        changelog_priv_t        *priv           = NULL; +        gf_boolean_t            cond_lock_init  = _gf_false;          GF_VALIDATE_OR_GOTO ("changelog", this, out); @@ -1451,6 +1769,24 @@ init (xlator_t *this)                  goto out;          priv->changelog_fd = -1; + +        /* snap dependency changes */ +        priv->dm.black_fop_cnt = 0; +        priv->dm.white_fop_cnt = 0; +        priv->dm.drain_wait_black = _gf_false; +        priv->dm.drain_wait_white = _gf_false; +        priv->current_color = FOP_COLOR_BLACK; +        priv->explicit_rollover = _gf_false; +        /* Mutex is not needed as threads are not spawned yet */ +        priv->bn.bnotify = _gf_false; +        ret = changelog_pthread_init (this, priv); +        if (ret) +                goto out; + +        LOCK_INIT (&priv->bflags.lock); +        cond_lock_init = _gf_true; +        priv->bflags.barrier_ext = _gf_false; +          ret = changelog_init (this, priv);          if (ret)                  goto out; @@ -1469,6 +1805,9 @@ init (xlator_t *this)                  }                  GF_FREE (priv->changelog_brick);                  GF_FREE (priv->changelog_dir); +                if (cond_lock_init) +                        changelog_pthread_destroy (priv); +    
              GF_FREE (priv);                  this->private = NULL;          } else @@ -1493,6 +1832,7 @@ fini (xlator_t *this)                  mem_pool_destroy (this->local_pool);                  GF_FREE (priv->changelog_brick);                  GF_FREE (priv->changelog_dir); +                changelog_pthread_destroy (priv);                  GF_FREE (priv);          }  | 
