summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/policy/changelog-policy-replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/policy/changelog-policy-replication.c')
-rw-r--r--xlators/features/changelog/src/policy/changelog-policy-replication.c1374
1 files changed, 1374 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/policy/changelog-policy-replication.c b/xlators/features/changelog/src/policy/changelog-policy-replication.c
new file mode 100644
index 000000000..29c049716
--- /dev/null
+++ b/xlators/features/changelog/src/policy/changelog-policy-replication.c
@@ -0,0 +1,1374 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-policy.h"
+#include "changelog-encoders.h"
+#include "changelog-fops.h"
+
+#define JOURNAL_NAME "TERM"
+
+#define JOURNAL_SECTOR_SIZE 128
+
+#define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */
+#define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */
+
+/* similar to fop_fn, but... */
+size_t
+int32_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ size_t bufsz = 0;
+ int nr = 0;
+ char buf[20] = {0,};
+
+ nr = *(int *) data;
+
+ if (encode) {
+ (void) snprintf (buf, sizeof (buf), "%d", nr);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz,
+ &nr, sizeof (int));
+
+ return bufsz;
+}
+
+
+size_t
+uint32_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ size_t bufsz = 0;
+ unsigned int nr = 0;
+ char buf[20] = {0,};
+
+ nr = *(unsigned int *) data;
+
+ if (encode) {
+ (void) snprintf (buf, sizeof (buf), "%u", nr);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz,
+ &nr, sizeof (unsigned int));
+
+ return bufsz;
+}
+
+size_t
+number_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ char buf[1024] = {0,};
+ size_t bufsz = 0;
+ unsigned long long nr = 0;
+
+ nr = *(unsigned long long *) data;
+
+ if (encode) {
+ (void) snprintf (buf, sizeof (buf), "%llu", nr);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz,
+ &nr, sizeof (unsigned long long));
+
+ return bufsz;
+}
+
+size_t
+uuid_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ char buf[1024] = {0,};
+ uuid_t uuid = {0,};
+ size_t bufsz = 0;
+
+ memcpy (uuid, (uuid_t *) data, sizeof (uuid_t));
+
+ if (encode) {
+ char *tmpbuf = uuid_utoa (uuid);
+ (void) snprintf (buf, sizeof (buf), "%s", tmpbuf);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t));
+
+ return bufsz;
+}
+
+#define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \
+ co->co_convert = converter; \
+ co->co_free = NULL; \
+ co->co_type = CHANGELOG_OPT_REC_ULL; \
+ co->co_number = (unsigned long long) number; \
+ xlen += sizeof (unsigned long long); \
+ if (!co->co_convert) \
+ co->co_len = sizeof (unsigned long long); \
+ } while (0)
+
+#define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \
+ co->co_convert = converter; \
+ co->co_free = NULL; \
+ co->co_type = CHANGELOG_OPT_REC_UUID; \
+ uuid_copy (co->co_uuid, uuid); \
+ xlen += sizeof (uuid_t); \
+ } while (0)
+
+
+/* TBD: move declarations here and nsr.c into a common place */
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define NSR_INDEX_XATTR "trusted.nsr.index"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+static gf_boolean_t
+changelog_fix_term(xlator_t *this,
+ changelog_local_t *local,
+ dict_t *xdata)
+{
+ int32_t old_term, new_term;
+ uint32_t index;
+ changelog_priv_t *priv = this->private;
+ int ret = 0;
+ char nfile[PATH_MAX] = {0,};
+ int32_t recon_term, recon_index;
+ changelog_rollover_data_t crd;
+
+ // If coming via the regular IO path, we should get the dict "nsr-term"
+ // If coming via reconciliation, we should get the dicts "nsr-recon-term"
+ // that indicates the term and "nsr-recon-index" for the index
+ if ((dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) &&
+ (dict_get_uint32(xdata, NSR_INDEX_XATTR, &index) == 0)) {
+ old_term = priv->term;
+
+ if (old_term != new_term) {
+ GF_ASSERT(new_term > old_term);
+ LOCK (&priv->lock);
+ priv->term = new_term;
+ (void) snprintf (nfile, PATH_MAX, "%s.%d",
+ JOURNAL_NAME, priv->term);
+ ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover,
+ nfile, _gf_false);
+ UNLOCK (&priv->lock);
+ if (ret != 0)
+ return _gf_false;
+ }
+ local->nr_bytes = 0;
+ local->lu.val = index;
+ } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+
+ old_term = priv->term;
+
+ if (old_term != recon_term) {
+ LOCK (&priv->lock);
+ {
+ priv->term = recon_term;
+ (void) snprintf (crd.crd_changelog_name,
+ PATH_MAX, "%s.%d",
+ JOURNAL_NAME, priv->term);
+ crd.crd_prealloc_size = 1<<29;
+ ret = changelog_open(this, priv, local, &crd);
+ }
+ UNLOCK (&priv->lock);
+ if (ret != 0)
+ return _gf_false;
+ }
+
+ local->nr_bytes = 0;
+ local->lu.val = recon_index;
+ } else {
+ return _gf_false;
+ }
+
+ return _gf_true;
+}
+
+/**
+ * Replication policy records journal entries in the FOP path. This is
+ * quite different that the default policy (used by geo-replication),
+ * which journals records in the callback path on a successfull posix
+ * operation. Additionally, each record starts with a PRE-OP marker and
+ * the index number generated by the leader.
+ * (c.f. nsr_$NAME$() ~/xlator/cluster/nsr/nsr-server/src/all-templates.c)
+ *
+ * POST-OPs are marked asynchronously and not during in the callback path
+ * Marking it in the callback path is incorrect as the actual FOP may not
+ * have been synchronized to the disk. Therefore, POST op marking is done
+ * after a successful file system sync, which is trigerred periodically
+ * by NSR server component. To keep journal updates strictly sequential,
+ * POST-OPs are separate record in the journal.
+ */
+
+/**
+ * Override File Operations
+ *
+ * NOTE: Since journal updates are done in the FOP path, there is no
+ * actual use of @local in cbk. Therefore, @local could have been
+ * declared statically for each FOP (which would remove the overhead
+ * of allocating (here) and deallocating (in cbk)). Let's not do that
+ * and keep it this way for now. Will worry about it later (for code
+ * reasonability and performance).
+ */
+
+int32_t
+changelog_replication_rmdir (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID + UID + GID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 7);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 7);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_unlink (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ return changelog_replication_rmdir (frame, this, loc, xflags, xdata);
+}
+
+int32_t
+changelog_replication_rename (call_frame_t *frame, xlator_t *this,
+ loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID + UID + GID + OLDLOC + NEWLOC */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 8);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 8);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_link (call_frame_t *frame,
+ xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID + UID + GID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 7);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 7);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_mkdir (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ mode_t mode, mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + IDX + FOP + GFID + MODE + UID + GID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, mode | S_IFDIR, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 8);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_symlink (call_frame_t *frame, xlator_t *this,
+ const char *linkname, loc_t *loc,
+ mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + IDX + FOP + GFID + LINKNAME + UID + GID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_NAME (co, linkname, entry_free_fn, xtra_len, out);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 8);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_mknod (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ mode_t mode, dev_t dev,
+ mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + IDX + FOP + GFID + MODE + UID + GID + DEV + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 9);
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term (this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, dev, number_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 9);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_create (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int32_t flags, mode_t mode,
+ mode_t umask, fd_t *fd, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + IDX + FOP + GFID + MODE + UID + GID + ENTRY */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8);
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term (this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 8);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+static int
+_changelog_setattr_fill_common (call_frame_t *frame, xlator_t *this,
+ int32_t attr, struct iatt *stbuf,
+ uuid_t gfid, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ int used_count = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ used_count = 7;
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, used_count);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ /**
+ * - <PRE>
+ * - IDX
+ * - FOP
+ * - GFID
+ * - Valid flag
+ * GF_SET_ATTR_MODE [chmod]
+ * ->ia_prot
+ * ->ia_type
+ * GF_SET_ATTR_UID | GF_SET_ATTR_GID [chown]
+ * ->ia_uid
+ * ->ia_gid
+ * GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME [utimes]
+ * ->ia_atime
+ * ->ia_mtime
+ */
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, attr, uint32_fn, xtra_len);
+ co++;
+
+ if (attr & GF_SET_ATTR_MODE) {
+ mode_t mode = 0;
+
+ /* ->ia_prot & ->ia_type stored as a consolidated value */
+ used_count--;
+ mode = st_mode_from_ia (stbuf->ia_prot, stbuf->ia_type);
+
+ CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len);
+
+ } else if (attr & (GF_SET_ATTR_UID | GF_SET_ATTR_GID)) {
+ uid_t uid = -1;
+ gid_t gid = -1;
+
+ if (attr & GF_SET_ATTR_UID)
+ uid = stbuf->ia_uid;
+ if (attr & GF_SET_ATTR_GID)
+ gid = stbuf->ia_gid;
+
+ /* ->ia_uid & ->ia_gid */
+ CHANGELOG_FILL_INT32 (co, uid, int32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_INT32 (co, gid, int32_fn, xtra_len);
+
+ } else if (attr & (GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME)) {
+
+ /* ->ia_atime & ->ia_mtime, need usecs? */
+ CHANGELOG_FILL_UINT32 (co,
+ stbuf->ia_atime, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co,
+ stbuf->ia_mtime, uint32_fn, xtra_len);
+ }
+
+ changelog_set_usable_record_and_length (local, xtra_len, used_count);
+
+ ret = 0;
+ frame->local = local;
+
+ changelog_update (this, priv, frame->local, CHANGELOG_TYPE_METADATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+
+ return ret;
+}
+
+int32_t
+changelog_replication_fsetattr (call_frame_t *frame,
+ xlator_t *this, fd_t *fd,
+ struct iatt *stbuf, int32_t valid,
+ dict_t *xdata)
+{
+ return _changelog_setattr_fill_common (frame, this, valid,
+ stbuf, fd->inode->gfid, xdata);
+}
+
+int32_t
+changelog_replication_setattr (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ struct iatt *stbuf, int32_t valid, dict_t *xdata)
+{
+ return _changelog_setattr_fill_common (frame, this, valid,
+ stbuf, loc->inode->gfid, xdata);
+}
+
+int32_t
+changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, const char *name, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ else
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_removexattr (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, const char *name, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ else
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_setxattr (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ dict_t *dict, int32_t flags, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ else
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_fsetxattr (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, dict_t *dict,
+ int32_t flags, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ else
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_truncate (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ off_t offset, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID + Offset */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 5);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 5);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_ftruncate (call_frame_t *frame,
+ xlator_t *this, fd_t *fd,
+ off_t offset, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID + Offset */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 5);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_writev (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, struct iovec *vector,
+ int32_t count, off_t offset, uint32_t flags,
+ struct iobref *iobref, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + IDX + FOP + GFID + Offset + Length */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 6);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count),
+ number_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 6);
+
+ frame->local = local;
+ ret = 0;
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+/* overriden COPS */
+int
+changelog_replication_cops_open (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ char *name, gf_boolean_t last)
+{
+ changelog_local_t local = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_rollover_data_t *crd = NULL;
+
+ crd = &cld.cld_roll;
+
+ cld.cld_type = CHANGELOG_TYPE_ROLLOVER;
+
+ crd->crd_finale = last;
+ crd->crd_use_suffix = _gf_false;
+ crd->crd_prealloc_size = 1<<29; /* preallocate 512 MB */
+
+
+ (void) strcpy (crd->crd_changelog_name, name);
+
+ local.lu.val = 0;
+ local.nr_bytes = 0;
+
+ return changelog_inject_single_event (this, priv, &local, &cld);
+}
+
+/**
+ * NO-OP changelog write (from changelog.c). Records are journaled
+ * in the FOP path.
+ */
+int
+changelog_replication_cops_write (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ changelog_local_t *local,
+ changelog_log_type type)
+{
+ return 0;
+}
+
+/**
+ * no implicit rollover
+ */
+int
+changelog_replication_cops_rollover (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ char *name, gf_boolean_t last)
+{
+ return changelog_replication_cops_open (this, priv, cpriv, name, last);
+}
+
+off_t
+changelog_replication_cops_get_offset (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ changelog_local_t *local)
+{
+ if (!local)
+ return 0;
+
+ return (local->lu.val * JOURNAL_SECTOR_SIZE) + local->nr_bytes;
+}
+
+void
+changelog_replication_cops_set_offset (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ changelog_local_t *local, off_t bytes)
+{
+ local->nr_bytes += bytes;
+}
+
+void
+changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv,
+ void *cpriv, changelog_local_t *local)
+{
+ return;
+}
+
+int
+changelog_replication_policy_init (xlator_t *this,
+ changelog_priv_t *priv,
+ struct changelog_logpolicy *cp)
+{
+ struct xlator_fops *r_fops = NULL;
+ struct changelog_ops *r_cops = NULL;
+
+ r_fops = GF_CALLOC (1, sizeof (struct xlator_fops),
+ gf_changelog_mt_fop_policy_t);
+ if (!r_fops)
+ return -1;
+
+ r_cops = GF_CALLOC (1, sizeof (struct changelog_ops),
+ gf_changelog_mt_fop_policy_t);
+ if (!r_cops) {
+ GF_FREE (r_fops);
+ return -1;
+ }
+
+ cp->cpriv = GF_CALLOC (1, sizeof (off_t),
+ gf_changelog_mt_fop_policy_t);
+ if (!cp->cpriv) {
+ GF_FREE (r_fops);
+ GF_FREE (r_cops);
+ return -1;
+ }
+
+ /* no roll-over, one big fat journal per term */
+ priv->rollover_time = 0;
+
+ /* fsync() is internally trigerred by NSR */
+ priv->fsync_interval = 0;
+
+ /* no record header: extra data (via iobufs) are always persisted */
+ priv->no_gfid_hdr = _gf_true;
+
+ priv->lockless_update = _gf_false;
+
+ memcpy (r_fops, &changelog_default_fops, sizeof (struct xlator_fops));
+ memcpy (r_cops, &changelog_default_cops, sizeof (struct changelog_ops));
+
+ priv->term = 0;
+ (void) memset (cp->changelog_name, '\0', PATH_MAX);
+ memcpy(cp->changelog_name, JOURNAL_NAME, strlen(JOURNAL_NAME));
+#if 0
+ (void) snprintf (cp->changelog_name, PATH_MAX,
+ JOURNAL_NAME, priv->term);
+#endif
+
+ /* overload all fops */
+ r_fops->writev = changelog_replication_writev;
+ r_fops->ftruncate = changelog_replication_ftruncate;
+ r_fops->truncate = changelog_replication_truncate;
+ r_fops->fsetxattr = changelog_replication_fsetxattr;
+ r_fops->setxattr = changelog_replication_setxattr;
+ r_fops->removexattr = changelog_replication_removexattr;
+ r_fops->fremovexattr = changelog_replication_fremovexattr;
+ r_fops->setattr = changelog_replication_setattr;
+ r_fops->fsetattr = changelog_replication_fsetattr;
+ r_fops->create = changelog_replication_create;
+ r_fops->mknod = changelog_replication_mknod;
+ r_fops->symlink = changelog_replication_symlink;
+ r_fops->mkdir = changelog_replication_mkdir;
+ r_fops->link = changelog_replication_link;
+ r_fops->rename = changelog_replication_rename;
+ r_fops->unlink = changelog_replication_unlink;
+ r_fops->rmdir = changelog_replication_rmdir;
+
+ /* overload cops */
+ r_cops->open = changelog_replication_cops_open;
+ r_cops->write = changelog_replication_cops_write;
+ r_cops->rollover = changelog_replication_cops_rollover;
+ r_cops->get_offset = changelog_replication_cops_get_offset;
+ r_cops->set_offset = changelog_replication_cops_set_offset;
+ r_cops->reset_offset = changelog_replication_cops_reset_offset;
+
+
+ cp->fops = r_fops;
+ cp->cops = r_cops;
+
+ return 0;
+}
+
+int
+changelog_replication_policy_fini (xlator_t *this,
+ struct changelog_logpolicy *cp)
+{
+ GF_FREE (cp->fops);
+ GF_FREE (cp->cops);
+ GF_FREE (cp->cpriv);
+ return 0;
+}