/* Copyright (c) 2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include "changelog-policy.h" #include "changelog-encoders.h" #include "changelog-fops.h" #define JOURNAL_NAME "TERM" #define JOURNAL_SECTOR_SIZE 128 #define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */ #define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */ /** * assume an ever increasing index for now.. */ static unsigned long nsr_index = 1; static unsigned long get_index(changelog_priv_t *priv) { unsigned long idx = 0; LOCK (&priv->lock); { idx = nsr_index++; } UNLOCK (&priv->lock); return idx; } static void reset_index(changelog_priv_t *priv) { nsr_index = 1; } #if 0 static inline void //changelog_replication_assign_term (changelog_priv_t *priv, changelog_local_t *local) { local->nr_bytes = 0; local->lu.val = get_index (priv); } #endif size_t number_fn (void *data, char *buffer, gf_boolean_t encode) { char buf[1024] = {0,}; size_t bufsz = 0; unsigned long long nr = 0; nr = *(unsigned long long *) data; if (encode) { (void) snprintf (buf, sizeof (buf), "%llu", nr); CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); } else CHANGELOG_FILL_BUFFER (buffer, bufsz, &nr, sizeof (unsigned long long)); return bufsz; } size_t uuid_fn (void *data, char *buffer, gf_boolean_t encode) { char buf[1024] = {0,}; uuid_t uuid = {0,}; size_t bufsz = 0; memcpy (uuid, (uuid_t *) data, sizeof (uuid_t)); if (encode) { char *tmpbuf = uuid_utoa (uuid); (void) snprintf (buf, sizeof (buf), "%s", tmpbuf); CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); } else CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t)); return bufsz; } #define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \ co->co_convert = converter; \ co->co_free = NULL; \ co->co_type = CHANGELOG_OPT_REC_ULL; \ co->co_number = (unsigned long long) number; \ xlen += sizeof (unsigned long long); \ if (!co->co_convert) \ co->co_len = sizeof (unsigned long long); \ } while (0) #define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \ co->co_convert = converter; \ co->co_free = NULL; \ co->co_type = CHANGELOG_OPT_REC_UUID; \ uuid_copy (co->co_uuid, uuid); \ xlen += sizeof (uuid_t); \ } while (0) /* TBD: move declarations here and nsr.c into a common place */ #define NSR_TERM_XATTR "trusted.nsr.term" #define RECON_TERM_XATTR "trusted.nsr.recon-term" #define RECON_INDEX_XATTR "trusted.nsr.recon-index" static gf_boolean_t changelog_fix_term(xlator_t *this, changelog_local_t *local, dict_t *xdata) { int32_t old_term, new_term; changelog_priv_t *priv = this->private; int ret = 0; char nfile[PATH_MAX] = {0,}; int32_t recon_term, recon_index; changelog_rollover_data_t crd; // If coming via the regular IO path, we should get the dict "nsr-term" // If coming via reconciliation, we should get the dicts "nsr-recon-term" // that indicates the term and "nsr-recon-index" for the index if (dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) { old_term = priv->term; if (old_term != new_term) { GF_ASSERT(new_term > old_term); LOCK (&priv->lock); reset_index(priv); priv->term = new_term; (void) snprintf (nfile, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term); ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover, nfile, _gf_false); UNLOCK (&priv->lock); if (ret != 0) return _gf_false; } local->nr_bytes = 0; local->lu.val = get_index (priv); } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { old_term = priv->term; if (old_term != recon_term) { LOCK (&priv->lock); priv->term = recon_term; (void) snprintf (crd.crd_changelog_name, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term); crd.crd_prealloc_size = 0; if (changelog_open(this, priv, local, &crd) != 0) return _gf_false; UNLOCK (&priv->lock); } local->nr_bytes = 0; local->lu.val = recon_index; } else { return _gf_false; } return _gf_true; } /** override FOPS */ int32_t changelog_replication_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { int ret = -1; size_t xtra_len = 0; changelog_opt_t *co = NULL; changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; /*
 + FOP + GFID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_unlink (call_frame_t *frame, xlator_t *this,
                              loc_t *loc, int xflags, dict_t *xdata)
{
        return changelog_replication_rmdir (frame, this, loc, xflags, xdata);
}

