diff options
Diffstat (limited to 'xlators/features/changelog/src')
| -rw-r--r-- | xlators/features/changelog/src/Makefile.am | 19 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-encoders.c | 156 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-encoders.h | 44 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 691 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 386 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-mem-types.h | 29 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-misc.h | 101 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-notifier.c | 314 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-notifier.h | 19 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rt.c | 72 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rt.h | 33 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog.c | 1487 | 
12 files changed, 3351 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am new file mode 100644 index 00000000000..e85031ad496 --- /dev/null +++ b/xlators/features/changelog/src/Makefile.am @@ -0,0 +1,19 @@ +xlator_LTLIBRARIES = changelog.la + +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features + +noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \ +	changelog-misc.h changelog-encoders.h changelog-notifier.h + +changelog_la_LDFLAGS = -module -avoidversion + +changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \ +	changelog-encoders.c changelog-notifier.c +changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \ +	-D_GNU_SOURCE -D$(GF_HOST_OS) -shared -nostartfiles -DDATADIR=\"$(localstatedir)\" + +AM_CFLAGS = -Wall $(GF_CFLAGS) + +CLEANFILES = diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c new file mode 100644 index 00000000000..c71ea9270b0 --- /dev/null +++ b/xlators/features/changelog/src/changelog-encoders.c @@ -0,0 +1,156 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "changelog-encoders.h" + +size_t +entry_fn (void *data, char *buffer, gf_boolean_t encode) +{ +        char    *tmpbuf = NULL; +        size_t  bufsz  = 0; +        struct changelog_entry_fields *ce = NULL; + +        ce = (struct changelog_entry_fields *) data; + +        if (encode) { +                tmpbuf = uuid_utoa (ce->cef_uuid); +                CHANGELOG_FILL_BUFFER (buffer, bufsz, tmpbuf, strlen (tmpbuf)); +        } else { +                CHANGELOG_FILL_BUFFER (buffer, bufsz, +                                       ce->cef_uuid, sizeof (uuid_t)); +        } + +        CHANGELOG_FILL_BUFFER (buffer, bufsz, "/", 1); +        CHANGELOG_FILL_BUFFER (buffer, bufsz, +                               ce->cef_bname, strlen (ce->cef_bname)); +        return bufsz; +} + +size_t +fop_fn (void *data, char *buffer, gf_boolean_t encode) +{ +        char buf[10]          = {0,}; +        size_t         bufsz = 0; +        glusterfs_fop_t fop   = 0; + +        fop = *(glusterfs_fop_t *) data; + +        if (encode) { +                (void) snprintf (buf, sizeof (buf), "%d", fop); +                CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); +        } else +                CHANGELOG_FILL_BUFFER (buffer, bufsz, &fop, sizeof (fop)); + +        return bufsz; +} + +void +entry_free_fn (void *data) +{ +        changelog_opt_t *co = data; + +        if (!co) +                return; + +        GF_FREE (co->co_entry.cef_bname); +} + +/** + * try to write all data in one shot + */ + +static inline void +changelog_encode_write_xtra (changelog_log_data_t *cld, +                             char *buffer, size_t *off, gf_boolean_t encode) +{ +        int              i      = 0; +        size_t           offset = 0; +        void            *data   = NULL; +        changelog_opt_t *co     = NULL; + +        offset = *off; + +        co = (changelog_opt_t *) cld->cld_ptr; + +        for (; i < cld->cld_xtra_records; i++, co++) { +                CHANGELOG_FILL_BUFFER (buffer, offset, "\0", 1); + +                switch (co->co_type) { +                case CHANGELOG_OPT_REC_FOP: +                        data = &co->co_fop; +                        break; +                case CHANGELOG_OPT_REC_ENTRY: +                        data = &co->co_entry; +                        break; +                } + +                if (co->co_convert) +                        offset += co->co_convert (data, +                                                  buffer + offset, encode); +                else /* no coversion: write it out as it is */ +                        CHANGELOG_FILL_BUFFER (buffer, offset, +                                               data, co->co_len); +        } + +        *off = offset; +} + +int +changelog_encode_ascii (xlator_t *this, changelog_log_data_t *cld) +{ +        size_t            off      = 0; +        size_t            gfid_len = 0; +        char             *gfid_str = NULL; +        char             *buffer   = NULL; +        changelog_priv_t *priv     = NULL; + +        priv = this->private; + +        gfid_str = uuid_utoa (cld->cld_gfid); +        gfid_len = strlen (gfid_str); + +        /* extra bytes for decorations */ +        buffer = alloca (gfid_len + cld->cld_ptr_len + 10); +        CHANGELOG_STORE_ASCII (priv, buffer, +                               off, gfid_str, gfid_len, cld); + +        if (cld->cld_xtra_records) +                changelog_encode_write_xtra (cld, buffer, &off, _gf_true); + +        CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); + +        return changelog_write_change (priv, buffer, off); +} + +int +changelog_encode_binary (xlator_t *this, changelog_log_data_t *cld) +{ +        size_t            off    = 0; +        char             *buffer = NULL; +        changelog_priv_t *priv   = NULL; + +        priv = this->private; + +        /* extra bytes for decorations */ +        buffer = alloca (sizeof (uuid_t) + cld->cld_ptr_len + 10); +        CHANGELOG_STORE_BINARY (priv, buffer, off, cld->cld_gfid, cld); + +        if (cld->cld_xtra_records) +                changelog_encode_write_xtra (cld, buffer, &off, _gf_false); + +        CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); + +        return changelog_write_change (priv, buffer, off); +} diff --git a/xlators/features/changelog/src/changelog-encoders.h b/xlators/features/changelog/src/changelog-encoders.h new file mode 100644 index 00000000000..43deb1307d7 --- /dev/null +++ b/xlators/features/changelog/src/changelog-encoders.h @@ -0,0 +1,44 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_ENCODERS_H +#define _CHANGELOG_ENCODERS_H + +#include "xlator.h" +#include "defaults.h" + +#include "changelog-helpers.h" + +#define CHANGELOG_STORE_ASCII(priv, buf, off, gfid, gfid_len, cld) do { \ +                CHANGELOG_FILL_BUFFER (buffer, off,                     \ +                                       priv->maps[cld->cld_type], 1);   \ +                CHANGELOG_FILL_BUFFER (buffer,                          \ +                                       off, gfid, gfid_len);            \ +        } while (0) + +#define CHANGELOG_STORE_BINARY(priv, buf, off, gfid, cld) do {          \ +                CHANGELOG_FILL_BUFFER (buffer, off,                     \ +                                       priv->maps[cld->cld_type], 1);   \ +                CHANGELOG_FILL_BUFFER (buffer,                          \ +                                       off, gfid, sizeof (uuid_t));     \ +        } while (0) + +size_t +entry_fn (void *data, char *buffer, gf_boolean_t encode); +size_t +fop_fn (void *data, char *buffer, gf_boolean_t encode); +void +entry_free_fn (void *data); +int +changelog_encode_binary (xlator_t *, changelog_log_data_t *); +int +changelog_encode_ascii (xlator_t *, changelog_log_data_t *); + +#endif /* _CHANGELOG_ENCODERS_H */ diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c new file mode 100644 index 00000000000..c1bb6e5fef9 --- /dev/null +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -0,0 +1,691 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" +#include "iobuf.h" + +#include "changelog-helpers.h" +#include "changelog-mem-types.h" + +#include <pthread.h> + +void +changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) +{ +        int   ret    = 0; +        void *retval = NULL; + +        /* send a cancel request to the thread */ +        ret = pthread_cancel (thr_id); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not cancel thread (reason: %s)", +                        strerror (errno)); +                goto out; +        } + +        ret = pthread_join (thr_id, &retval); +        if (ret || (retval != PTHREAD_CANCELED)) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cancel request not adhered as expected" +                        " (reason: %s)", strerror (errno)); +        } + + out: +        return; +} + +inline void * +changelog_get_usable_buffer (changelog_local_t *local) +{ +        changelog_log_data_t *cld = NULL; + +        cld = &local->cld; +        if (!cld->cld_iobuf) +                return NULL; + +        return cld->cld_iobuf->ptr; +} + +inline void +changelog_set_usable_record_and_length (changelog_local_t *local, +                                        size_t len, int xr) +{ +        changelog_log_data_t *cld = NULL; + +        cld = &local->cld; + +        cld->cld_ptr_len = len; +        cld->cld_xtra_records = xr; +} + +void +changelog_local_cleanup (xlator_t *xl, changelog_local_t *local) +{ +        int                   i   = 0; +        changelog_opt_t      *co  = NULL; +        changelog_log_data_t *cld = NULL; + +        if (!local) +                return; + +        cld = &local->cld; + +        /* cleanup dynamic allocation for extra records */ +        if (cld->cld_xtra_records) { +                co = (changelog_opt_t *) cld->cld_ptr; +                for (; i < cld->cld_xtra_records; i++, co++) +                        if (co->co_free) +                                co->co_free (co); +        } + +        CHANGELOG_IOBUF_UNREF (cld->cld_iobuf); + +        if (local->inode) +                inode_unref (local->inode); + +        mem_put (local); +} + +inline int +changelog_write (int fd, char *buffer, size_t len) +{ +        ssize_t size = 0; +        size_t writen = 0; + +        while (writen < len) { +                size = write (fd, +                              buffer + writen, len - writen); +                if (size <= 0) +                        break; + +                writen += size; +        } + +        return (writen != len); +} + +static int +changelog_rollover_changelog (xlator_t *this, +                              changelog_priv_t *priv, unsigned long ts) +{ +        int   ret            = -1; +        int   notify         = 0; +        char *bname          = NULL; +        char ofile[PATH_MAX] = {0,}; +        char nfile[PATH_MAX] = {0,}; + +        if (priv->changelog_fd != -1) { +                close (priv->changelog_fd); +                priv->changelog_fd = -1; +        } + +        (void) snprintf (ofile, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir); +        (void) snprintf (nfile, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME".%lu", +                         priv->changelog_dir, ts); + +        ret = rename (ofile, nfile); +        if (!ret) +                notify = 1; + +        if (ret && (errno == ENOENT)) { +                ret = 0; +        } + +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "error renaming %s -> %s (reason %s)", +                        ofile, nfile, strerror (errno)); +        } + +        if (notify) { +                bname = basename (nfile); +                gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); +                ret = changelog_write (priv->wfd, bname, strlen (bname) + 1); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Failed to send file name to notify thread" +                                " (reason: %s)", strerror (errno)); +                } +        } + +        return ret; +} + +int +changelog_open (xlator_t *this, +                changelog_priv_t *priv) +{ +        int fd                        = 0; +        int ret                       = -1; +        int flags                     = 0; +        char buffer[1024]             = {0,}; +        char changelog_path[PATH_MAX] = {0,}; + +        (void) snprintf (changelog_path, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME, +                         priv->changelog_dir); + +        flags |= (O_CREAT | O_RDWR); +        if (priv->fsync_interval == 0) +                flags |= O_SYNC; + +        fd = open (changelog_path, flags, +                   S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (fd < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "unable to open/create changelog file %s" +                        " (reason: %s). change-logging will be" +                        " inactive", changelog_path, strerror (errno)); +                goto out; +        } + +        priv->changelog_fd = fd; + +        (void) snprintf (buffer, 1024, CHANGELOG_HEADER, +                         CHANGELOG_VERSION_MAJOR, +                         CHANGELOG_VERSION_MINOR, +                         priv->encode_mode); +        ret = changelog_write_change (priv, buffer, strlen (buffer)); +        if (ret) { +                close (priv->changelog_fd); +                priv->changelog_fd = -1; +                goto out; +        } + +        ret = 0; + + out: +        return ret; +} + +int +changelog_start_next_change (xlator_t *this, +                             changelog_priv_t *priv, +                             unsigned long ts, gf_boolean_t finale) +{ +        int ret = -1; + +        ret = changelog_rollover_changelog (this, priv, ts); + +        if (!ret && !finale) +                ret = changelog_open (this, priv); + +        return ret; +} + +/** + * return the length of entry + */ +inline size_t +changelog_entry_length () +{ +        return sizeof (changelog_log_data_t); +} + +int +changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last) +{ +        struct timeval tv = {0,}; + +        cld->cld_type = CHANGELOG_TYPE_ROLLOVER; + +        if (gettimeofday (&tv, NULL)) +                return -1; + +        cld->cld_roll_time = (unsigned long) tv.tv_sec; +        cld->cld_finale = is_last; +        return 0; +} + +int +changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len) +{ +        return changelog_write (priv->changelog_fd, buffer, len); +} + +inline int +changelog_handle_change (xlator_t *this, +                         changelog_priv_t *priv, changelog_log_data_t *cld) +{ +        int ret = 0; + +        if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) { +                ret = changelog_start_next_change (this, priv, +                                                   cld->cld_roll_time, +                                                   cld->cld_finale); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "Problem rolling over changelog(s)"); +                goto out; +        } + +        /** +         * case when there is reconfigure done (disabling changelog) and there +         * are still fops that have updates in prgress. +         */ +        if (priv->changelog_fd == -1) +                return 0; + +        if (CHANGELOG_TYPE_IS_FSYNC (cld->cld_type)) { +                ret = fsync (priv->changelog_fd); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "fsync failed (reason: %s)", +                                strerror (errno)); +                } +                goto out; +        } + +        ret = priv->ce->encode (this, cld); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "error writing changelog to disk"); +        } + + out: +        return ret; +} + +changelog_local_t * +changelog_local_init (xlator_t *this, inode_t *inode, +                      uuid_t gfid, int xtra_records, +                      gf_boolean_t update_flag) +{ +        changelog_local_t *local = NULL; +        struct iobuf      *iobuf = NULL; + +        /** +         * We relax the presence of inode if @update_flag is true. +         * The caller (implmentation of the fop) needs to be careful to +         * not blindly use local->inode. +         */ +        if (!update_flag && !inode) { +                gf_log_callingfn (this->name, GF_LOG_WARNING, +                                  "inode needed for version checking !!!"); +                goto out; +        } + +        if (xtra_records) { +                iobuf = iobuf_get2 (this->ctx->iobuf_pool, +                                    xtra_records * CHANGELOG_OPT_RECORD_LEN); +                if (!iobuf) +                        goto out; +        } + +        local = mem_get0 (this->local_pool); +        if (!local) { +                CHANGELOG_IOBUF_UNREF (iobuf); +                goto out; +        } + +        local->update_no_check = update_flag; + +        uuid_copy (local->cld.cld_gfid, gfid); + +        local->cld.cld_iobuf = iobuf; +        local->cld.cld_xtra_records = 0; /* set by the caller */ + +        if (inode) +                local->inode = inode_ref (inode); + + out: +        return local; +} + +int +changelog_forget (xlator_t *this, inode_t *inode) +{ +        uint64_t ctx_addr = 0; +        changelog_inode_ctx_t *ctx = NULL; + +        inode_ctx_del (inode, this, &ctx_addr); +        if (!ctx_addr) +                return 0; + +        ctx = (changelog_inode_ctx_t *) (long) ctx_addr; +        GF_FREE (ctx); + +        return 0; +} + +int +changelog_inject_single_event (xlator_t *this, +                               changelog_priv_t *priv, +                               changelog_log_data_t *cld) +{ +        return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); +} + +/** + * TODO: these threads have many thing in common (wake up after + * a certain time etc..). move them into separate routine. + */ +void * +changelog_rollover (void *data) +{ +        int                     ret   = 0; +        xlator_t               *this  = NULL; +        struct timeval          tv    = {0,}; +        changelog_log_data_t    cld   = {0,}; +        changelog_time_slice_t *slice = NULL; +        changelog_priv_t       *priv  = data; + +        this = priv->cr.this; +        slice = &priv->slice; + +        while (1) { +                tv.tv_sec  = priv->rollover_time; +                tv.tv_usec = 0; + +                ret = select (0, NULL, NULL, NULL, &tv); +                if (ret) +                        continue; + +                ret = changelog_fill_rollover_data (&cld, _gf_false); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "failed to fill rollover data"); +                        continue; +                } + +                LOCK (&priv->lock); +                { +                        ret = changelog_inject_single_event (this, priv, &cld); +                        if (!ret) +                                SLICE_VERSION_UPDATE (slice); +                } +                UNLOCK (&priv->lock); +        } + +        return NULL; +} + +void * +changelog_fsync_thread (void *data) +{ +        int                   ret  = 0; +        xlator_t             *this = NULL; +        struct timeval        tv   = {0,}; +        changelog_log_data_t  cld  = {0,}; +        changelog_priv_t     *priv = data; + +        this = priv->cf.this; +        cld.cld_type = CHANGELOG_TYPE_FSYNC; + +        while (1) { +                tv.tv_sec  = priv->fsync_interval; +                tv.tv_usec = 0; + +                ret = select (0, NULL, NULL, NULL, &tv); +                if (ret) +                        continue; + +                ret = changelog_inject_single_event (this, priv, &cld); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "failed to inject fsync event"); +        } + +        return NULL; +} + +/* macros for inode/changelog version checks */ + +#define INODE_VERSION_UPDATE(priv, inode, iver, slice, type) do {       \ +                LOCK (&inode->lock);                                    \ +                {                                                       \ +                        LOCK (&priv->lock);                             \ +                        {                                               \ +                                *iver = slice->changelog_version[type]; \ +                        }                                               \ +                        UNLOCK (&priv->lock);                           \ +                }                                                       \ +                UNLOCK (&inode->lock);                                  \ +        } while (0) + +#define INODE_VERSION_EQUALS_SLICE(priv, ver, slice, type, upd) do {    \ +                LOCK (&priv->lock);                                     \ +                {                                                       \ +                        upd = (ver == slice->changelog_version[type])   \ +                                ? _gf_false : _gf_true;                 \ +                }                                                       \ +                UNLOCK (&priv->lock);                                   \ +        } while (0) + +static int +__changelog_inode_ctx_set (xlator_t *this, +                           inode_t *inode, changelog_inode_ctx_t *ctx) +{ +        uint64_t ctx_addr = (uint64_t) ctx; +        return __inode_ctx_set (inode, this, &ctx_addr); +} + +/** + * one shot routine to get the address and the value of a inode version + * for a particular type. + */ +static changelog_inode_ctx_t * +__changelog_inode_ctx_get (xlator_t *this, +                           inode_t *inode, unsigned long **iver, +                           unsigned long *version, changelog_log_type type) +{ +        int                    ret      = 0; +        uint64_t               ctx_addr = 0; +        changelog_inode_ctx_t *ctx      = NULL; + +        ret = __inode_ctx_get (inode, this, &ctx_addr); +        if (ret < 0) +                ctx_addr = 0; +        if (ctx_addr != 0) { +                ctx = (changelog_inode_ctx_t *) (long)ctx_addr; +                goto out; +        } + +        ctx = GF_CALLOC (1, sizeof (*ctx), gf_changelog_mt_inode_ctx_t); +        if (!ctx) +                goto out; + +        ret = __changelog_inode_ctx_set (this, inode, ctx); +        if (ret) { +                GF_FREE (ctx); +                ctx = NULL; +        } + + out: +        if (ctx && iver && version) { +                *iver = CHANGELOG_INODE_VERSION_TYPE (ctx, type); +                *version = **iver; +        } + +        return ctx; +} + +static changelog_inode_ctx_t * +changelog_inode_ctx_get (xlator_t *this, +                         inode_t *inode, unsigned long **iver, +                         unsigned long *version, changelog_log_type type) +{ +        changelog_inode_ctx_t *ctx = NULL; + +        LOCK (&inode->lock); +        { +                ctx = __changelog_inode_ctx_get (this, +                                                 inode, iver, version, type); +        } +        UNLOCK (&inode->lock); + +        return ctx; +} + +/** + * This is the main update routine. Locking has been made granular so as to + * maximize parallelism of fops - I'll try to explain it below using execution + * timelines. + * + * Basically, the contention is between multiple execution threads of this + * routine and the roll-over thread. So, instead of having a big lock, we hold + * granular locks: inode->lock and priv->lock. Now I'll explain what happens + * when there is an update and a roll-over at just about the same time. + * NOTE: + *  - the dispatcher itself synchronizes updates via it's own lock + *  - the slice version in incremented by the roll-over thread + * + * Case 1: When the rollover thread wins before the inode version can be + * compared with the slice version. + * + *          [updater]                 |             [rollover] + *                                    | + *                                    |           <SLICE: 1, 1, 1> + * <changelog_update>                 | + *   <changelog_inode_ctx_get>        | + *      <CTX: 1, 1, 1>                | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 2, 2, 2> + *                                    |         UNLOCK (&priv->lock) + *                                    | + * LOCK (&priv->lock)                 | + *   <INODE_VERSION_EQUALS_SLICE>     | + *    I: 1 <-> S: 2                   | + *    update: true                    | + * UNLOCK (&priv->lock)               | + *                                    | + * <if update == true>                | + *  <dispath-update-event>            | + *  <INODE_VERSION_UPDATE>            | + *   LOCK (&inode->lock)              | + *    LOCK (&priv->lock)              | + *     <CTX: 2, 1, 1>                 | + *    UNLOCK (&priv->lock)            | + *   UNLOCK (&inode->lock)            | + * + * Therefore, the change gets recorded in the next change (no lost change). If + * the slice version was ahead of the inode version (say I:1, S: 2), then + * anyway the comparison would result in a update (I: 1, S: 3). + * + * If the rollover time is too less, then there is another contention when the + * updater tries to bring up inode version to the slice version (this is also + * the case when the roll-over thread wakes up during INODE_VERSION_UPDATE. + * + *   <CTX: 1, 1, 1>                   |       <SLICE: 2, 2, 2> + *                                    | + *                                    | + * <dispath-update-event>             | + * <INODE_VERSION_UPDATE>             | + *  LOCK (&inode->lock)               | + *   LOCK (&priv->lock)               | + *    <CTX: 2, 1, 1>                  | + *   UNLOCK (&priv->lock)             | + *  UNLOCK (&inode->lock)             | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 3, 3, 3> + *                                    |         UNLOCK (&priv->lock) + * + * + * Case 2: When the fop thread wins + * + *          [updater]                 |             [rollover] + *                                    | + *                                    |           <SLICE: 1, 1, 1> + * <changelog_update>                 | + *   <changelog_inode_ctx_get>        | + *      <CTX: 0, 0, 0>                | + *                                    | + * LOCK (&priv->lock)                 | + *   <INODE_VERSION_EQUALS_SLICE>     | + *    I: 0 <-> S: 1                   | + *    update: true                    | + * UNLOCK (&priv->lock)               | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 2, 2, 2> + *                                    |         UNLOCK (&priv->lock) + * <if update == true>                | + *  <dispath-update-event>            | + *  <INODE_VERSION_UPDATE>            | + *   LOCK (&inode->lock)              | + *    LOCK (&priv->lock)              | + *     <CTX: 2, 0, 0>                 | + *    UNLOCK (&priv->lock)            | + *   UNLOCK (&inode->lock)            | + * + * Here again, if the inode version was equal to the slice version (I: 1, S: 1) + * then there is no need to record an update (as the equality of the two version + * signifies an update was recorded in the current time slice). + */ +inline void +changelog_update (xlator_t *this, changelog_priv_t *priv, +                  changelog_local_t *local, changelog_log_type type) +{ +        int                     ret        = 0; +        unsigned long          *iver       = NULL; +        unsigned long           version    = 0; +        inode_t                *inode      = NULL; +        changelog_time_slice_t *slice      = NULL; +        changelog_inode_ctx_t  *ctx        = NULL; +        changelog_log_data_t   *cld_0      = NULL; +        changelog_log_data_t   *cld_1      = NULL; +        changelog_local_t      *next_local = NULL; +        gf_boolean_t            need_upd   = _gf_true; + +        slice = &priv->slice; + +        /** +         * for fops that do not require inode version checking +         */ +        if (local->update_no_check) +                goto update; + +        inode = local->inode; + +        ctx = changelog_inode_ctx_get (this, +                                       inode, &iver, &version, type); +        if (!ctx) +                goto update; + +        INODE_VERSION_EQUALS_SLICE (priv, version, slice, type, need_upd); + + update: +        if (need_upd) { +                cld_0 = &local->cld; +                cld_0->cld_type = type; + +                if ( (next_local = local->prev_entry) != NULL ) { +                        cld_1 = &next_local->cld; +                        cld_1->cld_type = type; +                } + +                ret = priv->cd.dispatchfn (this, priv, +                                           priv->cd.cd_data, cld_0, cld_1); + +                /** +                 * update after the dispatcher has successfully done +                 * it's job. +                 */ +                if (!local->update_no_check && iver && !ret) +                        INODE_VERSION_UPDATE (priv, inode, iver, slice, type); +        } + +        return; +} diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h new file mode 100644 index 00000000000..bbea245b95f --- /dev/null +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -0,0 +1,386 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_HELPERS_H +#define _CHANGELOG_HELPERS_H + +#include "locking.h" +#include "timer.h" +#include "pthread.h" +#include "iobuf.h" + +#include "changelog-misc.h" + +/** + * the changelog entry + */ +typedef struct changelog_log_data { +        /* rollover related */ +        unsigned long cld_roll_time; + +        /* reopen changelog? */ +        gf_boolean_t cld_finale; + +        changelog_log_type cld_type; + +        /** +         * sincd gfid is _always_ a necessity, it's not a part +         * of the iobuf. by doing this we do not add any overhead +         * for data and metadata related fops. +         */ +        uuid_t        cld_gfid; + +        /** +         * iobufs are used for optionals records: pargfid, path, +         * write offsets etc.. It's the fop implementers job +         * to allocate (iobuf_get() in the fop) and get unref'ed +         * in the callback (CHANGELOG_STACK_UNWIND). +         */ +        struct iobuf *cld_iobuf; + +#define cld_ptr cld_iobuf->ptr + +        /** +         * after allocation you can point this to the length of +         * usable data, but make sure it does not exceed the +         * the size of the requested iobuf. +         */ +        size_t        cld_iobuf_len; + +#define cld_ptr_len cld_iobuf_len + +        /** +         * number of optional records +         */ +        int cld_xtra_records; +} changelog_log_data_t; + +/** + * holder for dispatch function and private data + */ + +typedef struct changelog_priv changelog_priv_t; + +typedef struct changelog_dispatcher { +        void *cd_data; +        int (*dispatchfn) (xlator_t *, changelog_priv_t *, void *, +                           changelog_log_data_t *, changelog_log_data_t *); +} changelog_dispatcher_t; + +struct changelog_bootstrap { +        changelog_mode_t mode; +        int (*ctor) (xlator_t *, changelog_dispatcher_t *); +        int (*dtor) (xlator_t *, changelog_dispatcher_t *); +}; + +struct changelog_encoder { +        changelog_encoder_t encoder; +        int (*encode) (xlator_t *, changelog_log_data_t *); +}; + + +/* xlator private */ + +typedef struct changelog_time_slice { +        /** +         * just in case we need nanosecond granularity some day. +         * field is unused as of now (maybe we'd need it later). +         */ +        struct timeval tv_start; + +        /** +         * version of changelog file, incremented each time changes +         * rollover. +         */ +        unsigned long changelog_version[CHANGELOG_MAX_TYPE]; +} changelog_time_slice_t; + +typedef struct changelog_rollover { +        /* rollover thread */ +        pthread_t rollover_th; + +        xlator_t *this; +} changelog_rollover_t; + +typedef struct changelog_fsync { +        /* fsync() thread */ +        pthread_t fsync_th; + +        xlator_t *this; +} changelog_fsync_t; + +# define CHANGELOG_MAX_CLIENTS  5 +typedef struct changelog_notify { +        /* reader end of the pipe */ +        int rfd; + +        /* notifier thread */ +        pthread_t notify_th; + +        /* unique socket path */ +        char sockpath[PATH_MAX]; + +        int socket_fd; + +        /** +         * simple array of accept()'ed fds. Not scalable at all +         * for large number of clients, but it's okay as we have +         * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS). +         */ +        int client_fd[CHANGELOG_MAX_CLIENTS]; + +        xlator_t *this; +} changelog_notify_t; + +struct changelog_priv { +        gf_boolean_t active; + +        /* to generate unique socket file per brick */ +        char *changelog_brick; + +        /* logging directory */ +        char *changelog_dir; + +        /* one file for all changelog types */ +        int changelog_fd; + +        gf_lock_t lock; + +        /* writen end of the pipe */ +        int wfd; + +        /* rollover time */ +        int32_t rollover_time; + +        /* fsync() interval */ +        int32_t fsync_interval; + +        /* changelog type maps */ +        const char *maps[CHANGELOG_MAX_TYPE]; + +        /* time slicer */ +        changelog_time_slice_t slice; + +        /* context of the updater */ +        changelog_dispatcher_t cd; + +        /* context of the rollover thread */ +        changelog_rollover_t cr; + +        /* context of fsync thread */ +        changelog_fsync_t cf; + +        /* context of the notifier thread */ +        changelog_notify_t cn; + +        /* operation mode */ +        changelog_mode_t op_mode; + +        /* bootstrap routine for 'current' logger */ +        struct changelog_bootstrap *cb; + +        /* encoder mode */ +        changelog_encoder_t encode_mode; + +        /* encoder */ +        struct changelog_encoder *ce; +}; + +struct changelog_local { +        inode_t              *inode; +        gf_boolean_t          update_no_check; + +        changelog_log_data_t  cld; + +        /** +         * ->prev_entry is used in cases when there needs to be +         * additional changelog entry for the parent (eg. rename) +         * It's analogous to ->next in single linked list world, +         * but we call it as ->prev_entry... ha ha ha +         */ +        struct changelog_local *prev_entry; +}; + +typedef struct changelog_local changelog_local_t; + +/* inode version is stored in inode ctx */ +typedef struct changelog_inode_ctx { +        unsigned long iversion[CHANGELOG_MAX_TYPE]; +} changelog_inode_ctx_t; + +#define CHANGELOG_INODE_VERSION_TYPE(ctx, type)  &(ctx->iversion[type]) + +/** + * Optional Records: + *  fops that need to save additional information request a array of + *  @changelog_opt_t struct. The array is allocated via @iobufs. + */ +typedef enum { +        CHANGELOG_OPT_REC_FOP, +        CHANGELOG_OPT_REC_ENTRY, +} changelog_optional_rec_type_t; + +struct changelog_entry_fields { +        uuid_t  cef_uuid; +        char   *cef_bname; +}; + +typedef struct { +        /** +         * @co_covert can be used to do post-processing of the record before +         * it's persisted to the CHANGELOG. If this is NULL, then the record +         * is persisted as per it's in memory format. +         */ +        size_t (*co_convert) (void *data, char *buffer, gf_boolean_t encode); + +        /* release routines */ +        void (*co_free) (void *data); + +        /* type of the field */ +        changelog_optional_rec_type_t co_type; + +        /** +         * sizeof of the 'valid' field in the union. This field is not used if +         * @co_convert is specified. +         */ +        size_t co_len; + +        union { +                glusterfs_fop_t co_fop; +                struct changelog_entry_fields co_entry; +        }; +} changelog_opt_t; + +#define CHANGELOG_OPT_RECORD_LEN  sizeof (changelog_opt_t) + +/** + * helpers routines + */ + +void +changelog_thread_cleanup (xlator_t *this, pthread_t thr_id); +inline void * +changelog_get_usable_buffer (changelog_local_t *local); +inline void +changelog_set_usable_record_and_length (changelog_local_t *local, +                                        size_t len, int xr); +void +changelog_local_cleanup (xlator_t *xl, changelog_local_t *local); +changelog_local_t * +changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, +                      int xtra_records, gf_boolean_t update_flag); +int +changelog_start_next_change (xlator_t *this, +                             changelog_priv_t *priv, +                             unsigned long ts, gf_boolean_t finale); +int +changelog_open (xlator_t *this, changelog_priv_t *priv); +int +changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last); +int +changelog_inject_single_event (xlator_t *this, +                               changelog_priv_t *priv, +                               changelog_log_data_t *cld); +inline size_t +changelog_entry_length (); +inline int +changelog_write (int fd, char *buffer, size_t len); +int +changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len); +inline int +changelog_handle_change (xlator_t *this, +                         changelog_priv_t *priv, changelog_log_data_t *cld); +inline void +changelog_update (xlator_t *this, changelog_priv_t *priv, +                  changelog_local_t *local, changelog_log_type type); +void * +changelog_rollover (void *data); +void * +changelog_fsync_thread (void *data); +int +changelog_forget (xlator_t *this, inode_t *inode); + +/* macros */ + +#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do {             \ +                changelog_local_t *__local = NULL;                      \ +                xlator_t          *__xl    = NULL;                      \ +                if (frame) {                                            \ +                        __local      = frame->local;                    \ +                        __xl         = frame->this;                     \ +                        frame->local = NULL;                            \ +                }                                                       \ +                STACK_UNWIND_STRICT (fop, frame, params);               \ +                changelog_local_cleanup (__xl, __local);                \ +                if (__local && __local->prev_entry)                     \ +                        changelog_local_cleanup (__xl,                  \ +                                                 __local->prev_entry);  \ +        } while (0) + +#define CHANGELOG_IOBUF_REF(iobuf) do {         \ +                if (iobuf)                      \ +                        iobuf_ref (iobuf);      \ +        } while (0) + +#define CHANGELOG_IOBUF_UNREF(iobuf) do {       \ +                if (iobuf)                      \ +                        iobuf_unref (iobuf);    \ +        } while (0) + +#define CHANGELOG_FILL_BUFFER(buffer, off, val, len) do {       \ +                memcpy (buffer + off, val, len);                \ +                off += len;                                     \ +        } while (0) + +#define SLICE_VERSION_UPDATE(slice) do {                \ +                int i = 0;                              \ +                for (; i < CHANGELOG_MAX_TYPE; i++) {   \ +                        slice->changelog_version[i]++;  \ +                }                                       \ +        } while (0) + +#define CHANGLOG_FILL_FOP_NUMBER(co, fop, converter, xlen) do { \ +                co->co_convert = converter;                     \ +                co->co_free = NULL;                             \ +                co->co_type = CHANGELOG_OPT_REC_FOP;            \ +                co->co_fop = fop;                               \ +                xlen += sizeof (fop);                           \ +        } while (0) + +#define CHANGELOG_FILL_ENTRY(co, pargfid, bname,                        \ +                             converter, freefn, xlen, label)            \ +        do {                                                            \ +                co->co_convert = converter;                             \ +                co->co_free = freefn;                                   \ +                co->co_type = CHANGELOG_OPT_REC_ENTRY;                  \ +                uuid_copy (co->co_entry.cef_uuid, pargfid);             \ +                co->co_entry.cef_bname = gf_strdup(bname);              \ +                if (!co->co_entry.cef_bname)                            \ +                        goto label;                                     \ +                xlen += (UUID_CANONICAL_FORM_LEN + strlen (bname));     \ +        } while (0) + +#define CHANGELOG_INIT(this, local, inode, gfid, xrec)                  \ +        local = changelog_local_init (this, inode, gfid, xrec, _gf_false) + +#define CHANGELOG_INIT_NOCHECK(this, local, inode, gfid, xrec)          \ +        local = changelog_local_init (this, inode, gfid, xrec, _gf_true) + +#define CHANGELOG_NOT_ACTIVE_THEN_GOTO(priv, label) do {        \ +                if (!priv->active)                              \ +                        goto label;                             \ +        } while (0) + +#define CHANGELOG_COND_GOTO(priv, cond, label) do {                    \ +                if (!priv->active || cond)                             \ +                        goto label;                                    \ +        } while (0) + +#endif /* _CHANGELOG_HELPERS_H */ diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h new file mode 100644 index 00000000000..d72464eab70 --- /dev/null +++ b/xlators/features/changelog/src/changelog-mem-types.h @@ -0,0 +1,29 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_MEM_TYPES_H +#define _CHANGELOG_MEM_TYPES_H + +#include "mem-types.h" + +enum gf_changelog_mem_types { +        gf_changelog_mt_priv_t                  = gf_common_mt_end + 1, +        gf_changelog_mt_str_t                   = gf_common_mt_end + 2, +        gf_changelog_mt_batch_t                 = gf_common_mt_end + 3, +        gf_changelog_mt_rt_t                    = gf_common_mt_end + 4, +        gf_changelog_mt_inode_ctx_t             = gf_common_mt_end + 5, +        gf_changelog_mt_libgfchangelog_t        = gf_common_mt_end + 6, +        gf_changelog_mt_libgfchangelog_rl_t     = gf_common_mt_end + 7, +        gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8, +        gf_changelog_mt_changelog_buffer_t      = gf_common_mt_end + 9, +        gf_changelog_mt_end +}; + +#endif diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h new file mode 100644 index 00000000000..0712a377183 --- /dev/null +++ b/xlators/features/changelog/src/changelog-misc.h @@ -0,0 +1,101 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_MISC_H +#define _CHANGELOG_MISC_H + +#include "glusterfs.h" +#include "common-utils.h" + +#define CHANGELOG_MAX_TYPE  3 +#define CHANGELOG_FILE_NAME "CHANGELOG" + +#define CHANGELOG_VERSION_MAJOR  1 +#define CHANGELOG_VERSION_MINOR  0 + +#define CHANGELOG_UNIX_SOCK  DEFAULT_VAR_RUN_DIRECTORY"/changelog-%s.sock" + +/** + * header starts with the version and the format of the changelog. + * 'version' not much of a use now. + */ +#define CHANGELOG_HEADER                                                \ +        "GlusterFS Changelog | version: v%d.%d | encoding : %d\n" + +#define CHANGELOG_MAKE_SOCKET_PATH(brick_path, sockpath, len) do {      \ +                char md5_sum[MD5_DIGEST_LENGTH*2+1] = {0,};             \ +                md5_wrapper((unsigned char *) brick_path,               \ +                            strlen(brick_path),                         \ +                            md5_sum);                                   \ +                (void) snprintf (sockpath, len,                         \ +                                 CHANGELOG_UNIX_SOCK, md5_sum);         \ +        } while (0) + +/** + * ... used by libgfchangelog. + */ +#define CHANGELOG_GET_ENCODING(fd, buffer, len, enc, enc_len) do {      \ +                FILE *fp;                                               \ +                int fd_dup, maj, min;                                   \ +                                                                        \ +                enc = -1;                                               \ +                fd_dup = dup (fd);                                      \ +                                                                        \ +                if (fd_dup != -1) {                                     \ +                        fp = fdopen (fd_dup, "r");                      \ +                        if (fp) {                                       \ +                                if (fgets (buffer, len, fp)) {          \ +                                        elen = strlen (buffer);         \ +                                        sscanf (buffer,                 \ +                                                CHANGELOG_HEADER,       \ +                                                &maj, &min, &enc);      \ +                                }                                       \ +                                fclose (fp);                            \ +                        } else {                                        \ +                                close (fd_dup);                         \ +                        }                                               \ +                }                                                       \ +        } while (0) + +/** + * everything after 'CHANGELOG_TYPE_ENTRY' are internal types + * (ie. none of the fops trigger this type of event), hence + * CHANGELOG_MAX_TYPE = 3 + */ +typedef enum { +        CHANGELOG_TYPE_DATA = 0, +        CHANGELOG_TYPE_METADATA, +        CHANGELOG_TYPE_ENTRY, +        CHANGELOG_TYPE_ROLLOVER, +        CHANGELOG_TYPE_FSYNC, +} changelog_log_type; + +/* operation modes - RT for now */ +typedef enum { +        CHANGELOG_MODE_RT = 0, +} changelog_mode_t; + +/* encoder types */ + +typedef enum { +        CHANGELOG_ENCODE_MIN = 0, +        CHANGELOG_ENCODE_BINARY, +        CHANGELOG_ENCODE_ASCII, +        CHANGELOG_ENCODE_MAX, +} changelog_encoder_t; + +#define CHANGELOG_VALID_ENCODING(enc)                                   \ +        (enc > CHANGELOG_ENCODE_MIN && enc < CHANGELOG_ENCODE_MAX) + +#define CHANGELOG_TYPE_IS_ENTRY(type)  (type == CHANGELOG_TYPE_ENTRY) +#define CHANGELOG_TYPE_IS_ROLLOVER(type)  (type == CHANGELOG_TYPE_ROLLOVER) +#define CHANGELOG_TYPE_IS_FSYNC(type)  (type == CHANGELOG_TYPE_FSYNC) + +#endif /* _CHANGELOG_MISC_H */ diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c new file mode 100644 index 00000000000..1f8b312538e --- /dev/null +++ b/xlators/features/changelog/src/changelog-notifier.c @@ -0,0 +1,314 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "changelog-notifier.h" + +#include <pthread.h> + +inline static void +changelog_notify_clear_fd (changelog_notify_t *cn, int i) +{ +        cn->client_fd[i] = -1; +} + +inline static void +changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd) +{ +        cn->client_fd[i] = fd; +} + +static int +changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd) +{ +        int i   = 0; +        int ret = 0; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] == -1) +                        break; +        } + +        if (i == CHANGELOG_MAX_CLIENTS) { +                /** +                 * this case should not be hit as listen() would limit +                 * the number of completely established connections. +                 */ +                gf_log (this->name, GF_LOG_WARNING, +                        "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS); +                ret = -1; +        } +        else +                changelog_notify_save_fd (cn, i, fd); + +        return ret; +} + +static void +changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd) +{ +        int i = 0; + +        FD_ZERO (rset); + +        FD_SET (cn->socket_fd, rset); +        *maxfd = cn->socket_fd; + +        FD_SET (cn->rfd, rset); +        *maxfd = max (*maxfd, cn->rfd); + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] != -1) { +                        FD_SET (cn->client_fd[i], rset); +                        *maxfd = max (*maxfd, cn->client_fd[i]); +                } +        } + +        *maxfd = *maxfd + 1; +} + +static int +changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len) +{ +        int i = 0; +        int ret = 0; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] == -1) +                        continue; + +                if (changelog_write (cn->client_fd[i], +                                     path, len)) { +                        ret = -1; + +                        close (cn->client_fd[i]); +                        changelog_notify_clear_fd (cn, i); +                } +        } + +        return ret; +} + +static void +changelog_notifier_init (changelog_notify_t *cn) +{ +        int i = 0; + +        cn->socket_fd = -1; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                changelog_notify_clear_fd (cn, i); +        } +} + +static void +changelog_close_client_conn (changelog_notify_t *cn) +{ +        int i = 0; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] == -1) +                        continue; + +                close (cn->client_fd[i]); +                changelog_notify_clear_fd (cn, i); +        } +} + +static void +changelog_notifier_cleanup (void *arg) +{ +        changelog_notify_t *cn = NULL; + +        cn = (changelog_notify_t *) arg; + +        changelog_close_client_conn (cn); + +        if (cn->socket_fd != -1) +                close (cn->socket_fd); + +        if (cn->rfd) +                close (cn->rfd); + +        if (unlink (cn->sockpath)) +                gf_log ("", GF_LOG_WARNING, +                        "could not unlink changelog socket file" +                        " %s (reason: %s", cn->sockpath, strerror (errno)); +} + +void * +changelog_notifier (void *data) +{ +        int                 i         = 0; +        int                 fd        = 0; +        int                 max_fd    = 0; +        int                 len       = 0; +        ssize_t             readlen   = 0; +        xlator_t           *this      = NULL; +        changelog_priv_t   *priv      = NULL; +        changelog_notify_t *cn        = NULL; +        struct sockaddr_un  local     = {0,}; +        char path[PATH_MAX]           = {0,}; +        char abspath[PATH_MAX]        = {0,}; + +        char buffer; +        fd_set rset; + +        priv = (changelog_priv_t *) data; + +        cn = &priv->cn; +        this = cn->this; + +        pthread_cleanup_push (changelog_notifier_cleanup, cn); + +        changelog_notifier_init (cn); + +        cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0); +        if (cn->socket_fd < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "changelog socket error (reason: %s)", +                        strerror (errno)); +                goto out; +        } + +        CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, +                                    cn->sockpath, PATH_MAX); +        if (unlink (cn->sockpath) < 0) { +                if (errno != ENOENT) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Could not unlink changelog socket file (%s)" +                                " (reason: %s)", +                                CHANGELOG_UNIX_SOCK, strerror (errno)); +                        goto cleanup; +                } +        } + +        local.sun_family = AF_UNIX; +        strcpy (local.sun_path, cn->sockpath); + +        len = strlen (local.sun_path) + sizeof (local.sun_family); + +        /* bind to the unix domain socket */ +        if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not bind to changelog socket (reason: %s)", +                        strerror (errno)); +                goto cleanup; +        } + +        /* listen for incoming connections */ +        if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "listen() error on changelog socket (reason: %s)", +                        strerror (errno)); +                goto cleanup; +        } + +        /** +         * simple select() on all to-be-read file descriptors. This method +         * though old school works pretty well when you have a handfull of +         * fd's to be watched (clients). +         * +         * Future TODO: move this to epoll based notification facility if +         *              number of clients increase. +         */ +        for (;;) { +                changelog_notify_fill_rset (cn, &rset, &max_fd); + +                if (select (max_fd, &rset, NULL, NULL, NULL) < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "select() returned -1 (reason: %s)", +                                strerror (errno)); +                        sleep (2); +                        continue; +                } + +                if (FD_ISSET (cn->socket_fd, &rset)) { +                        fd = accept (cn->socket_fd, NULL, NULL); +                        if (fd < 0) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "accept error on changelog socket" +                                        " (reason: %s)", strerror (errno)); +                        } else if (changelog_notify_insert_fd (this, cn, fd)) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "hit max client limit"); +                        } +                } + +                if (FD_ISSET (cn->rfd, &rset)) { +                        /** +                         * read changelog filename and notify all connected +                         * clients. +                         */ +                        readlen = 0; +                        while (readlen < PATH_MAX) { +                                len = read (cn->rfd, &path[readlen++], 1); +                                if (len == -1) { +                                        break; +                                } + +                                if (len == 0) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "rollover thread sent EOF" +                                                " on pipe - possibly a crash."); +                                        /* be blunt and close all connections */ +                                        pthread_exit(NULL); +                                } + +                                if (path[readlen - 1] == '\0') +                                        break; +                        } + +                        /* should we close all client connections here too? */ +                        if (len < 0 || readlen == PATH_MAX) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "Could not get pathname from rollover" +                                        " thread or pathname too long"); +                                goto process_rest; +                        } + +                        (void) snprintf (abspath, PATH_MAX, +                                         "%s/%s", priv->changelog_dir, path); +                        if (changelog_notify_client (cn, abspath, +                                                     strlen (abspath) + 1)) +                                gf_log (this->name, GF_LOG_ERROR, +                                        "could not notify some clients with new" +                                        " changelogs"); +                } + +        process_rest: +                for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) { +                        if ( (fd = cn->client_fd[i]) == -1 ) +                                continue; + +                        if (FD_ISSET (fd, &rset)) { +                                /** +                                 * the only data we accept from the client is a +                                 * disconnect. Anything else is treated as bogus +                                 * and is silently discarded (also warned!!!). +                                 */ +                                if ( (readlen = read (fd, &buffer, 1)) <= 0 ) { +                                        close (fd); +                                        changelog_notify_clear_fd (cn, i); +                                } else { +                                        /* silently discard data and log */ +                                        gf_log (this->name, GF_LOG_WARNING, +                                                "misbehaving changelog client"); +                                } +                        } +                } + +        } + + cleanup:; +        pthread_cleanup_pop (1); + + out: +        return NULL; +} diff --git a/xlators/features/changelog/src/changelog-notifier.h b/xlators/features/changelog/src/changelog-notifier.h new file mode 100644 index 00000000000..55e728356e6 --- /dev/null +++ b/xlators/features/changelog/src/changelog-notifier.h @@ -0,0 +1,19 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_NOTIFIER_H +#define _CHANGELOG_NOTIFIER_H + +#include "changelog-helpers.h" + +void * +changelog_notifier (void *data); + +#endif diff --git a/xlators/features/changelog/src/changelog-rt.c b/xlators/features/changelog/src/changelog-rt.c new file mode 100644 index 00000000000..c147f68ca85 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rt.c @@ -0,0 +1,72 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" + +#include "changelog-rt.h" +#include "changelog-mem-types.h" + +int +changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd) +{ +        changelog_rt_t *crt = NULL; + +        crt = GF_CALLOC (1, sizeof (*crt), +                         gf_changelog_mt_rt_t); +        if (!crt) +                return -1; + +        LOCK_INIT (&crt->lock); + +        cd->cd_data = crt; +        cd->dispatchfn = &changelog_rt_enqueue; + +        return 0; +} + +int +changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd) +{ +        changelog_rt_t *crt = NULL; + +        crt = cd->cd_data; + +        LOCK_DESTROY (&crt->lock); +        GF_FREE (crt); + +        return 0; +} + +int +changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch, +                      changelog_log_data_t *cld_0, changelog_log_data_t *cld_1) +{ +        int             ret = 0; +        changelog_rt_t *crt = NULL; + +        crt = (changelog_rt_t *) cbatch; + +        LOCK (&crt->lock); +        { +                ret = changelog_handle_change (this, priv, cld_0); +                if (!ret && cld_1) +                        ret = changelog_handle_change (this, priv, cld_1); +        } +        UNLOCK (&crt->lock); + +        return ret; +} diff --git a/xlators/features/changelog/src/changelog-rt.h b/xlators/features/changelog/src/changelog-rt.h new file mode 100644 index 00000000000..1fc2bbc5bb9 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rt.h @@ -0,0 +1,33 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_RT_H +#define _CHANGELOG_RT_H + +#include "locking.h" +#include "timer.h" +#include "pthread.h" + +#include "changelog-helpers.h" + +/* unused as of now - may be you would need it later */ +typedef struct changelog_rt { +        gf_lock_t lock; +} changelog_rt_t; + +int +changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd); +int +changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd); +int +changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch, +                      changelog_log_data_t *cld_0, changelog_log_data_t *cld_1); + +#endif /* _CHANGELOG_RT_H */ diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c new file mode 100644 index 00000000000..35e3e784986 --- /dev/null +++ b/xlators/features/changelog/src/changelog.c @@ -0,0 +1,1487 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" +#include "iobuf.h" + +#include "changelog-rt.h" + +#include "changelog-encoders.h" +#include "changelog-mem-types.h" + +#include <pthread.h> + +#include "changelog-notifier.h" + +static struct changelog_bootstrap +cb_bootstrap[] = { +        { +                .mode = CHANGELOG_MODE_RT, +                .ctor = changelog_rt_init, +                .dtor = changelog_rt_fini, +        }, +}; + +static struct changelog_encoder +cb_encoder[] = { +        [CHANGELOG_ENCODE_BINARY] = +        { +                .encoder = CHANGELOG_ENCODE_BINARY, +                .encode = changelog_encode_binary, +        }, +        [CHANGELOG_ENCODE_ASCII] = +        { +                .encoder = CHANGELOG_ENCODE_ASCII, +                .encode = changelog_encode_ascii, +        }, +}; + +/* Entry operations - TYPE III */ + +/** + * entry operations do not undergo inode version checking. + */ + +/* {{{ */ + +/* rmdir */ + +int32_t +changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                     int32_t op_ret, int32_t op_errno, struct iatt *preparent, +                     struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno, +                                preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_rmdir (call_frame_t *frame, xlator_t *this, +                 loc_t *loc, int xflags, dict_t *xdata) +{ +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, +                                NULL, loc->inode->gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_rmdir_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, +                    loc, xflags, xdata); +        return 0; +} + +/* unlink */ + +int32_t +changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, struct iatt *preparent, +                      struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno, +                                preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_unlink (call_frame_t *frame, xlator_t *this, +                  loc_t *loc, int xflags, dict_t *xdata) +{ +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, loc->inode->gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_unlink_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, +                    loc, xflags, xdata); +        return 0; +} + +/* rename */ + +int32_t +changelog_rename_cbk (call_frame_t *frame, +                      void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, +                      struct iatt *buf, struct iatt *preoldparent, +                      struct iatt *postoldparent, struct iatt *prenewparent, +                      struct iatt *postnewparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno, +                                buf, preoldparent, postoldparent, +                                prenewparent, postnewparent, xdata); +        return 0; +} + + +int32_t +changelog_rename (call_frame_t *frame, xlator_t *this, +                  loc_t *oldloc, loc_t *newloc, dict_t *xdata) +{ +        size_t            xtra_len  = 0; +        uuid_t            null_uuid = {0,}; +        changelog_priv_t *priv      = NULL; +        changelog_opt_t  *co        = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        /* 3 == fop + oldloc + newloc */ +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, null_uuid, 3); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        co++; +        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 3); + + wind: +        STACK_WIND (frame, changelog_rename_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, +                    oldloc, newloc, xdata); +        return 0; +} + +/* link */ + +int32_t +changelog_link_cbk (call_frame_t *frame, +                    void *cookie, xlator_t *this, int32_t op_ret, +                    int32_t op_errno, inode_t *inode, +                    struct iatt *buf, struct iatt *preparent, +                    struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_link (call_frame_t *frame, +                xlator_t *this, loc_t *oldloc, +                loc_t *newloc, dict_t *xdata) +{ +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, oldloc->gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_link_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->link, +                    oldloc, newloc, xdata); +        return 0; +} + +/* mkdir */ + +int32_t +changelog_mkdir_cbk (call_frame_t *frame, +                     void *cookie, xlator_t *this, int32_t op_ret, +                     int32_t op_errno, inode_t *inode, +                     struct iatt *buf, struct iatt *preparent, +                     struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_mkdir (call_frame_t *frame, xlator_t *this, +                 loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) +{ +        int               ret      = -1; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_mkdir_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir, +                    loc, mode, umask, xdata); +        return 0; +} + +/* symlink */ + +int32_t +changelog_symlink_cbk (call_frame_t *frame, +                       void *cookie, xlator_t *this, +                       int32_t op_ret, int32_t op_errno, +                       inode_t *inode, struct iatt *buf, struct iatt *preparent, +                       struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_symlink (call_frame_t *frame, xlator_t *this, +                   const char *linkname, loc_t *loc, +                   mode_t umask, dict_t *xdata) +{ +        int               ret      = -1; +        size_t            xtra_len = 0; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_symlink_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink, +                    linkname, loc, umask, xdata); +        return 0; +} + +/* mknod */ + +int32_t +changelog_mknod_cbk (call_frame_t *frame, +                     void *cookie, xlator_t *this, +                     int32_t op_ret, int32_t op_errno, inode_t *inode, +                     struct iatt *buf, struct iatt *preparent, +                     struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_mknod (call_frame_t *frame, +                 xlator_t *this, loc_t *loc, +                 mode_t mode, dev_t dev, mode_t umask, dict_t *xdata) +{ +        int               ret      = -1; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_mknod_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod, +                    loc, mode, dev, umask, xdata); +        return 0; +} + +/* creat */ + +int32_t +changelog_create_cbk (call_frame_t *frame, +                      void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, +                      fd_t *fd, inode_t *inode, struct iatt *buf, +                      struct iatt *preparent, +                      struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (create, frame, +                                op_ret, op_errno, fd, inode, +                                buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_create (call_frame_t *frame, xlator_t *this, +                  loc_t *loc, int32_t flags, mode_t mode, +                  mode_t umask, fd_t *fd, dict_t *xdata) +{ +        int               ret      = -1; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        changelog_opt_t  *co       = NULL; +        changelog_priv_t *priv     = NULL; +        size_t            xtra_len = 0; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        /* init with two extra records */ +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); +        if (!frame->local) +                goto wind; + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_create_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, +                    loc, flags, mode, umask, fd, xdata); +        return 0; +} + +/* }}} */ + + +/* Metadata modification fops - TYPE II */ + +/* {{{ */ + +/* {f}setattr */ + +int32_t +changelog_fsetattr_cbk (call_frame_t *frame, +                        void *cookie, xlator_t *this, int32_t op_ret, +                        int32_t op_errno, struct iatt *preop_stbuf, +                        struct iatt *postop_stbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, +                                preop_stbuf, postop_stbuf, xdata); + +        return 0; + + +} + +int32_t +changelog_fsetattr (call_frame_t *frame, +                    xlator_t *this, fd_t *fd, +                    struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_fsetattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, +                    fd, stbuf, valid, xdata); +        return 0; + + +} + +int32_t +changelog_setattr_cbk (call_frame_t *frame, +                       void *cookie, xlator_t *this, int32_t op_ret, +                       int32_t op_errno, struct iatt *preop_stbuf, +                       struct iatt *postop_stbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno, +                                preop_stbuf, postop_stbuf, xdata); + +        return 0; +} + +int32_t +changelog_setattr (call_frame_t *frame, +                   xlator_t *this, loc_t *loc, +                   struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_setattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, +                    loc, stbuf, valid, xdata); +        return 0; +} + +/* {f}removexattr */ + +int32_t +changelog_fremovexattr_cbk (call_frame_t *frame, +                            void *cookie, xlator_t *this, +                            int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_fremovexattr (call_frame_t *frame, xlator_t *this, +                        fd_t *fd, const char *name, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_fremovexattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr, +                    fd, name, xdata); +        return 0; +} + +int32_t +changelog_removexattr_cbk (call_frame_t *frame, +                           void *cookie, xlator_t *this, +                           int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_removexattr (call_frame_t *frame, xlator_t *this, +                       loc_t *loc, const char *name, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_removexattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr, +                    loc, name, xdata); +        return 0; +} + +/* {f}setxattr */ + +int32_t +changelog_setxattr_cbk (call_frame_t *frame, +                        void *cookie, xlator_t *this, +                        int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_setxattr (call_frame_t *frame, +                    xlator_t *this, loc_t *loc, +                    dict_t *dict, int32_t flags, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_setxattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr, +                    loc, dict, flags, xdata); +        return 0; +} + +int32_t +changelog_fsetxattr_cbk (call_frame_t *frame, +                         void *cookie, xlator_t *this, int32_t op_ret, +                         int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_fsetxattr (call_frame_t *frame, +                     xlator_t *this, fd_t *fd, dict_t *dict, +                     int32_t flags, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_fsetxattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, +                    fd, dict, flags, xdata); +        return 0; +} + +/* }}} */ + + +/* Data modification fops - TYPE I */ + +/* {{{ */ + +/* {f}truncate() */ + +int32_t +changelog_truncate_cbk (call_frame_t *frame, +                        void *cookie, xlator_t *this, int32_t op_ret, +                        int32_t op_errno, struct iatt *prebuf, +                        struct iatt *postbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + unwind: +        CHANGELOG_STACK_UNWIND (truncate, frame, +                                op_ret, op_errno, prebuf, postbuf, xdata); +        return 0; +} + +int32_t +changelog_truncate (call_frame_t *frame, +                    xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_truncate_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate, +                    loc, offset, xdata); +        return 0; +} + +int32_t +changelog_ftruncate_cbk (call_frame_t *frame, +                         void *cookie, xlator_t *this, int32_t op_ret, +                         int32_t op_errno, struct iatt *prebuf, +                         struct iatt *postbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + unwind: +        CHANGELOG_STACK_UNWIND (ftruncate, frame, +                                op_ret, op_errno, prebuf, postbuf, xdata); +        return 0; +} + +int32_t +changelog_ftruncate (call_frame_t *frame, +                     xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_ftruncate_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate, +                    fd, offset, xdata); +        return 0; +} + +/* writev() */ + +int32_t +changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf, +                    struct iatt *postbuf, +                    dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret <= 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + unwind: +        CHANGELOG_STACK_UNWIND (writev, frame, +                                op_ret, op_errno, prebuf, postbuf, xdata); +        return 0; +} + +int32_t +changelog_writev (call_frame_t *frame, +                  xlator_t *this, fd_t *fd, struct iovec *vector, +                  int32_t count, off_t offset, uint32_t flags, +                  struct iobref *iobref, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->writev, fd, vector, +                    count, offset, flags, iobref, xdata); +        return 0; +} + +/* }}} */ + +/** + * The + *   - @init () + *   - @fini () + *   - @reconfigure () + *   ... and helper routines + */ + +/** + * needed if there are more operation modes in the future. + */ +static void +changelog_assign_opmode (changelog_priv_t *priv, char *mode) +{ +        if ( strncmp (mode, "realtime", 8) == 0 ) { +                priv->op_mode = CHANGELOG_MODE_RT; +        } +} + +static void +changelog_assign_encoding (changelog_priv_t *priv, char *enc) +{ +        if ( strncmp (enc, "binary", 6) == 0 ) { +                priv->encode_mode = CHANGELOG_ENCODE_BINARY; +        } else if ( strncmp (enc, "ascii", 5) == 0 ) { +                priv->encode_mode = CHANGELOG_ENCODE_ASCII; +        } +} + +/* cleanup any helper threads that are running */ +static void +changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) +{ +        if (priv->cr.rollover_th) { +                changelog_thread_cleanup (this, priv->cr.rollover_th); +                priv->cr.rollover_th = 0; +        } + +        if (priv->cf.fsync_th) { +                changelog_thread_cleanup (this, priv->cf.fsync_th); +                priv->cf.fsync_th = 0; +        } +} + +/* spawn helper thread; cleaning up in case of errors */ +static int +changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        priv->cr.this = this; +        ret = pthread_create (&priv->cr.rollover_th, +                              NULL, changelog_rollover, priv); +        if (ret) +                goto out; + +        if (priv->fsync_interval) { +                priv->cf.this = this; +                ret = pthread_create (&priv->cf.fsync_th, +                                      NULL, changelog_fsync_thread, priv); +        } + +        if (ret) +                changelog_cleanup_helper_threads (this, priv); + + out: +        return ret; +} + +/* cleanup the notifier thread */ +static int +changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        if (priv->cn.notify_th) { +                changelog_thread_cleanup (this, priv->cn.notify_th); +                priv->cn.notify_th = 0; + +                ret = close (priv->wfd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error closing writer end of notifier pipe" +                                " (reason: %s)", strerror (errno)); +        } + +        return ret; +} + +/* spawn the notifier thread - nop if already running */ +static int +changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) +{ +        int ret        = 0; +        int flags      = 0; +        int pipe_fd[2] = {0, 0}; + +        if (priv->cn.notify_th) +                goto out; /* notifier thread already running */ + +        ret = pipe (pipe_fd); +        if (ret == -1) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Cannot create pipe (reason: %s)", strerror (errno)); +                goto out; +        } + +        /* writer is non-blocking */ +        flags = fcntl (pipe_fd[1], F_GETFL); +        flags |= O_NONBLOCK; + +        ret = fcntl (pipe_fd[1], F_SETFL, flags); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to set O_NONBLOCK flag"); +                goto out; +        } + +        priv->wfd = pipe_fd[1]; + +        priv->cn.this = this; +        priv->cn.rfd  = pipe_fd[0]; + +        ret = pthread_create (&priv->cn.notify_th, +                              NULL, changelog_notifier, priv); + + out: +        return ret; +} + +int32_t +mem_acct_init (xlator_t *this) +{ +        int     ret = -1; + +        if (!this) +                return ret; + +        ret = xlator_mem_acct_init (this, gf_changelog_mt_end + 1); + +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, "Memory accounting" +                        " init failed"); +                return ret; +        } + +        return ret; +} + +static int +changelog_init (xlator_t *this, changelog_priv_t *priv) +{ +        int                  i   = 0; +        int                  ret = -1; +        struct timeval       tv  = {0,}; +        changelog_log_data_t cld = {0,}; + +        ret = gettimeofday (&tv, NULL); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "gettimeofday() failure"); +                goto out; +        } + +        priv->slice.tv_start = tv; + +        priv->maps[CHANGELOG_TYPE_DATA]     = "D "; +        priv->maps[CHANGELOG_TYPE_METADATA] = "M "; +        priv->maps[CHANGELOG_TYPE_ENTRY]    = "E "; + +        /* spawn the notifier thread */ +        ret = changelog_spawn_notifier (this, priv); +        if (ret) +                goto out; + +        /** +         * start with a fresh changelog file every time. this is done +         * in case there was an encoding change. so... things are kept +         * simple here. +         */ +        ret = changelog_fill_rollover_data (&cld, _gf_false); +        if (ret) +                goto out; + +        LOCK (&priv->lock); +        { +                for (; i < CHANGELOG_MAX_TYPE; i++) { +                        /* start with version 1 */ +                        priv->slice.changelog_version[i] = 1; +                } + +                ret = changelog_inject_single_event (this, priv, &cld); +        } +        UNLOCK (&priv->lock); + +        /* ... and finally spawn the helpers threads */ +        ret = changelog_spawn_helper_threads (this, priv); + + out: +        return ret; +} + +int +reconfigure (xlator_t *this, dict_t *options) +{ +        int                     ret            = 0; +        char                   *tmp            = NULL; +        changelog_priv_t       *priv           = NULL; +        gf_boolean_t            active_earlier = _gf_true; +        gf_boolean_t            active_now     = _gf_true; +        changelog_time_slice_t *slice          = NULL; +        changelog_log_data_t    cld            = {0,}; + +        priv = this->private; +        if (!priv) +                goto out; + +        ret = -1; +        active_earlier = priv->active; + +        /* first stop the rollover and the fsync thread */ +        changelog_cleanup_helper_threads (this, priv); + +        GF_OPTION_RECONF ("changelog-dir", tmp, options, str, out); +        if (!tmp) { +                gf_log (this->name, GF_LOG_ERROR, +                        "\"changelog-dir\" option is not set"); +                goto out; +        } + +        GF_FREE (priv->changelog_dir); +        priv->changelog_dir = gf_strdup (tmp); +        if (!priv->changelog_dir) +                goto out; + +        ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); +        if (ret) +                goto out; + +        GF_OPTION_RECONF ("changelog", active_now, options, bool, out); + +        /** +         * changelog_handle_change() handles changes that could possibly +         * have been submit changes before changelog deactivation. +         */ +        if (!active_now) +                priv->active = _gf_false; + +        GF_OPTION_RECONF ("op-mode", tmp, options, str, out); +        changelog_assign_opmode (priv, tmp); + +        GF_OPTION_RECONF ("encoding", tmp, options, str, out); +        changelog_assign_encoding (priv, tmp); + +        GF_OPTION_RECONF ("rollover-time", +                          priv->rollover_time, options, int32, out); +        GF_OPTION_RECONF ("fsync-interval", +                          priv->fsync_interval, options, int32, out); + +        if (active_now || active_earlier) { +                ret = changelog_fill_rollover_data (&cld, !active_now); +                if (ret) +                        goto out; + +                slice = &priv->slice; + +                LOCK (&priv->lock); +                { +                        ret = changelog_inject_single_event (this, priv, &cld); +                        if (!ret && active_now) +                                SLICE_VERSION_UPDATE (slice); +                } +                UNLOCK (&priv->lock); + +                if (ret) +                        goto out; + +                if (active_now) { +                        ret = changelog_spawn_notifier (this, priv); +                        if (!ret) +                                ret = changelog_spawn_helper_threads (this, +                                                                      priv); +                } else +                        ret = changelog_cleanup_notifier (this, priv); +        } + + out: +        if (ret) { +                ret = changelog_cleanup_notifier (this, priv); +        } else { +                gf_log (this->name, GF_LOG_DEBUG, +                        "changelog reconfigured"); +                if (active_now) +                        priv->active = _gf_true; +        } + +        return ret; +} + +int32_t +init (xlator_t *this) +{ +        int               ret  = -1; +        char             *tmp  = NULL; +        changelog_priv_t *priv = NULL; + +        GF_VALIDATE_OR_GOTO ("changelog", this, out); + +        if (!this->children || this->children->next) { +                gf_log (this->name, GF_LOG_ERROR, +                        "translator needs a single subvolume"); +                goto out; +        } + +        if (!this->parents) { +                gf_log (this->name, GF_LOG_ERROR, +                        "dangling volume. please check volfile"); +                goto out; +        } + +        priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); +        if (!priv) +                goto out; + +        this->local_pool = mem_pool_new (changelog_local_t, 64); +        if (!this->local_pool) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to create local memory pool"); +                goto out; +        } + +        LOCK_INIT (&priv->lock); + +        GF_OPTION_INIT ("changelog-brick", tmp, str, out); +        if (!tmp) { +                gf_log (this->name, GF_LOG_ERROR, +                        "\"changelog-brick\" option is not set"); +                goto out; +        } + +        priv->changelog_brick = gf_strdup (tmp); +        if (!priv->changelog_brick) +                goto out; +        tmp = NULL; + +        GF_OPTION_INIT ("changelog-dir", tmp, str, out); +        if (!tmp) { +                gf_log (this->name, GF_LOG_ERROR, +                        "\"changelog-dir\" option is not set"); +                goto out; +        } + +        priv->changelog_dir = gf_strdup (tmp); +        if (!priv->changelog_dir) +                goto out; +        tmp = NULL; + +        /** +         * create the directory even if change-logging would be inactive +         * so that consumers can _look_ into it (finding nothing...) +         */ +        ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); +        if (ret) +                goto out; + +        GF_OPTION_INIT ("changelog", priv->active, bool, out); +        if (!priv->active) { +                ret = 0; +                goto out; +        } + +        GF_OPTION_INIT ("op-mode", tmp, str, out); +        changelog_assign_opmode (priv, tmp); + +        tmp = NULL; + +        GF_OPTION_INIT ("encoding", tmp, str, out); +        changelog_assign_encoding (priv, tmp); + +        GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out); + +        GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); + +        GF_ASSERT (cb_encoder[priv->encode_mode].encoder == priv->encode_mode); +        priv->ce = &cb_encoder[priv->encode_mode]; + +        GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode); +        priv->cb = &cb_bootstrap[priv->op_mode]; + +        /* ... now bootstrap the logger */ +        ret = priv->cb->ctor (this, &priv->cd); +        if (ret) +                goto out; + +        priv->changelog_fd = -1; +        if (priv->active) +                ret = changelog_init (this, priv); +        if (ret) +                goto out; + +        gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded"); + + out: +        if (ret) { +                if (this->local_pool) +                        mem_pool_destroy (this->local_pool); +                ret = priv->cb->dtor (this, &priv->cd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error in cleanup during init()"); +                GF_FREE (priv->changelog_brick); +                GF_FREE (priv->changelog_dir); +                GF_FREE (priv); +                this->private = NULL; +        } else +                this->private = priv; + +        return ret; +} + +void +fini (xlator_t *this) +{ +        int               ret  = -1; +        changelog_priv_t *priv = NULL; + +        priv = this->private; + +        if (priv) { +                ret = priv->cb->dtor (this, &priv->cd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error in fini"); +                mem_pool_destroy (this->local_pool); +                GF_FREE (priv->changelog_brick); +                GF_FREE (priv->changelog_dir); +                GF_FREE (priv); +        } + +        this->private = NULL; + +        return; +} + +struct xlator_fops fops = { +        .mknod        = changelog_mknod, +        .mkdir        = changelog_mkdir, +        .create       = changelog_create, +        .symlink      = changelog_symlink, +        .writev       = changelog_writev, +        .truncate     = changelog_truncate, +        .ftruncate    = changelog_ftruncate, +        .link         = changelog_link, +        .rename       = changelog_rename, +        .unlink       = changelog_unlink, +        .rmdir        = changelog_rmdir, +        .setattr      = changelog_setattr, +        .fsetattr     = changelog_fsetattr, +        .setxattr     = changelog_setxattr, +        .fsetxattr    = changelog_fsetxattr, +        .removexattr  = changelog_removexattr, +        .fremovexattr = changelog_fremovexattr, +}; + +struct xlator_cbks cbks = { +        .forget = changelog_forget, +}; + +struct volume_options options[] = { +        {.key = {"changelog"}, +         .type = GF_OPTION_TYPE_BOOL, +         .default_value = "off", +         .description = "enable/disable change-logging" +        }, +        {.key = {"changelog-brick"}, +         .type = GF_OPTION_TYPE_PATH, +         .description = "brick path to generate unique socket file name." +                       " should be the export directory of the volume strictly." +        }, +        {.key = {"changelog-dir"}, +         .type = GF_OPTION_TYPE_PATH, +         .description = "directory for the changelog files" +        }, +        {.key = {"op-mode"}, +         .type = GF_OPTION_TYPE_STR, +         .default_value = "realtime", +         .value = {"realtime"}, +         .description = "operation mode - futuristic operation modes" +        }, +        {.key = {"encoding"}, +         .type = GF_OPTION_TYPE_STR, +         .default_value = "ascii", +         .value = {"binary", "ascii"}, +         .description = "encoding type for changelogs" +        }, +        {.key = {"rollover-time"}, +         .default_value = "60", +         .type = GF_OPTION_TYPE_TIME, +         .description = "time to switch to a new changelog file (in seconds)" +        }, +        {.key = {"fsync-interval"}, +         .type = GF_OPTION_TYPE_TIME, +         .default_value = "0", +         .description = "do not open CHANGELOG file with O_SYNC mode." +                        " instead perform fsync() at specified intervals" +        }, +        {.key = {NULL} +        }, +};  | 
