diff options
Diffstat (limited to 'xlators/features/changelog/src/policy/changelog-policy-replication.c')
-rw-r--r-- | xlators/features/changelog/src/policy/changelog-policy-replication.c | 1374 |
1 files changed, 1374 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/policy/changelog-policy-replication.c b/xlators/features/changelog/src/policy/changelog-policy-replication.c new file mode 100644 index 000000000..29c049716 --- /dev/null +++ b/xlators/features/changelog/src/policy/changelog-policy-replication.c @@ -0,0 +1,1374 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-policy.h" +#include "changelog-encoders.h" +#include "changelog-fops.h" + +#define JOURNAL_NAME "TERM" + +#define JOURNAL_SECTOR_SIZE 128 + +#define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */ +#define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */ + +/* similar to fop_fn, but... */ +size_t +int32_fn (void *data, char *buffer, gf_boolean_t encode) +{ + size_t bufsz = 0; + int nr = 0; + char buf[20] = {0,}; + + nr = *(int *) data; + + if (encode) { + (void) snprintf (buf, sizeof (buf), "%d", nr); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, + &nr, sizeof (int)); + + return bufsz; +} + + +size_t +uint32_fn (void *data, char *buffer, gf_boolean_t encode) +{ + size_t bufsz = 0; + unsigned int nr = 0; + char buf[20] = {0,}; + + nr = *(unsigned int *) data; + + if (encode) { + (void) snprintf (buf, sizeof (buf), "%u", nr); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, + &nr, sizeof (unsigned int)); + + return bufsz; +} + +size_t +number_fn (void *data, char *buffer, gf_boolean_t encode) +{ + char buf[1024] = {0,}; + size_t bufsz = 0; + unsigned long long nr = 0; + + nr = *(unsigned long long *) data; + + if (encode) { + (void) snprintf (buf, sizeof (buf), "%llu", nr); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, + &nr, sizeof (unsigned long long)); + + return bufsz; +} + +size_t +uuid_fn (void *data, char *buffer, gf_boolean_t encode) +{ + char buf[1024] = {0,}; + uuid_t uuid = {0,}; + size_t bufsz = 0; + + memcpy (uuid, (uuid_t *) data, sizeof (uuid_t)); + + if (encode) { + char *tmpbuf = uuid_utoa (uuid); + (void) snprintf (buf, sizeof (buf), "%s", tmpbuf); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t)); + + return bufsz; +} + +#define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_ULL; \ + co->co_number = (unsigned long long) number; \ + xlen += sizeof (unsigned long long); \ + if (!co->co_convert) \ + co->co_len = sizeof (unsigned long long); \ + } while (0) + +#define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_UUID; \ + uuid_copy (co->co_uuid, uuid); \ + xlen += sizeof (uuid_t); \ + } while (0) + + +/* TBD: move declarations here and nsr.c into a common place */ +#define NSR_TERM_XATTR "trusted.nsr.term" +#define NSR_INDEX_XATTR "trusted.nsr.index" +#define RECON_TERM_XATTR "trusted.nsr.recon-term" +#define RECON_INDEX_XATTR "trusted.nsr.recon-index" + +static gf_boolean_t +changelog_fix_term(xlator_t *this, + changelog_local_t *local, + dict_t *xdata) +{ + int32_t old_term, new_term; + uint32_t index; + changelog_priv_t *priv = this->private; + int ret = 0; + char nfile[PATH_MAX] = {0,}; + int32_t recon_term, recon_index; + changelog_rollover_data_t crd; + + // If coming via the regular IO path, we should get the dict "nsr-term" + // If coming via reconciliation, we should get the dicts "nsr-recon-term" + // that indicates the term and "nsr-recon-index" for the index + if ((dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) && + (dict_get_uint32(xdata, NSR_INDEX_XATTR, &index) == 0)) { + old_term = priv->term; + + if (old_term != new_term) { + GF_ASSERT(new_term > old_term); + LOCK (&priv->lock); + priv->term = new_term; + (void) snprintf (nfile, PATH_MAX, "%s.%d", + JOURNAL_NAME, priv->term); + ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover, + nfile, _gf_false); + UNLOCK (&priv->lock); + if (ret != 0) + return _gf_false; + } + local->nr_bytes = 0; + local->lu.val = index; + } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + + old_term = priv->term; + + if (old_term != recon_term) { + LOCK (&priv->lock); + { + priv->term = recon_term; + (void) snprintf (crd.crd_changelog_name, + PATH_MAX, "%s.%d", + JOURNAL_NAME, priv->term); + crd.crd_prealloc_size = 1<<29; + ret = changelog_open(this, priv, local, &crd); + } + UNLOCK (&priv->lock); + if (ret != 0) + return _gf_false; + } + + local->nr_bytes = 0; + local->lu.val = recon_index; + } else { + return _gf_false; + } + + return _gf_true; +} + +/** + * Replication policy records journal entries in the FOP path. This is + * quite different that the default policy (used by geo-replication), + * which journals records in the callback path on a successfull posix + * operation. Additionally, each record starts with a PRE-OP marker and + * the index number generated by the leader. + * (c.f. nsr_$NAME$() ~/xlator/cluster/nsr/nsr-server/src/all-templates.c) + * + * POST-OPs are marked asynchronously and not during in the callback path + * Marking it in the callback path is incorrect as the actual FOP may not + * have been synchronized to the disk. Therefore, POST op marking is done + * after a successful file system sync, which is trigerred periodically + * by NSR server component. To keep journal updates strictly sequential, + * POST-OPs are separate record in the journal. + */ + +/** + * Override File Operations + * + * NOTE: Since journal updates are done in the FOP path, there is no + * actual use of @local in cbk. Therefore, @local could have been + * declared statically for each FOP (which would remove the overhead + * of allocating (here) and deallocating (in cbk)). Let's not do that + * and keep it this way for now. Will worry about it later (for code + * reasonability and performance). + */ + +int32_t +changelog_replication_rmdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID + UID + GID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 7); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 7); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_unlink (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + return changelog_replication_rmdir (frame, this, loc, xflags, xdata); +} + +int32_t +changelog_replication_rename (call_frame_t *frame, xlator_t *this, + loc_t *oldloc, loc_t *newloc, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID + UID + GID + OLDLOC + NEWLOC */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 8); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name, + entry_fn, entry_free_fn, xtra_len, out); + co++; + + CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 8); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_link (call_frame_t *frame, + xlator_t *this, loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID + UID + GID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 7); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 7); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_mkdir (call_frame_t *frame, + xlator_t *this, loc_t *loc, + mode_t mode, mode_t umask, dict_t *xdata) +{ + int ret = -1; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + IDX + FOP + GFID + MODE + UID + GID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, mode | S_IFDIR, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 8); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_symlink (call_frame_t *frame, xlator_t *this, + const char *linkname, loc_t *loc, + mode_t umask, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + IDX + FOP + GFID + LINKNAME + UID + GID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_NAME (co, linkname, entry_free_fn, xtra_len, out); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 8); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_mknod (call_frame_t *frame, + xlator_t *this, loc_t *loc, + mode_t mode, dev_t dev, + mode_t umask, dict_t *xdata) +{ + int ret = -1; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + IDX + FOP + GFID + MODE + UID + GID + DEV + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 9); + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term (this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, dev, number_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 9); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_create (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, mode_t mode, + mode_t umask, fd_t *fd, dict_t *xdata) +{ + int ret = -1; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + IDX + FOP + GFID + MODE + UID + GID + ENTRY */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8); + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term (this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 8); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +static int +_changelog_setattr_fill_common (call_frame_t *frame, xlator_t *this, + int32_t attr, struct iatt *stbuf, + uuid_t gfid, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + int used_count = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + used_count = 7; + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, used_count); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + /** + * - <PRE> + * - IDX + * - FOP + * - GFID + * - Valid flag + * GF_SET_ATTR_MODE [chmod] + * ->ia_prot + * ->ia_type + * GF_SET_ATTR_UID | GF_SET_ATTR_GID [chown] + * ->ia_uid + * ->ia_gid + * GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME [utimes] + * ->ia_atime + * ->ia_mtime + */ + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, attr, uint32_fn, xtra_len); + co++; + + if (attr & GF_SET_ATTR_MODE) { + mode_t mode = 0; + + /* ->ia_prot & ->ia_type stored as a consolidated value */ + used_count--; + mode = st_mode_from_ia (stbuf->ia_prot, stbuf->ia_type); + + CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len); + + } else if (attr & (GF_SET_ATTR_UID | GF_SET_ATTR_GID)) { + uid_t uid = -1; + gid_t gid = -1; + + if (attr & GF_SET_ATTR_UID) + uid = stbuf->ia_uid; + if (attr & GF_SET_ATTR_GID) + gid = stbuf->ia_gid; + + /* ->ia_uid & ->ia_gid */ + CHANGELOG_FILL_INT32 (co, uid, int32_fn, xtra_len); + co++; + + CHANGELOG_FILL_INT32 (co, gid, int32_fn, xtra_len); + + } else if (attr & (GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME)) { + + /* ->ia_atime & ->ia_mtime, need usecs? */ + CHANGELOG_FILL_UINT32 (co, + stbuf->ia_atime, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, + stbuf->ia_mtime, uint32_fn, xtra_len); + } + + changelog_set_usable_record_and_length (local, xtra_len, used_count); + + ret = 0; + frame->local = local; + + changelog_update (this, priv, frame->local, CHANGELOG_TYPE_METADATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + + return ret; +} + +int32_t +changelog_replication_fsetattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, + struct iatt *stbuf, int32_t valid, + dict_t *xdata) +{ + return _changelog_setattr_fill_common (frame, this, valid, + stbuf, fd->inode->gfid, xdata); +} + +int32_t +changelog_replication_setattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ + return _changelog_setattr_fill_common (frame, this, valid, + stbuf, loc->inode->gfid, xdata); +} + +int32_t +changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this, + fd_t *fd, const char *name, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + else + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_removexattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *name, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + else + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_setxattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + else + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_fsetxattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, dict_t *dict, + int32_t flags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + else + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_truncate (call_frame_t *frame, + xlator_t *this, loc_t *loc, + off_t offset, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID + Offset */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 5); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 5); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_ftruncate (call_frame_t *frame, + xlator_t *this, fd_t *fd, + off_t offset, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID + Offset */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 5); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_writev (call_frame_t *frame, + xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, + struct iobref *iobref, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + IDX + FOP + GFID + Offset + Length */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 6); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count), + number_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 6); + + frame->local = local; + ret = 0; + + changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +/* overriden COPS */ +int +changelog_replication_cops_open (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + char *name, gf_boolean_t last) +{ + changelog_local_t local = {0,}; + changelog_log_data_t cld = {0,}; + changelog_rollover_data_t *crd = NULL; + + crd = &cld.cld_roll; + + cld.cld_type = CHANGELOG_TYPE_ROLLOVER; + + crd->crd_finale = last; + crd->crd_use_suffix = _gf_false; + crd->crd_prealloc_size = 1<<29; /* preallocate 512 MB */ + + + (void) strcpy (crd->crd_changelog_name, name); + + local.lu.val = 0; + local.nr_bytes = 0; + + return changelog_inject_single_event (this, priv, &local, &cld); +} + +/** + * NO-OP changelog write (from changelog.c). Records are journaled + * in the FOP path. + */ +int +changelog_replication_cops_write (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + changelog_local_t *local, + changelog_log_type type) +{ + return 0; +} + +/** + * no implicit rollover + */ +int +changelog_replication_cops_rollover (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + char *name, gf_boolean_t last) +{ + return changelog_replication_cops_open (this, priv, cpriv, name, last); +} + +off_t +changelog_replication_cops_get_offset (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + changelog_local_t *local) +{ + if (!local) + return 0; + + return (local->lu.val * JOURNAL_SECTOR_SIZE) + local->nr_bytes; +} + +void +changelog_replication_cops_set_offset (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + changelog_local_t *local, off_t bytes) +{ + local->nr_bytes += bytes; +} + +void +changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv, + void *cpriv, changelog_local_t *local) +{ + return; +} + +int +changelog_replication_policy_init (xlator_t *this, + changelog_priv_t *priv, + struct changelog_logpolicy *cp) +{ + struct xlator_fops *r_fops = NULL; + struct changelog_ops *r_cops = NULL; + + r_fops = GF_CALLOC (1, sizeof (struct xlator_fops), + gf_changelog_mt_fop_policy_t); + if (!r_fops) + return -1; + + r_cops = GF_CALLOC (1, sizeof (struct changelog_ops), + gf_changelog_mt_fop_policy_t); + if (!r_cops) { + GF_FREE (r_fops); + return -1; + } + + cp->cpriv = GF_CALLOC (1, sizeof (off_t), + gf_changelog_mt_fop_policy_t); + if (!cp->cpriv) { + GF_FREE (r_fops); + GF_FREE (r_cops); + return -1; + } + + /* no roll-over, one big fat journal per term */ + priv->rollover_time = 0; + + /* fsync() is internally trigerred by NSR */ + priv->fsync_interval = 0; + + /* no record header: extra data (via iobufs) are always persisted */ + priv->no_gfid_hdr = _gf_true; + + priv->lockless_update = _gf_false; + + memcpy (r_fops, &changelog_default_fops, sizeof (struct xlator_fops)); + memcpy (r_cops, &changelog_default_cops, sizeof (struct changelog_ops)); + + priv->term = 0; + (void) memset (cp->changelog_name, '\0', PATH_MAX); + memcpy(cp->changelog_name, JOURNAL_NAME, strlen(JOURNAL_NAME)); +#if 0 + (void) snprintf (cp->changelog_name, PATH_MAX, + JOURNAL_NAME, priv->term); +#endif + + /* overload all fops */ + r_fops->writev = changelog_replication_writev; + r_fops->ftruncate = changelog_replication_ftruncate; + r_fops->truncate = changelog_replication_truncate; + r_fops->fsetxattr = changelog_replication_fsetxattr; + r_fops->setxattr = changelog_replication_setxattr; + r_fops->removexattr = changelog_replication_removexattr; + r_fops->fremovexattr = changelog_replication_fremovexattr; + r_fops->setattr = changelog_replication_setattr; + r_fops->fsetattr = changelog_replication_fsetattr; + r_fops->create = changelog_replication_create; + r_fops->mknod = changelog_replication_mknod; + r_fops->symlink = changelog_replication_symlink; + r_fops->mkdir = changelog_replication_mkdir; + r_fops->link = changelog_replication_link; + r_fops->rename = changelog_replication_rename; + r_fops->unlink = changelog_replication_unlink; + r_fops->rmdir = changelog_replication_rmdir; + + /* overload cops */ + r_cops->open = changelog_replication_cops_open; + r_cops->write = changelog_replication_cops_write; + r_cops->rollover = changelog_replication_cops_rollover; + r_cops->get_offset = changelog_replication_cops_get_offset; + r_cops->set_offset = changelog_replication_cops_set_offset; + r_cops->reset_offset = changelog_replication_cops_reset_offset; + + + cp->fops = r_fops; + cp->cops = r_cops; + + return 0; +} + +int +changelog_replication_policy_fini (xlator_t *this, + struct changelog_logpolicy *cp) +{ + GF_FREE (cp->fops); + GF_FREE (cp->cops); + GF_FREE (cp->cpriv); + return 0; +} |