int32_t
changelog_replication_rename (call_frame_t *frame, xlator_t *this,
                              loc_t *oldloc, loc_t *newloc, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID + OLDLOC + NEWLOC */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 5);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
                              entry_fn, entry_free_fn, xtra_len, out);
        co++;

        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        //changelog_replication_assign_term (priv, local);

        changelog_set_usable_record_and_length (local, xtra_len, 5);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_link (call_frame_t *frame,
                            xlator_t *this, loc_t *oldloc,
                            loc_t *newloc, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_mkdir (call_frame_t *frame,
                             xlator_t *this, loc_t *loc,
                             mode_t mode, mode_t umask, dict_t *xdata)
{
        int                ret      = -1;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + FOP + GFID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_symlink (call_frame_t *frame, xlator_t *this,
                               const char *linkname, loc_t *loc,
                               mode_t umask, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + FOP + GFID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_mknod (call_frame_t *frame,
                             xlator_t *this, loc_t *loc,
                             mode_t mode, dev_t dev,
                             mode_t umask, dict_t *xdata)
{
        int                ret      = -1;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + FOP + GFID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_create (call_frame_t *frame, xlator_t *this,
                              loc_t *loc, int32_t flags, mode_t mode,
                              mode_t umask, fd_t *fd, dict_t *xdata)
{
        int                ret      = -1;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + FOP + GFID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_fsetattr (call_frame_t *frame,
                                xlator_t *this, fd_t *fd,
                                struct iatt *stbuf, int32_t valid,
                                dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 3);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_setattr (call_frame_t *frame,
                               xlator_t *this, loc_t *loc,
                               struct iatt *stbuf, int32_t valid, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 3);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this,
                                    fd_t *fd, const char *name, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                                fop_fn, xtra_len);
        }
        else {
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        }
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 3);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_removexattr (call_frame_t *frame, xlator_t *this,
                                   loc_t *loc, const char *name, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                           fop_fn, xtra_len);
        }
        else {
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        }
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 3);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_setxattr (call_frame_t *frame,
                                xlator_t *this, loc_t *loc,
                                dict_t *dict, int32_t flags, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                           fop_fn, xtra_len);
        }
        else {
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        }
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 3);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_fsetxattr (call_frame_t *frame,
                                 xlator_t *this, fd_t *fd, dict_t *dict,
                                 int32_t flags, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;


        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                           fop_fn, xtra_len);
        }
        else {
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        }
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 3);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_truncate (call_frame_t *frame,
                                xlator_t *this, loc_t *loc,
                                off_t offset, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID + Offset */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_ftruncate (call_frame_t *frame,
                                 xlator_t *this, fd_t *fd,
                                 off_t offset, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID + Offset */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_writev (call_frame_t *frame,
                              xlator_t *this, fd_t *fd, struct iovec *vector,
                              int32_t count, off_t offset, uint32_t flags,
                              struct iobref *iobref, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + FOP + GFID + Offset + Length */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count),
                               number_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 5);

        //changelog_replication_assign_term (priv, local);

        frame->local = local;
        ret = 0;

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

/* overriden COPS */
int
changelog_replication_cops_open (xlator_t *this,
                                 changelog_priv_t *priv, void *cpriv,
                                 char *name, gf_boolean_t last)
{
        changelog_local_t local = {0,};
        changelog_log_data_t       cld = {0,};
        changelog_rollover_data_t *crd = NULL;

        crd = &cld.cld_roll;

        cld.cld_type = CHANGELOG_TYPE_ROLLOVER;

        crd->crd_finale = last;
        crd->crd_use_suffix = _gf_false;
        crd->crd_prealloc_size = 1<<10; /* preallocate 1 MB */


        (void) strcpy (crd->crd_changelog_name, name);

        local.lu.val = 0;
        local.nr_bytes = 0;

        return changelog_inject_single_event (this, priv, &local, &cld);
}

/**
 * no implicit rollover
 */
int
changelog_replication_cops_rollover (xlator_t *this,
                                     changelog_priv_t *priv, void *cpriv,
                                     char *name, gf_boolean_t last)
{
        return changelog_replication_cops_open(this, priv, cpriv, name, last);
}

off_t
changelog_replication_cops_get_offset (xlator_t *this,
                                       changelog_priv_t *priv, void *cpriv,
                                       changelog_local_t *local)
{
        if (!local)
                return 0;

        return (local->lu.val * JOURNAL_SECTOR_SIZE) + local->nr_bytes;
}

void
changelog_replication_cops_set_offset (xlator_t *this,
                                       changelog_priv_t *priv, void *cpriv,
                                       changelog_local_t *local, off_t bytes)
{
        local->nr_bytes += bytes;
}

void
changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv,
                                         void *cpriv, changelog_local_t *local)
{
        return;
}

