diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 210 | 
1 files changed, 171 insertions, 39 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 3af2938190d..5c755d76d69 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,7 @@  #include "changelog-mem-types.h"  #include "changelog-encoders.h" +#include "changelog-rpc-common.h"  #include <pthread.h>  static inline void @@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex)              pthread_mutex_unlock(p_mutex);  } -void +int  changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)  {          int   ret    = 0; @@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)          /* send a cancel request to the thread */          ret = pthread_cancel (thr_id); -        if (ret) { +        if (ret != 0) {                  gf_log (this->name, GF_LOG_ERROR,                          "could not cancel thread (reason: %s)",                          strerror (errno)); @@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)          }          ret = pthread_join (thr_id, &retval); -        if (ret || (retval != PTHREAD_CANCELED)) { +        if ((ret != 0) || (retval != PTHREAD_CANCELED)) {                  gf_log (this->name, GF_LOG_ERROR,                          "cancel request not adhered as expected"                          " (reason: %s)", strerror (errno));          }   out: -        return; +        return ret;  }  inline void * @@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local)          return cld->cld_iobuf->ptr;  } +static inline int +changelog_selector_index (unsigned int selector) +{ +        return (ffs (selector) - 1); +} + +inline int +changelog_ev_selected (xlator_t *this, +                       changelog_ev_selector_t *selection, +                       unsigned int selector) +{ +        int idx = 0; + +        idx = changelog_selector_index (selector); +        gf_log (this->name, GF_LOG_DEBUG, +                "selector ref count for %d (idx: %d): %d", +                selector, idx, selection->ref[idx]); +        /* this can be lockless */ +        return (idx < CHANGELOG_EV_SELECTION_RANGE +                 && (selection->ref[idx] > 0)); +} + +inline void +changelog_select_event (xlator_t *this, +                        changelog_ev_selector_t *selection, +                        unsigned int selector) +{ +        int idx = 0; + +        LOCK (&selection->reflock); +        { +                while (selector) { +                        idx = changelog_selector_index (selector); +                        if (idx < CHANGELOG_EV_SELECTION_RANGE) { +                                selection->ref[idx]++; +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "selecting event %d", idx); +                        } +                        selector &= ~(1 << idx); +                } +        } +        UNLOCK (&selection->reflock); +} + +inline void +changelog_deselect_event (xlator_t *this, +                          changelog_ev_selector_t *selection, +                          unsigned int selector) +{ +        int idx = 0; + +        LOCK (&selection->reflock); +        { +                while (selector) { +                        idx = changelog_selector_index (selector); +                        if (idx < CHANGELOG_EV_SELECTION_RANGE) { +                                selection->ref[idx]--; +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "de-selecting event %d", idx); +                        } +                        selector &= ~(1 << idx); +                } +        } +        UNLOCK (&selection->reflock); +} + +inline int +changelog_init_event_selection (xlator_t *this, +                                changelog_ev_selector_t *selection) +{ +        int ret = 0; +        int j = CHANGELOG_EV_SELECTION_RANGE; + +        ret = LOCK_INIT (&selection->reflock); +        if (ret != 0) +                return -1; + +        LOCK (&selection->reflock); +        { +                while (j--) { +                        selection->ref[j] = 0; +                } +        } +        UNLOCK (&selection->reflock); + +        return 0; +} + +inline int +changelog_cleanup_event_selection (xlator_t *this, +                                   changelog_ev_selector_t *selection) +{ +        int ret = 0; +        int j = CHANGELOG_EV_SELECTION_RANGE; + +        LOCK (&selection->reflock); +        { +                while (j--) { +                        if (selection->ref[j] > 0) +                                gf_log (this->name, GF_LOG_WARNING, +                                        "changelog event selection cleaning up " +                                        " on active references"); +                } +        } +        UNLOCK (&selection->reflock); + +        return LOCK_DESTROY (&selection->reflock); +} + +static inline void +changelog_perform_dispatch (xlator_t *this, +                            changelog_priv_t *priv, void *mem, size_t size) +{ +        char *buf    = NULL; +        void *opaque = NULL; + +        buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque); +        if (!buf) { +                gf_log_callingfn (this->name, +                                  GF_LOG_WARNING, "failed to dispatch event"); +                return; +        } + +        memcpy (buf, mem, size); +        rbuf_write_complete (opaque); +} + +inline void +changelog_dispatch_event (xlator_t *this, +                          changelog_priv_t *priv, changelog_event_t *ev) +{ +        changelog_ev_selector_t *selection = NULL; + +        selection = &priv->ev_selection; +        if (changelog_ev_selected (this, selection, ev->ev_type)) { +                changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE); +        } +} +  inline void  changelog_set_usable_record_and_length (changelog_local_t *local,                                          size_t len, int xr) @@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this,  {          int   ret            = -1;          int   notify         = 0; -        char *bname          = NULL;          char ofile[PATH_MAX] = {0,};          char nfile[PATH_MAX] = {0,}; +        changelog_event_t ev = {0,};          if (priv->changelog_fd != -1) {                  ret = fsync (priv->changelog_fd); @@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this,          }          if (notify) { -                bname = basename (nfile); -                gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); -                ret = changelog_write (priv->wfd, bname, strlen (bname) + 1); -                if (ret) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "Failed to send file name to notify thread" -                                " (reason: %s)", strerror (errno)); -                } else { -                        /* If this is explicit rollover initiated by snapshot, -                         * wakeup reconfigure thread waiting for changelog to -                         * rollover -                         */ -                        if (priv->explicit_rollover) { -                                priv->explicit_rollover = _gf_false; -                                ret = pthread_mutex_lock ( -                                                   &priv->bn.bnotify_mutex); -                                CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); -                                { -                                         priv->bn.bnotify = _gf_false; -                                         ret = pthread_cond_signal ( -                                                        &priv->bn.bnotify_cond); -                                         CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, -                                                                           out); -                                         gf_log (this->name, GF_LOG_INFO, -                                                 "Changelog published: %s and" -                                                 " signalled bnotify", bname); -                                } -                                ret = pthread_mutex_unlock ( -                                                       &priv->bn.bnotify_mutex); +                ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL; +                memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1); +                changelog_dispatch_event (this, priv, &ev); + +                /* If this is explicit rollover initiated by snapshot, +                 * wakeup reconfigure thread waiting for changelog to +                 * rollover +                 */ +                if (priv->explicit_rollover) { +                        priv->explicit_rollover = _gf_false; + +                        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        { +                                priv->bn.bnotify = _gf_false; +                                ret = pthread_cond_signal +                                        (&priv->bn.bnotify_cond);                                  CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                                gf_log (this->name, GF_LOG_INFO, +                                        "Changelog published: %s signalled" +                                        " bnotify", nfile);                          } +                        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);                  }          } -   out:          return ret;  } @@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this,  }  int -changelog_open (xlator_t *this, -                changelog_priv_t *priv) +changelog_open_journal (xlator_t *this, +                        changelog_priv_t *priv)  {          int fd                        = 0;          int ret                       = -1; @@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this,          ret = changelog_rollover_changelog (this, priv, ts);          if (!ret && !finale) -                ret = changelog_open (this, priv); +                ret = changelog_open_journal (this, priv);          return ret;  } @@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this,   * one shot routine to get the address and the value of a inode version   * for a particular type.   */ -static changelog_inode_ctx_t * +changelog_inode_ctx_t *  __changelog_inode_ctx_get (xlator_t *this,                             inode_t *inode, unsigned long **iver,                             unsigned long *version, changelog_log_type type)  | 