int
changelog_replication_policy_init (xlator_t *this,
                                   changelog_priv_t *priv,
                                   struct changelog_logpolicy *cp)
{
        struct xlator_fops   *r_fops = NULL;
        struct changelog_ops *r_cops = NULL;

        r_fops = GF_CALLOC (1, sizeof (struct xlator_fops),
                            gf_changelog_mt_fop_policy_t);
        if (!r_fops)
                return -1;

        r_cops = GF_CALLOC (1, sizeof (struct changelog_ops),
                            gf_changelog_mt_fop_policy_t);
        if (!r_cops) {
                GF_FREE (r_fops);
                return -1;
        }

        /* no roll-over, one big fat journal per term */
        priv->rollover_time = 0;

        /* fsync() is internally trigerred by NSR */
        priv->fsync_interval = 0;

        /* no record header: extra data (via iobufs) are always persisted */
        priv->no_gfid_hdr = _gf_true;

        memcpy (r_fops, &changelog_default_fops, sizeof (struct xlator_fops));
        memcpy (r_cops, &changelog_default_cops, sizeof (struct changelog_ops));

        priv->term = 0;
        (void) memset (cp->changelog_name, '\0', PATH_MAX);
        memcpy(cp->changelog_name, JOURNAL_NAME, strlen(JOURNAL_NAME));
#if 0
        (void) snprintf (cp->changelog_name, PATH_MAX,
                         JOURNAL_NAME, priv->term);
#endif

        /* overload all fops */
        r_fops->writev       = changelog_replication_writev;
        r_fops->ftruncate    = changelog_replication_ftruncate;
        r_fops->truncate     = changelog_replication_truncate;
        r_fops->fsetxattr    = changelog_replication_fsetxattr;
        r_fops->setxattr     = changelog_replication_setxattr;
        r_fops->removexattr  = changelog_replication_removexattr;
        r_fops->fremovexattr = changelog_replication_fremovexattr;
        r_fops->setattr      = changelog_replication_setattr;
        r_fops->fsetattr     = changelog_replication_fsetattr;
        r_fops->create       = changelog_replication_create;
        r_fops->mknod        = changelog_replication_mknod;
        r_fops->symlink      = changelog_replication_symlink;
        r_fops->mkdir        = changelog_replication_mkdir;
        r_fops->link         = changelog_replication_link;
        r_fops->rename       = changelog_replication_rename;
        r_fops->unlink       = changelog_replication_unlink;
        r_fops->rmdir        = changelog_replication_rmdir;

        /* overload cops */
        r_cops->open         = changelog_replication_cops_open;
        r_cops->rollover     = changelog_replication_cops_rollover;
        r_cops->get_offset   = changelog_replication_cops_get_offset;
        r_cops->set_offset   = changelog_replication_cops_set_offset;
        r_cops->reset_offset = changelog_replication_cops_reset_offset;

        cp->fops = r_fops;
        cp->cops = r_cops;

        return 0;
}

int
changelog_replication_policy_fini (xlator_t *this,
                                   struct changelog_logpolicy *cp)
{
        GF_FREE (cp->fops);
        GF_FREE (cp->cops);
        return 0;
}