summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-recon
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/nsr-recon')
-rw-r--r--xlators/cluster/nsr-recon/Makefile.am3
-rw-r--r--xlators/cluster/nsr-recon/src/Makefile.am23
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c3130
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.h325
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.c1010
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.h92
6 files changed, 4583 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-recon/Makefile.am b/xlators/cluster/nsr-recon/Makefile.am
new file mode 100644
index 000000000..d471a3f92
--- /dev/null
+++ b/xlators/cluster/nsr-recon/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/cluster/nsr-recon/src/Makefile.am b/xlators/cluster/nsr-recon/src/Makefile.am
new file mode 100644
index 000000000..e639e4437
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/Makefile.am
@@ -0,0 +1,23 @@
+xlator_LTLIBRARIES = nsr_recon.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
+
+nsr_recon_la_LDFLAGS = -module -avoid-version
+nsr_recon_la_SOURCES = recon_driver.c recon_xlator.c
+
+nsr_recon_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/api/src/libgfapi.la
+
+noinst_HEADERS = recon_driver.h recon_xlator.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) \
+ -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/rpc-lib/src
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+XLATOR_HEADER = $(top_srcdir)/libglusterfs/src/xlator.h
+
+CLEANFILES =
+
+uninstall-local:
+ rm -f $(DESTDIR)$(xlatordir)/nsr.so
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
new file mode 100644
index 000000000..8c7622a02
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -0,0 +1,3130 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <sys/types.h>
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+#include <fnmatch.h>
+
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+
+
+#include "recon_driver.h"
+#include "recon_xlator.h"
+#include "api/src/glfs-internal.h"
+#include "api/src/glfs-handles.h"
+
+/* TBD: move declarations here and nsr.c into a common place */
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+/*
+ * Execution architecture for the NSR reconciliation driver. The driver runs
+ * as a seperate process in each node where the brick is. The main function of
+ * the driver is nsr_reconciliation_driver() (last function below) The driver
+ * just sits in a tight loop waiting for state changes. When a brick becomes a
+ * replica leader, it fences IO, contacts this process and waits for
+ * reconciliation to finish.
+ *
+ * The replica leader talks to other bricks in replica group which are alive
+ * and gets the last term info using which it decides which has the latest
+ * data. That brick is referred to as the "reconciliator"; leader sends a
+ * message to reconciliator to freeze its data (by reading any incomplete data
+ * from other nodes from that term if required)
+ *
+ * Once that is done leader sends a message to all nodes except the
+ * reconciliator to sync themselves with the reconciliator. This process is
+ * referred to as "resolution".
+ *
+ * Hence the reconciliation processes need to talk to each other to get a given
+ * term info. This is implemented using the recon translator IOs which
+ * implements a bare bone RPC by exposing a file interface to which
+ * reads/writes are done to pass control messages. This is referred to as the
+ * "control plane". This implementation allows the control plane to be
+ * implemented as a bunch of threads for each of the nodes.
+ *
+ * The reconciliation process also needs to talk to the brick process on that
+ * node to actually write the data as part of reconciliation/resolution. This
+ * is referred to as the "data plane". Again there are a bunch of threads that
+ * do this work.
+ *
+ * The way the worker threads are organised is that main driver context has a
+ * pointer to contexts for each of these thread contexts. The thread context at
+ * index 0 always refers to talking with local recon process/brick. So the
+ * control worker at index 0 will get the local changelog info and data worker
+ * at index 0 will talk to local brick.
+ *
+ * All the ops from the control/data planes are implemented using the glfs
+ * APIs.
+ */
+
+#if defined(NSR_DEBUG)
+
+/* This lets us change on the fly even if NSR_DEBUG is defined. */
+int nsr_debug_level = GF_LOG_TRACE;
+
+FILE *
+recon_create_log (char *member, char *module)
+{
+ char *dpath = NULL;
+ char *p;
+ char *fpath = NULL;
+ FILE *fp = NULL;
+ int fd = -1;
+
+ (void)mkdir(NSR_LOG_DIR,0777);
+ (void)asprintf(&dpath,NSR_LOG_DIR"/%s",member);
+ if (dpath) {
+ for (p = dpath + strlen(NSR_LOG_DIR) + 1; *p; ++p) {
+ if (*p == '/') {
+ *p = '-';
+ }
+ }
+ (void)mkdir(dpath,0777);
+ (void)asprintf(&fpath,"%s/%s",dpath,module);
+ if (fpath) {
+ fd = open(fpath,O_WRONLY|O_CREAT|O_APPEND|O_SYNC,0666);
+ if (fd >= 0) {
+ fp = fdopen(fd,"a");
+ if (!fp) {
+ close(fd);
+ }
+ }
+ if (fp) {
+ if (setvbuf (fp, NULL, _IONBF, 0)) {
+ /*
+ * Might as well take advantage of it
+ * to log the error.
+ */
+ fprintf (fp,
+ "setvbuf failed for log\n");
+ fprintf (fp,
+ "log output may be async\n");
+ fflush(fp);
+ }
+ }
+ free(fpath);
+ }
+ free(dpath);
+ }
+
+ return fp;
+}
+
+void
+_nsr_driver_log (const char *func, int line, char *member, FILE *fp,
+ char *fmt, ...)
+{
+ va_list ap;
+ char *buf = NULL;
+ int retval;
+
+ if (!fp) {
+ fp = recon_create_log(member,"nsr-driver-log");
+ if (!fp) {
+ return;
+ }
+ }
+
+ va_start(ap,fmt);
+ retval = vasprintf(&buf,fmt,ap);
+ if (buf) {
+ fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf);
+ free(buf);
+ }
+ va_end(ap);
+}
+
+void
+_nsr_worker_log (const char *func, int line, char *member,
+ char *type, uint32_t index, FILE *fp,
+ char *fmt, ...)
+{
+ va_list ap;
+ char *buf = NULL;
+ int retval;
+
+ if (!fp) {
+ char *name;
+ if (asprintf(&name,"%s-%u",type,index) < 1) {
+ return;
+ }
+ fp = recon_create_log (member, name);
+ if (!fp) {
+ return;
+ }
+ }
+
+ va_start(ap,fmt);
+ retval = vasprintf(&buf,fmt,ap);
+ if (buf) {
+ fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf);
+ free(buf);
+ }
+ va_end(ap);
+}
+
+#endif
+
+/*
+ * Recon Driver Calloc
+ *
+ * We need this because all of this messing about with gfapi from within a
+ * translator keeps scrambling THIS (only one reason it's a terrible idea) and
+ * we need THIS to have a value that represents our initialization with our
+ * memory types.
+ *
+ * Note that the macro requires "this" to be defined in the current scope.
+ */
+
+#define RD_CALLOC(x,y,z) ({THIS = this; GF_CALLOC(x,y,z); })
+
+/*
+ * This function gets the size of all the extended attributes for a file.
+ * This is used so that caller knows how much to allocate for key-value storage.
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * dict - passed so that NSR translator can get this from the required brick
+ *
+ * Output Arguments:
+ * b - pointer to the buffer where the attributes are filled up.
+ * key_size - the size of all keys
+ * val_size - the size of all values
+ * num - number of key/values
+ */
+static int32_t
+get_xattr_total_size( struct glfs_fd *fd,
+ char **b,
+ uint32_t *key_size,
+ uint32_t *val_size,
+ uint32_t* num,
+ dict_t *dict)
+{
+ int32_t s = -1, ret = -1;
+ char *c = NULL;
+
+ *key_size = 0;
+ *val_size = 0;
+ *num = 0;
+
+ // First get the size of the keys
+ s = glfs_flistxattr_with_xdata(fd, NULL,0, dict);
+ if (s == -1) {
+ goto out;
+ }
+ *key_size = s;
+
+ // TBD - use the regular calloc
+ (*b) = c = calloc(s+1,1);
+
+ // get the keys themselves
+ if (glfs_flistxattr_with_xdata(fd, c, s+1, dict) == -1) {
+ goto out;
+ }
+ do {
+ int32_t r;
+ uint32_t len = 0;
+ // for each key get the size of the value
+ r = glfs_fgetxattr_with_xdata(fd, c, NULL, 0, dict);
+ if (r == -1)
+ goto out;
+ (*val_size) += r;
+ len = strlen(c) + 1;
+ c += len;
+ s -= len;
+ (*num)++;
+ } while(s);
+ ret = 0;
+out:
+ return ret;
+}
+
+/*
+ * This function gets bunch of xattr values given set of keys.
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * keys - the bunch of keys
+ * size - size of values
+ * num - number of keys
+ * dict - passed so that NSR translator can get this from the required brick
+ *
+ * Output Arguments:
+ * buf - where the values are written one after the other (NULL seperated)
+ */
+static int32_t
+get_xattr(struct glfs_fd *fd,
+ char *keys,
+ char *buf,
+ uint32_t size,
+ uint32_t num,
+ dict_t *dict)
+{
+ while(num--) {
+ int32_t r;
+ uint32_t len = 0;
+
+ // copy the key
+ strcpy(buf, keys);
+ len = strlen(keys);
+ len++;
+ buf += len;
+
+ // get the value and copy the value after incrementing buf after the key
+ r = glfs_fgetxattr_with_xdata(fd, keys, buf, size, dict);
+
+ // TBD - handle error
+ if (r == -1)
+ return -1;
+
+ // increment the key to next value
+ keys += len;
+
+ // increment buf to hold the next key
+ buf += strlen(buf) + 1;
+ }
+ return 0;
+}
+
+/*
+ * Function deletes a bunch of key values in extended attributes of a file.
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * dict - passed so that NSR translator can do this from the required brick
+ * keys - bunch of NULL seperated key names
+ * num - number of keys
+ */
+static int32_t delete_xattr(struct glfs_fd *fd,
+ dict_t *dict_t,
+ char *keys,
+ uint32_t num)
+{
+ while(num--) {
+ // get the value and copy the value
+ // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata()
+ if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1)
+ return -1;
+ keys += strlen(keys) +1;
+ }
+ return 0;
+}
+
+/*
+ * Given a bunch of key value pairs, fill them as xattrs for a file
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * dict - passed so that NSR translator can do this from the required brick
+ * buf - buffer containing the keys-values pairs. The key value are NULL seperated.
+ * Each of the key-value is seperated by NULL in turn.
+ * num - Number of such key value pairs.
+ */
+static int32_t
+fill_xattr(struct glfs_fd *fd,
+ dict_t *dict,
+ char *buf,
+ uint32_t num)
+{
+ char *k = buf, *val = NULL;
+
+ while(num--) {
+ int32_t r;
+
+ val = k + strlen(k) + 1;
+
+ // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata()
+ r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict);
+ if (r == -1)
+ return -1;
+ k = val + strlen(val) + 1;
+ }
+ return 0;
+}
+
+/*
+ * This function gets a file that can be used for doing glfs_init later.
+ * The control file is used by control thread(function) to talk to peer reconciliation process.
+ * The data file is used by the data thread(function) to talk to the bricks.
+ * The control file is of name such as con:gfs1:-mnt-a1 where "gfs1" is name of host
+ * and the brick path is "/mnt/a1".
+ * The data file is of name such as data:gfs1:-mnt-a1.
+ *
+ * Input Arguments:
+ * vol - name of the volume. This is used to build the full path of the control and data file
+ * such as /var/lib/glusterd/vols/test/bricks/gfs2:-mnt-test1-nsr-recon.vol.
+ * In above example the volume name is test and brick on gfs2 is on path /mnt/test1
+ *
+ * worker - The worker for a given node. This worker has 2 threads - one on the data plane
+ * and one on the control plane. The worker->name is already filled with hostname:brickname
+ * in the function nsr_reconciliation_driver(). Use that to build the volume file.
+ * So if worker->name has gfs1:/mnt/a1, control file is con:gfs1:-mnt-a1
+ * and data file is data:gfs1:-mnt-a1.
+ * All these files are under the bricks directory. TBD - move this to a NSR recon directory later.
+ */
+static void
+nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker)
+{
+ char *ptr;
+ char tr[256];
+
+ // Replace the "/" to -
+ strcpy(tr, worker->name);
+ ptr = strchr (tr, '/');
+ while (ptr) {
+ *ptr = '-';
+ ptr = strchr (tr, '/');
+ }
+
+ // Build the base directory such as "/var/lib/glusterd/vols/test/bricks/"
+ sprintf(worker->control_worker->vol_file,
+ "/%s/%s/%s/%s/",
+ GLUSTERD_DEFAULT_WORKDIR,
+ GLUSTERD_VOLUME_DIR_PREFIX,
+ vol,
+ GLUSTERD_BRICK_INFO_DIR);
+
+ strcat(worker->control_worker->vol_file, "con:");
+ strcat(worker->control_worker->vol_file, tr);
+
+ sprintf(worker->data_worker->vol_file,
+ "/%s/%s/%s/%s/",
+ GLUSTERD_DEFAULT_WORKDIR,
+ GLUSTERD_VOLUME_DIR_PREFIX,
+ vol,
+ GLUSTERD_BRICK_INFO_DIR);
+ strcat(worker->data_worker->vol_file, "data:");
+ strcat(worker->data_worker->vol_file, tr);
+}
+
+/*
+ * This function does all the glfs initialisation
+ * so that reconciliation process can talk to other recon processes/bricks
+ * for the control/data messages.
+ * This will be done everytime a worker needs to be kicked off to talk
+ * across any plane.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * control - set to true if this worker is for the control plane
+ */
+static int32_t
+nsr_recon_start_work(nsr_per_node_worker_t *ctx,
+ gf_boolean_t control)
+{
+ glfs_t *fs = NULL;
+ xlator_t *this = ctx->driver_ctx->this;
+ int32_t ret = 0;
+ glfs_fd_t *aux_fd = NULL; // fd of auxilary log
+ char lf[256];
+ nsr_recon_private_t *priv = NULL;
+ char *my_name = NULL;
+ char *morph_name = NULL, *ptr = NULL;
+
+ priv = this->private;
+ my_name = RD_CALLOC (1,
+ strlen (priv->replica_group_members[0]) + 1,
+ gf_mt_recon_member_name_t);
+ strcpy (my_name, priv->replica_group_members[0]);
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "starting work with volfile %s\n",
+ ctx->vol_file);
+
+ fs = glfs_new(ctx->id);
+ if (!fs) {
+ glusterfs_this_set(this);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "cannot create gfls context for thread %s\n",ctx->id);
+ return -1;
+ }
+
+ // For some vague reason, glfs init APIs seem to be clobbering "this". hence resetting it.
+ glusterfs_this_set(this);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "init done. setting volfile %s\n",
+ ctx->vol_file);
+
+ ret = glfs_set_volfile(fs, ctx->vol_file);
+ if (ret != 0) {
+ glusterfs_this_set(this);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "cannot set volfile %s for thread %s\n",ctx->vol_file, ctx->id);
+ return -1;
+ }
+
+ morph_name = RD_CALLOC (1, strlen (my_name) + 1,
+ gf_mt_recon_member_name_t);
+ strcpy (morph_name, my_name);
+
+ ptr = strchr (morph_name, '/');
+ while (ptr)
+ {
+ *ptr = '-';
+ ptr = strchr (morph_name, '/');
+ }
+ // TBD - convert this to right /usr/local/var/log based log files.
+
+ sprintf(lf, NSR_LOG_DIR"/%s/%s-%"PRIu32, morph_name,
+ (control == _gf_true)?"glfs-con":"glfs-data", ctx->index);
+ ret = glfs_set_logging (fs, lf, 7);
+ if (ret) {
+ glusterfs_this_set(this);
+ gf_log (this->name, GF_LOG_ERROR, "glfs logging set failed (%s)",
+ strerror (errno));
+ return -1;
+ }
+
+ ret = glfs_init (fs);
+ if (ret != 0) {
+ glusterfs_this_set(this);
+ nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do init for thread %s with volfile %s\n",ctx->id, ctx->vol_file);
+ return -1;
+ }
+ glusterfs_this_set(this);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "setting volfile %s done\n",
+ ctx->vol_file);
+
+ // If it is control thread, open the "/" as the aux_fd.
+ // All IOs happening via the fd will do the RPCs across the reconciliation
+ // processes. For some vague reason, the root seems to be open'able like a file.
+ // TBD - try to clean this up. (implement a virtual file???)
+ if (control == _gf_true) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "doing open for / \n");
+ aux_fd = glfs_open (fs, "/", O_RDWR);
+ // TBD - proper error handling. Stall reconciliation if such a thing happens?
+ if (aux_fd == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "cannot open aux log file for thread %s\n",ctx->id);
+ return -1;
+ } else {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "---opened aux log file for thread %s\n",ctx->id);
+ }
+ ctx->aux_fd = aux_fd;
+ }
+ glusterfs_this_set(this);
+ ctx->fs = fs;
+ return 0;
+}
+
+/*
+ *
+ * This function does the cleanup after reconciliation is done
+ * or before we start a new reconciliation.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * control - set to true if this worker is for the control plane
+ */
+static int32_t
+nsr_recon_end_work(nsr_per_node_worker_t *ctx,
+ gf_boolean_t control)
+{
+ int32_t ret = 0;
+ xlator_t *this = ctx->driver_ctx->this;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "doing fini for recon worker\n");
+
+ ret = glfs_fini(ctx->fs);
+ if (ret != 0) {
+ glusterfs_this_set(this);
+ nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do fini for thread %s with volfile %s\n",ctx->id, ctx->vol_file);
+ return -1;
+ }
+ glusterfs_this_set(this);
+ ctx->fs = NULL;
+ if (control == _gf_true) {
+ glfs_close (ctx->aux_fd);
+ ctx->aux_fd = NULL;
+ }
+ return 0;
+}
+
+//called in case all worker functions run as sepeerate threads
+static void
+init_worker(nsr_per_node_worker_t *ctx, gf_boolean_t control)
+{
+ pthread_mutex_init(&(ctx->mutex), NULL);
+ pthread_cond_init(&(ctx->cv), NULL);
+ INIT_LIST_HEAD(&(ctx->head.list));
+}
+
+
+/*
+ * Control worker funct for getting changelog info on this node.
+ * calls directly functions to parse the changelog.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * control - set to true if this worker is for the control plane
+ */
+static void
+control_worker_func_0(nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ xlator_t *this = dr->this;
+ nsr_recon_private_t *priv = this->private;
+
+ ctx->is_control = _gf_true;
+
+ switch (work->req_id) {
+ case NSR_WORK_ID_INI:
+ {
+ break;
+ }
+ case NSR_WORK_ID_FINI:
+ {
+ break;
+ }
+ case NSR_WORK_ID_GET_LAST_TERM_INFO:
+ {
+ nsr_recon_last_term_info_t lt;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ // term is stuffed inside work->index. overloading.
+ int32_t term = work->index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get last term info for node %d with current term %d\n",index, term);
+
+ // TBD - handle errors
+ // This is called by the leader after it gets the current term.
+ // Makes searching easier.
+ nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, &lt);
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
+
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+ break;
+ }
+ case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
+ {
+ nsr_recon_last_term_info_t lt;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ // term is stuffed inside work->index. overloading.
+ int32_t term = work->index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get term info for node %d for term %d\n",index, term);
+
+ // TBD - handle errors
+ nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, &lt);
+
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get term info for term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+
+ break;
+ }
+ case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
+ {
+ // For local resolution, the main driver thread does it.
+ // SO there is no way we can have this message for this node.
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "this message should not be sent \n");
+ break;
+ }
+ case NSR_WORK_ID_RESOLUTION_DO_WORK:
+ {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "this message should not be sent \n");
+ ctx->result = -1;
+ break;
+ }
+ case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
+ {
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ // first_index and last_index at 0 indicates empty log.
+ // For non empty log, the first_index always starts at 1.
+ uint32_t num = (dr->workers[index].recon_info->last_index -
+ dr->workers[index].recon_info->first_index + 1);
+ nsr_recon_record_details_t *rd;
+ uint32_t i=0;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
+ index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
+
+
+ // TBD - handle buffer allocation errors
+ rd = RD_CALLOC(num,
+ sizeof(nsr_recon_record_details_t),
+ gf_mt_recon_record_details_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ recon_info->records = RD_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_record_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ // TBD - handle errors
+ if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+ recon_info->last_term,
+ recon_info->first_index,
+ recon_info->last_index,
+ rd) == _gf_false) {
+ ctx->result = -1;
+ return;
+ }
+
+ // The above function writes into rd from 0 to (num -1)
+ // We need to take care of this whenever we deal with records
+ for (i=0; i < num; i++) {
+ ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
+ memcpy(&(recon_info->records[i].rec),
+ &(rd[i]),
+ sizeof(nsr_recon_record_details_t));
+ }
+
+ GF_FREE(rd);
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got reconciliation window records for node %d for term %d \n",
+ index, recon_info->last_term);
+ break;
+ }
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "bad req id %u", work->req_id);
+ }
+
+ return;
+}
+
+// Control worker thread
+static void*
+control_worker_main_0(nsr_per_node_worker_t *ctx)
+{
+
+ ctx->is_control = _gf_true;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "starting control worker func 0\n");
+
+ init_worker(ctx, 1);
+
+ while(1)
+ {
+ nsr_recon_work_t *work = NULL;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "waiting for work\n");
+
+ pthread_mutex_lock(&ctx->mutex);
+ while (list_empty(&(ctx->head.list))) {
+ pthread_cond_wait(&ctx->cv, &ctx->mutex);
+ }
+ pthread_mutex_unlock(&ctx->mutex);
+
+
+ list_for_each_entry(work, &(ctx->head.list), list) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got work with id %d\n", work->req_id);
+ work->in_use = _gf_false;
+
+ // Call the main function.
+ control_worker_func_0(ctx, work);
+
+ atomic_dec(&(dr->outstanding));
+ break;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
+ list_del_init (&work->list);
+ GF_FREE(work);
+ nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
+ }
+
+ return NULL;
+}
+
+static void
+control_worker_do_reconciliation (nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_recon_role_t rr;
+ uint32_t i=0;
+ uint32_t num=0;
+ uint32_t idx = dr->reconciliator_index;
+ uint32_t term = dr->workers[idx].recon_info->last_term;
+
+ GF_ASSERT(idx == index);
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to make this index %d as reconciliator for term %d\n", index, term);
+
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd,
+ nsr_recon_xlator_sector_1,
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
+ // We have all the info for all other nodes.
+ // Fill all that info when sending data to that process.
+ for (i=0; i < dr->replica_group_size; i++) {
+ if ( dr->workers[i].in_use &&
+ (dr->workers[i].recon_info->last_term == term)) {
+ rr.info[num].last_term =
+ dr->workers[i].recon_info->last_term;
+ rr.info[num].commited_ops =
+ dr->workers[i].recon_info->commited_ops;
+ rr.info[num].last_index =
+ dr->workers[i].recon_info->last_index;
+ rr.info[num].first_index =
+ dr->workers[i].recon_info->first_index;
+ strcpy(rr.info[num].name,
+ dr->workers[i].name);
+ }
+ num++;
+ }
+ rr.num = num;
+ rr.role = reconciliator;
+ ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we are bothered about
+ // retrying only for this case. For rest of the cases we will
+ // just return EIO in errno.
+ ctx->op_errno = errno;
+ return;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "sent reconciliator info for term %d with node count as %d\n", term, num);
+}
+
+static void
+control_worker_do_resolution (nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_recon_role_t rr;
+ unsigned int i=0, j=0;
+ unsigned int rec = dr->reconciliator_index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
+
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd,
+ nsr_recon_xlator_sector_1,
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
+ rr.num = 2;
+
+ // Fill in info[0] as info for the node for which we are seeking
+ // resolution. Fill in info[1] as info of the reconciliator node. The
+ // function nsr_recon_driver_get_role() that will be called when this
+ // message reaches the node will look at index 1 for term information
+ // related to the reconciliator.
+ for (i=0; i < 2; i++) {
+ (i == 0) ? (j = index) : (j = rec);
+ rr.info[i].last_term =
+ dr->workers[j].recon_info->last_term;
+ rr.info[i].commited_ops =
+ dr->workers[j].recon_info->commited_ops;
+ rr.info[i].last_index =
+ dr->workers[j].recon_info->last_index;
+ rr.info[i].first_index =
+ dr->workers[j].recon_info->first_index;
+ // The name is used as the key to convert indices since the
+ // reconciliator index could be different across the nodes.
+ strcpy(rr.info[i].name,
+ dr->workers[j].name);
+ if (i == 0) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "this node info term=%d, ops=%d, first=%d, last=%d\n",
+ rr.info[i].last_term, rr.info[i].commited_ops,
+ rr.info[i].first_index,rr.info[i].last_index);
+ } else {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n",
+ rr.info[i].last_term, rr.info[i].commited_ops,
+ rr.info[i].first_index,rr.info[i].last_index);
+ }
+ }
+ rr.role = resolutor;
+ ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we are bothered about
+ // retrying only for this case. For rest of the cases we will
+ // just return EIO in errno.
+ ctx->op_errno = errno;
+ return;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
+}
+
+static void
+control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ xlator_t *this = dr->this;
+ nsr_recon_log_info_t li;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ uint32_t i = 0;
+ uint32_t num = (dr->workers[index].recon_info->last_index -
+ dr->workers[index].recon_info->first_index +1);
+ nsr_recon_record_details_t *rd;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
+ index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
+
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
+ // write to node what term & indices we are interested
+ li.term = recon_info->last_term;
+ li.first_index = recon_info->first_index;
+ li.last_index = recon_info->last_index;
+ ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
+ if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
+ // then read
+ rd = RD_CALLOC(num,
+ sizeof(nsr_recon_record_details_t),
+ gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+ recon_info->records = RD_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ goto err;
+ }
+
+ if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) {
+ ctx->result = -1;
+ goto err;
+ }
+
+ for (i=0; i < num; i++) {
+ ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
+ memcpy (&(recon_info->records[i].rec), &(rd[i]),
+ sizeof(nsr_recon_record_details_t));
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "get_reconcilaition_window:Got %d at index %d\n",
+ recon_info->records[i].rec.type,
+ i + recon_info->first_index);
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got reconciliation window records for node %d for term %d \n",
+ index, recon_info->last_term);
+
+err:
+ GF_FREE(rd);
+}
+
+/*
+ * Control worker funct for getting changelog info on some other node.
+ * calls glfs functions to seek/read/write on aux_fd.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * control - set to true if this worker is for the control plane
+ */
+static void
+control_worker_func(nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+ nsr_recon_last_term_info_t lt;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ int32_t term = htonl(work->index); // overloading it
+
+ ctx->is_control = _gf_true;
+
+ switch (work->req_id){
+
+ case NSR_WORK_ID_INI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "calling nsr_recon_start_work\n");
+
+ // TBD - handle error in case nsr_recon_start_work gives error
+ if (nsr_recon_start_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished nsr_recon_start_work\n");
+ break;
+
+ case NSR_WORK_ID_FINI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "calling nsr_recon_end_work\n");
+
+ // TBD - handle error in case nsr_recon_end_work gives error
+ if (nsr_recon_end_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished nsr_recon_end_work\n");
+ break;
+
+ case NSR_WORK_ID_GET_LAST_TERM_INFO:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get last term info for node %d with current term %d\n",index, work->index);
+
+ // first write the current term term number
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+
+ break;
+
+ case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get term info for node %d for term %d\n",index, work->index);
+
+ // first write the term number
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get term info for term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+
+ break;
+
+ case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
+ control_worker_do_reconciliation(ctx,work);
+ break;
+
+ case NSR_WORK_ID_RESOLUTION_DO_WORK:
+ control_worker_do_resolution(ctx,work);
+ break;
+
+ case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
+ control_worker_get_window(ctx,work);
+ break;
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "bad work type %d", work->req_id);
+ }
+
+ return;
+}
+
+// Control worker thread
+static void*
+control_worker_main(nsr_per_node_worker_t *ctx)
+{
+ unsigned int index = ctx->index;
+
+ ctx->is_control = _gf_true;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "starting control worker func\n");
+
+ // if this is for local processing, call the changelog parsing calls directly
+ if (index == 0) {
+ control_worker_main_0(ctx);
+ return NULL;
+ }
+
+ init_worker(ctx, 1);
+
+
+ while(1)
+ {
+ nsr_recon_work_t *work = NULL;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "waiting for work\n");
+
+ pthread_mutex_lock(&ctx->mutex);
+ while (list_empty(&(ctx->head.list))) {
+ pthread_cond_wait(&ctx->cv, &ctx->mutex);
+ }
+ pthread_mutex_unlock(&ctx->mutex);
+
+
+ list_for_each_entry(work, &(ctx->head.list), list) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got work with id %d\n", work->req_id);
+ work->in_use = _gf_false;
+ control_worker_func(ctx,work);
+ atomic_dec(&(dr->outstanding));
+ break;
+ }
+ nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
+ list_del_init (&work->list);
+ GF_FREE(work);
+ nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
+ }
+
+ return NULL;
+}
+
+/*
+ * This function gets called if this process is chosen as the reconciliator
+ * for this replica group. It would have already got the records for the last term
+ * for the indices that are required (from the first HOLE to last index) from
+ * all other nodes that also witnessed that term. COmpare all the records and
+ * compute the work required.
+ *
+ * Input arguments
+ * ctx - driver context. All recon work is stored in workers[0].recon_info
+ */
+static void
+compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx)
+{
+ uint32_t i=0, j=0;
+ nsr_reconciliator_info_t *my_recon = ctx->workers[0].recon_info;
+ uint32_t num = (my_recon->last_index - my_recon->first_index + 1);
+
+ for (i=0; i < num; i++) {
+ nsr_log_type_t orig, new;
+ unsigned int src = 0;
+ orig = new = my_recon->records[i].rec.type;
+ nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE;
+ // index 0 means this node. Look at all other nodes.
+ for (j=1; j < ctx->replica_group_size; j++) {
+ if (ctx->workers[j].in_use) {
+ nsr_log_type_t pr = ctx->workers[j].recon_info->records[i].work.type;
+ if ((new != pr) && (pr > new)) {
+ src = j;
+ new = (new | pr);
+ }
+ }
+ }
+ // TBD - compare data if new and orig are all FILLs. (can detect changelog corruption)
+ // Right now we compare if both orig and new are psuedo holes since
+ // only that is of interest to us.
+ if (orig != new) {
+ if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_PSEUDO_HOLE))
+ tw = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE;
+ else if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_FILL))
+ tw = NSR_RECON_WORK_HOLE_TO_FILL;
+ else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_PSEUDO_HOLE))
+ tw = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE;
+ else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_FILL))
+ tw = NSR_RECON_WORK_HOLE_TO_FILL;
+ }
+ if (tw != NSR_RECON_WORK_NONE) {
+ my_recon->records[i].work.type = tw;
+ my_recon->records[i].work.source = src;
+ // Overwrite the record
+ memcpy(&(my_recon->records[i].rec),
+ &(ctx->workers[src].recon_info->records[i].rec),
+ sizeof(nsr_recon_record_details_t));
+ }
+ }
+ return;
+}
+
+static int32_t
+nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
+ uint32_t i,
+ gf_boolean_t in_use);
+
+/*
+ * Write the role and associated information to the node.
+ * This gets called from recon xlator indicating node is either
+ * leader, reconciliator or should do resolution.
+ */
+gf_boolean_t
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_role_t *rr,
+ uint32_t term)
+{
+ nsr_role_work_t *rw;
+ xlator_t *this = ctx->this;
+
+ nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n");
+ rw = RD_CALLOC(1, sizeof (nsr_role_work_t), gf_mt_recon_role_work_t);
+ memcpy(&rw->role, rr, sizeof(nsr_recon_role_t));
+ rw->term = term;
+ INIT_LIST_HEAD(&(rw->list));
+ pthread_mutex_lock(&(ctx->mutex));
+ list_add_tail(&rw->list, &ctx->role_head.list);
+ pthread_cond_signal(&(ctx->cv));
+ pthread_mutex_unlock(&(ctx->mutex));
+ nsr_driver_log(this->name, GF_LOG_INFO, "set role returns \n");
+ return _gf_true;
+}
+
+/*
+ * First we undo the last role to make sure we clean up.
+ *
+ * Input arguments
+ * ctx - driver context.
+ * rr - Role information.
+ * If leader, the thread now sends the list of all nodes that are part of
+ * the current replica group. Use that to find out the activate the
+ * required worker threads.
+ * If reconciliator, the leader node would have sent information about
+ * all nodes which saw last term as the reconciliator.
+ * If resolution to be done, then rr.info[0] will have this node's info
+ * which the leader would have got earlier. rr[1].info will have the
+ * info regarding the reconciliator.
+ * term - leader's term that is causing this role
+ */
+nsr_recon_driver_state_t
+nsr_recon_driver_get_role(int32_t *status,
+ nsr_recon_driver_ctx_t *ctx,
+ nsr_role_work_t *rw)
+{
+ uint8_t i=0, j=0;
+ nsr_recon_role_t *rr = &(rw->role);
+ nsr_reconciliator_info_t *tmp;
+ xlator_t *this = ctx->this;
+
+ // First make all the threads uninitialise
+ for (i = 0; i < ctx->replica_group_size; i++) {
+ if (nsr_recon_in_use(ctx, i, _gf_false) == -1) {
+ *status = -1;
+ return 0;
+ }
+ }
+
+ switch (rr->role) {
+ case leader:
+ case joiner:
+
+ // First set info this node
+ tmp = RD_CALLOC (1, sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_reconciliator_info_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ ctx->workers[0].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ ctx->current_term = rr->current_term;
+
+ // Find rest of the nodes
+ for (i=1; i < ctx->replica_group_size; i++) {
+ for (j=0 ; /* nothing */; j++) {
+ if (j >= rr->num) {
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "failed to find %s",
+ ctx->workers[i].name);
+ break;
+ }
+ if (strcmp(ctx->workers[i].name,
+ rr->info[j].name)) {
+ continue;
+ }
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "nsr_recon_driver_get_role: this as %s. found other server %s\n",
+ (rr->role == leader) ? "leader"
+ : "joiner",
+ ctx->workers[i].name);
+
+ // Allocate this here. This will get later
+ // filled when the leader tries to get last term
+ // information from all the nodes
+ tmp = RD_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_reconciliator_info_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ ctx->workers[i].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, i, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ break;
+ }
+ }
+ // If leader, reconciliator has to be chosen.
+ // If joiner, we are the reconciliator.
+ if (rr->role == leader)
+ ctx->reconciliator_index = -1;
+ else
+ ctx->reconciliator_index = 0;
+ break;
+
+ case reconciliator:
+ ctx->reconciliator_index = 0;
+ // Copy information about all the other members which had the
+ // same term
+ for (i=0; i < rr->num; i++) {
+ for (j=0; /* nothing */; j++) {
+ if (j >= ctx->replica_group_size) {
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "failed to find %s",
+ rr->info[i].name);
+ break;
+ }
+ if (strcmp(rr->info[i].name,
+ ctx->workers[j].name)) {
+ continue;
+ }
+ nsr_driver_log(this->name, GF_LOG_INFO,
+ "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
+ ctx->workers[j].name);
+ tmp = RD_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_reconciliator_info_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ tmp->last_term = rr->info[i].last_term;
+ tmp->commited_ops = rr->info[i].commited_ops;
+ tmp->last_index = rr->info[i].last_index;
+ tmp->first_index = rr->info[i].first_index;
+ ctx->workers[j].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ break;
+ }
+ }
+ break;
+
+ case resolutor:
+ for (j=0; /* nothing */; j++) {
+ // info[1] has the information regarding the
+ // reconciliator
+ if (j >= ctx->replica_group_size) {
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "failed to find %s",
+ rr->info[1].name);
+ break;
+ }
+ if (strcmp(rr->info[1].name,
+ ctx->workers[j].name)) {
+ continue;
+ }
+ nsr_driver_log(this->name, GF_LOG_INFO,
+ "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
+ ctx->workers[j].name);
+ tmp = RD_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_reconciliator_info_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ tmp->last_term = rr->info[1].last_term;
+ tmp->commited_ops = rr->info[1].commited_ops;
+ tmp->last_index = rr->info[1].last_index;
+ tmp->first_index = rr->info[1].first_index;
+ ctx->reconciliator_index = j;
+ ctx->workers[j].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ GF_ASSERT(ctx->reconciliator_index != 0);
+ break;
+ }
+ tmp = RD_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_reconciliator_info_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ // info[0] has all info for this node
+ tmp->last_term = rr->info[0].last_term;
+ tmp->commited_ops = rr->info[0].commited_ops;
+ tmp->last_index = rr->info[0].last_index;
+ tmp->first_index = rr->info[0].first_index;
+ ctx->workers[0].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ }
+
+ ctx->term = rw->term;
+
+ *status = 0;
+ return rr->role;
+}
+
+
+/*
+ * This function gets called if this process is chosen to sync itself with
+ * the reconciliator.
+ *
+ * Input arguments
+ * ctx - driver context.
+ * my_info - local changelog info that has all the local records for indices that require work
+ * his_info - reconciliator's info that has all the golden copies
+ * invalidate - if set to true, then do not consult local records
+ */
+
+static void
+compute_resolution_work(nsr_recon_driver_ctx_t *ctx,
+ nsr_reconciliator_info_t *my_info,
+ nsr_reconciliator_info_t *his_info,
+ gf_boolean_t invalidate)
+{
+ uint32_t i=0;
+ uint32_t num = (my_info->last_index - my_info->first_index + 1);
+ xlator_t *this = ctx->this;
+
+ if (invalidate) {
+ if (my_info->records) {
+ GF_FREE(my_info->records);
+ }
+ my_info->records = RD_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_record_t);
+ }
+
+ for (i=0; i < num; i++) {
+ nsr_log_type_t orig, new;
+ nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE;
+ orig = my_info->records[i].rec.type;
+ if (invalidate)
+ orig = NSR_LOG_HOLE;
+ new = his_info->records[i].rec.type;
+ // TBD - we can never have PSUEDO_HOLE in reconciliator's info
+ // We should have taken care of that during reconciliation.
+ // Put an assert to validate that.
+ if (new != orig) {
+ if ((orig != NSR_LOG_FILL) && (new == NSR_LOG_FILL))
+ tw = NSR_RECON_WORK_HOLE_TO_FILL;
+ else if ((orig != NSR_LOG_HOLE) && (new == NSR_LOG_HOLE))
+ tw = NSR_RECON_WORK_UNDO_FILL;
+ }
+ // copy the records anyway
+ my_info->records[i].work.type = tw;
+ my_info->records[i].work.source = ctx->reconciliator_index;
+ memcpy(&(my_info->records[i].rec),
+ &(his_info->records[i].rec),
+ sizeof(nsr_recon_record_details_t));
+ }
+ return;
+}
+
+
+// Create an glfs object
+static struct glfs_object *
+create_obj(nsr_per_node_worker_t *ctx, char *gfid_str)
+{
+ struct glfs_object *obj = NULL;
+ uuid_t gfid;
+
+ uuid_parse(gfid_str, gfid);
+
+ obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
+ if (obj == NULL) {
+ GF_ASSERT(obj != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ return NULL;
+ }
+ return obj;
+}
+
+/*
+ * Function to apply the actual record onto the local brick.
+ * prior to this we should have read all the data from the
+ * brick that has the data.
+ *
+ * Input parameters:
+ * ctx - per node worker context that has the fs for communicating to brick
+ * ri - Reconciliation record that needs fixup
+ * dict - So that NSR server translator on brick applis fixup only on this brick
+ * and the changelog translator consumes term and index.
+ */
+
+static gf_boolean_t
+apply_record(nsr_per_node_worker_t *ctx,
+ nsr_reconciliation_record_t *ri,
+ dict_t * dict)
+{
+ struct glfs_fd *fd = NULL;
+ struct glfs_object *obj = NULL;
+ struct glfs_object *to_obj = NULL;
+ gf_boolean_t retval = _gf_false;
+
+ if (ri->rec.op == GF_FOP_WRITE) {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "DOing write for file %s @offset %d for len %d\n",
+ ri->rec.gfid, ri->rec.offset, ri->rec.len);
+
+ // The file has got deleted on the source. Hence just ignore
+ // this.
+ // TBD - get a way to just stuff the log entry without writing
+ // the data so that changelogs remain identical.
+ if (ri->work.data == NULL) {
+ return _gf_true;
+ }
+
+ if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL)
+ goto err;
+
+ fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+ if (fd == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "open for file %s failed\n",
+ ri->rec.gfid);
+ goto err;
+ }
+ if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "lseek for file %s failed at offset %d\n",
+ ri->rec.gfid, ri->rec.offset);
+ goto err;
+ }
+ if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "write for file %s failed for bytes %d\n",
+ ri->rec.gfid, ri->rec.len);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished DOing write for gfid %s @offset %d for len %d\n",
+ ri->rec.gfid, ri->rec.offset, ri->rec.len);
+
+ } else if (ri->rec.op == GF_FOP_FTRUNCATE) {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "DOing truncate for file %s @offset %d \n",
+ ri->rec.gfid, ri->rec.offset);
+
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
+
+ fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+ if (fd == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "open for file %s failed\n",
+ ri->rec.gfid);
+ goto err;
+ }
+ if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "trunctae for file %s failed @offset %d\n",
+ ri->rec.gfid,ri->rec.offset );
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished DOing truncate for gfid %s @offset %d \n",
+ ri->rec.gfid, ri->rec.offset);
+
+ } else if ((ri->rec.op == GF_FOP_FREMOVEXATTR) ||
+ (ri->rec.op == GF_FOP_REMOVEXATTR) ||
+ (ri->rec.op == GF_FOP_SETXATTR) ||
+ (ri->rec.op == GF_FOP_FSETXATTR)) {
+
+ uint32_t k_s = 0, v_s = 0;
+ char *t_b= NULL;
+ uint32_t num = 0;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing set extended attr for file %s \n",
+ ri->rec.gfid);
+
+ // The file has got deleted on the source. Hence just ignore
+ // this. TBD - get a way to just stuff the log entry without
+ // writing the data so that changelogs remain identical.
+ if (ri->work.data == NULL) {
+ return _gf_true;
+ }
+
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
+
+ if (obj->inode->ia_type == IA_IFDIR)
+ fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
+ else
+ fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+ if (fd == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "open for file %s failed\n",
+ ri->rec.gfid);
+ goto err;
+ }
+
+ if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
+ if (t_b) free(t_b);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "list of xattr of %s failed\n", ri->rec.gfid);
+ goto err;
+ }
+
+ if (delete_xattr(fd, dict, t_b, num) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "deleting xattrs failed\n");
+ goto err;
+ }
+
+ // Set one special dict flag to indicate the opcode so that
+ // the opcode gets set to this
+ if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "setting opcode to %d failed\n",ri->rec.op);
+ goto err;
+ }
+
+ if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "filling xattrs failed\n");
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finsihed Doing set extended attr for %s \n",
+ ri->rec.gfid);
+
+ } else if (ri->rec.op == GF_FOP_CREATE) {
+
+ uuid_t gfid;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing create for file %s \n",
+ ri->rec.gfid);
+
+ // TBD - add mode and flags later
+ uuid_parse(ri->rec.gfid, gfid);
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+
+ nsr_worker_log (this->name, GF_LOG_INFO,
+ "creating with mode 0%o", ri->rec.mode);
+ if (glfs_h_creat_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, ri->rec.mode, NULL, gfid, dict) == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failure for Doing create for file %s\n",
+ ri->rec.entry);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished Doing create for file %s \n",
+ ri->rec.entry);
+
+ } else if (ri->rec.op == GF_FOP_MKNOD) {
+
+ uuid_t gfid;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing mknod for file %s \n",
+ ri->rec.entry);
+
+ // TBD - add mode and flags later
+ uuid_parse(ri->rec.gfid, gfid);
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+
+ if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failure for Doing mknod for file %s\n",
+ ri->rec.entry);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished Doing mknod for file %s \n",
+ ri->rec.entry);
+
+ } else if (ri->rec.op == GF_FOP_MKDIR) {
+
+ uuid_t gfid;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing mkdir for dir %s \n",
+ ri->rec.gfid);
+
+ // TBD - add mode and flags later
+ uuid_parse(ri->rec.gfid, gfid);
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+
+ if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failure for Doing mkdir for file %s\n",
+ ri->rec.entry);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished Doing mkdir for file %s \n",
+ ri->rec.entry);
+
+ } else if ((ri->rec.op == GF_FOP_RMDIR) || (ri->rec.op == GF_FOP_UNLINK)) {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing rmdir/ublink for dir %s \n",
+ ri->rec.entry);
+
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+ if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failure for Doing rmdir/unlink for file %s\n",
+ ri->rec.entry);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished Doing rmdir/unlink for file %s \n",
+ ri->rec.entry);
+
+ } else if (ri->rec.op == GF_FOP_SYMLINK) {
+
+ uuid_t gfid;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing symlink for file %s to file %s \n",
+ ri->rec.entry, ri->rec.link_path);
+
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+ uuid_parse(ri->rec.gfid, gfid);
+
+ if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failed to Doing symlink for file %s to file %s \n",
+ ri->rec.entry, ri->rec.link_path);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finished Doing symlink for file %s to file %s \n",
+ ri->rec.entry, ri->rec.link_path);
+
+ } else if (ri->rec.op == GF_FOP_LINK) {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing hard link for file %s to file %s \n",
+ ri->rec.entry, ri->rec.gfid);
+
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
+
+ if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failed to Doing hard link for file %s to file %s \n",
+ ri->rec.entry, ri->rec.gfid);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finsihed doing hard link for file %s to file %s \n",
+ ri->rec.entry, ri->rec.gfid);
+
+ } else if (ri->rec.op == GF_FOP_RENAME) {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing rename for file %s to file %s \n",
+ ri->rec.entry, ri->rec.newloc);
+
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
+
+ if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "Failed to Doing rename for file %s to file %s \n",
+ ri->rec.entry, ri->rec.newloc);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Finsihed doing renam for file %s to file %s \n",
+ ri->rec.entry, ri->rec.newloc);
+
+
+ } else if ((ri->rec.op == GF_FOP_SETATTR) || (ri->rec.op == GF_FOP_FSETATTR)) {
+
+ struct iatt iatt = {0, };
+ int valid = 0;
+ int ret = -1;
+
+ // TBD - do the actual settings once we do that
+ // right now we just set the mode so that changelog gets filled
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing attr for file %s \n",
+ ri->rec.gfid);
+
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
+
+ if (obj->inode->ia_type == IA_IFDIR)
+ fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
+ else
+ fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+ if (fd == NULL) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "open for file %s failed\n",
+ ri->rec.gfid);
+ goto err;
+ }
+
+ iatt.ia_prot = ia_prot_from_st_mode(777);
+ valid = GF_SET_ATTR_MODE;
+
+
+ // Set one special dict flag to indicate the opcode so that
+ // the opcode gets set to this
+ if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "setting opcode to %d failed\n",ri->rec.op);
+ goto err;
+ }
+
+ ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict);
+ if (ret == -1) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "failed Doing attr for file %s \n",
+ ri->rec.gfid);
+ goto err;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "Doing attr for file %s \n",
+ ri->rec.gfid);
+
+ }
+
+ retval = _gf_true;
+
+err:
+ if (fd) {
+ /*
+ * It's not clear that we should be passing the same dict to
+ * glfs_close that was passed to us for glfs_open, but that's
+ * the prior behavior so let's preserve it for now.
+ */
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ }
+ }
+ if (obj) {
+ /*
+ * AFAICT fd operations do not borrow this reference, so we
+ * still need to drop it ourselves.
+ */
+ glfs_h_close(obj);
+ }
+ if (to_obj) {
+ /*
+ * AFAICT fd operations do not borrow this reference, so we
+ * still need to drop it ourselves.
+ */
+ glfs_h_close(to_obj);
+ }
+ return retval;
+}
+
+//return back opcodes that requires reading from source
+static gf_boolean_t
+recon_check_changelog(nsr_recon_record_details_t *rd)
+{
+ return((rd->op == GF_FOP_WRITE) ||
+ (rd->op == GF_FOP_FSETATTR) ||
+ (rd-> op == GF_FOP_SETATTR) ||
+ (rd->op == GF_FOP_FREMOVEXATTR) ||
+ (rd->op == GF_FOP_SETXATTR) ||
+ (rd->op == GF_FOP_FSETXATTR) ||
+ (rd->op == GF_FOP_SYMLINK));
+
+}
+
+// TBD
+static gf_boolean_t
+recon_compute_undo(nsr_recon_record_details_t *rd)
+{
+ return(_gf_false);
+}
+
+
+/*
+ * Function that talks to the brick for data tranfer.
+ *
+ * Input arguments:
+ * ctx - worker context
+ * work - pointer to work object
+ */
+static void
+data_worker_func(nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ xlator_t *this = dr->this;
+ nsr_reconciliation_record_t *ri = NULL;
+ nsr_recon_record_details_t *rd = NULL;
+ int wip = 0;
+ dict_t * dict = NULL;
+ struct glfs_fd *fd = NULL;
+ struct glfs_object *obj = NULL;
+ uuid_t gfid;
+ uint32_t k_s = 0, v_s = 0;
+ char *t_b= NULL;
+ uint32_t num=0;
+
+ switch (work->req_id){
+ case NSR_WORK_ID_INI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started data ini \n");
+
+ if (nsr_recon_start_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished data ini \n");
+ break;
+ case NSR_WORK_ID_FINI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started data fini \n");
+
+ if (nsr_recon_end_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished data fini \n");
+ break;
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_READ:
+ // first_index always starts with 1 but records starts at 0.
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ dict = dict_new ();
+ if (!dict) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+
+ switch (rd->op) {
+ case GF_FOP_WRITE:
+
+ // record already copied.
+ // copy data to this node's info.
+
+ uuid_parse(ri->rec.gfid, gfid);
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started recon read for file %s at offset %d at len %d\n",
+ ri->rec.gfid, rd->offset, rd->len);
+
+ obj = glfs_h_create_from_handle (ctx->fs, gfid,
+ GFAPI_HANDLE_LENGTH,
+ NULL);
+ if (obj == NULL) {
+ GF_ASSERT(obj != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ break;
+ }
+
+ // The file has probably got deleted.
+ fd = glfs_h_open_with_xdata (ctx->fs, obj, O_RDONLY,
+ dict);
+ if (fd == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "opening of file failed\n");
+ break;
+ }
+
+ if (glfs_lseek_with_xdata (fd, rd->offset, SEEK_SET,
+ dict) != rd->offset) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "lseek of file failed to offset %d\n",
+ rd->offset);
+ break;
+ }
+
+ ri->work.data = RD_CALLOC (rd->len , sizeof(char),
+ gf_mt_recon_work_data_t);
+ if (glfs_read_with_xdata (fd, ri->work.data, rd->len,
+ 0, dict) != rd->len) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "read of file failed to offset %d for bytes %d\n",
+ rd->offset, rd->len);
+ break;
+ }
+ break;
+
+ case GF_FOP_FTRUNCATE:
+ case GF_FOP_SYMLINK:
+ case GF_FOP_RMDIR:
+ case GF_FOP_UNLINK:
+ case GF_FOP_MKNOD:
+ case GF_FOP_CREATE:
+ case GF_FOP_LINK:
+ case GF_FOP_MKDIR:
+ case GF_FOP_RENAME:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unimplemented fop %u\n", rd->op);
+ break;
+
+ case GF_FOP_FREMOVEXATTR:
+ case GF_FOP_REMOVEXATTR:
+ case GF_FOP_SETXATTR:
+ case GF_FOP_FSETXATTR:
+
+ uuid_parse(ri->rec.gfid, gfid);
+
+
+ // This is for all the set attribute/extended
+ // attributes commands. Get all the attributes from
+ // the source and fill it in the buffer as a NULL
+ // seperated key and value which are in turn seperated
+ // by NULL.
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "doing getattr for gfid %s \n",
+ ri->rec.gfid);
+
+ obj = glfs_h_create_from_handle (ctx->fs, gfid,
+ GFAPI_HANDLE_LENGTH,
+ NULL);
+ if (obj == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ break;
+ }
+
+ if (obj->inode->ia_type == IA_IFDIR)
+ fd = glfs_h_opendir_with_xdata (ctx->fs, obj,
+ dict);
+ else
+ fd = glfs_h_open_with_xdata (ctx->fs, obj,
+ O_RDONLY, dict);
+
+ if (fd == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "opening of file failed\n");
+ break;
+ }
+
+ if (get_xattr_total_size (fd, &t_b, &k_s, &v_s, &num,
+ dict) == -1) {
+ if (t_b) free(t_b);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "list of xattr of gfid %s failed\n",
+ rd->gfid);
+ break;
+ }
+ ri->work.data = RD_CALLOC ((k_s + v_s) , sizeof(char),
+ gf_mt_recon_work_data_t);
+ if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "get xattr of gfid %s failed\n", rd->gfid);
+ break;
+ }
+ ri->work.num = num;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished getattr for gfid %s \n",
+ ri->rec.gfid);
+ free(t_b);
+ break;
+
+ case GF_FOP_FSETATTR:
+ case GF_FOP_SETATTR:
+ //TBD - to get the actual attrbutes and fill
+ // mode, uid, gid, size, atime, mtime
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unimplemented fop %u\n", rd->op);
+ break;
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unrecognized fop %u\n", rd->op);
+
+ }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished recon read for gfid %s at offset %d for %d bytes \n",
+ rd->gfid, rd->offset, rd->len);
+ break;
+
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT:
+ // first_index always starts with 1 but records starts at 0.
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got recon commit for index %d that has gfid %s \n",
+ wip, rd->gfid);
+ dict = dict_new ();
+ if (!dict) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (apply_record(ctx, ri, dict) == _gf_false) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "apply_record fails\n");
+ }
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished recon commit for gfid %s \n",
+ rd->gfid);
+ break;
+
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
+ dict = dict_new ();
+ if (!dict) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+
+ // Increment work index with the start index
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ glfs_fsync_with_xdata(fd, dict);
+ break;
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unrecognized request id %u\n", work->req_id);
+ }
+
+ if (fd) {
+ glfs_close_with_xdata(fd, dict);
+ }
+ if (obj) {
+ glfs_h_close(obj);
+ }
+ if (dict) {
+ dict_unref(dict);
+ }
+}
+
+// thread for doing data work
+static void *
+data_worker_main(nsr_per_node_worker_t *ctx)
+{
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "starting data worker func\n");
+ init_worker(ctx, 0);
+
+ while(1) {
+ nsr_recon_work_t *work = NULL;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "waiting for work\n");
+
+ pthread_mutex_lock(&(ctx->mutex));
+ while (list_empty(&(ctx->head.list))) {
+ pthread_cond_wait(&(ctx->cv), &(ctx->mutex));
+ }
+ pthread_mutex_unlock(&(ctx->mutex));
+ list_for_each_entry(work, &(ctx->head.list), list) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got work with id %d\n",work->req_id);
+ work->in_use = _gf_false;
+ data_worker_func(ctx, work);
+ atomic_dec(&(dr->outstanding));
+ break;
+ }
+ nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
+ list_del_init (&work->list);
+ GF_FREE(work);
+ nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
+ }
+
+ return NULL;
+}
+
+
+//make recon work
+static void
+recon_make_work(nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_work_t **work,
+ nsr_recon_work_req_id_t req_id,
+ int32_t i)
+{
+ xlator_t *this = ctx->this;
+
+ // TBD - change this to get from a static pool
+ // This cannot fail
+ (*work) = RD_CALLOC (1, sizeof (nsr_recon_work_t), gf_mt_recon_work_t);
+ (*work)->req_id = req_id;
+ (*work)->index = i;
+ (*work)->in_use = _gf_true;
+ INIT_LIST_HEAD(&((*work)->list));
+ return;
+}
+
+// Schedule a work object to a worker thread.
+static void
+recon_queue_to_worker(nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_work_t *work,
+ unsigned int id,
+ nsr_recon_queue_type_t type)
+{
+ nsr_per_node_worker_t *worker;
+ if (type == NSR_RECON_QUEUE_TO_CONTROL) {
+ worker = ctx->workers[id].control_worker;
+ nsr_driver_log(this->name, GF_LOG_INFO,
+ "queueing work to control index %d\n",id);
+ } else {
+ worker= ctx->workers[id].data_worker;
+ nsr_driver_log(this->name, GF_LOG_INFO,
+ "queueing work to data index %d\n",id);
+ }
+ pthread_mutex_lock(&worker->mutex);
+ list_add_tail(&work->list, &worker->head.list);
+ pthread_cond_signal(&worker->cv);
+ pthread_mutex_unlock(&worker->mutex);
+ return;
+}
+
+typedef void * (*F_t) (void *);
+
+// In case mode is set to NSR_USE_THREADS, create worker threads.
+static gf_boolean_t
+create_worker_threads(nsr_recon_private_t *priv,
+ nsr_recon_driver_ctx_t *ctx,
+ nsr_per_node_worker_t *w,
+ gf_boolean_t control_or_data,
+ F_t f,
+ uint32_t num)
+{
+ uint32_t i;
+ nsr_per_node_worker_t *worker = w;
+ xlator_t *this = ctx->this;
+
+ for (i=0; i < num; i++) {
+ worker->id = RD_CALLOC(1, 10, gf_mt_recon_id_t);
+ if (!worker->id) {
+ nsr_driver_log (priv->this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return _gf_false;
+ }
+ sprintf(worker->id,"recon_%d", i);
+ worker->driver_ctx = ctx ;
+
+ if (ctx->mode == NSR_USE_THREADS) {
+ if (pthread_create(&worker->thread_id, NULL, f, worker)) {
+ nsr_driver_log (ctx->this->name, GF_LOG_ERROR, "control work thread creation error \n");
+ return _gf_false;
+ }
+ }
+ worker->index = i;
+ worker++;
+ }
+ return _gf_true;
+}
+
+/*
+ * In case of thread, send the work item; else call the function directly.
+ *
+ * Input arguments:
+ * bm - bitmap containing indices of nodes we want to send work
+ * num - number of such indices
+ * ctx - driver context from where we derive per worker context
+ * id - request ID
+ * q - control or data
+ * misc - used to overload such as index.
+ */
+static void
+send_and_wait(int32_t *result,
+ int32_t *op_errno,
+ int32_t bm,
+ uint32_t num,
+ nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_work_req_id_t id,
+ nsr_recon_queue_type_t q,
+ int32_t misc)
+{
+ uint32_t i = 0;
+ nsr_recon_work_t *work;
+
+#define CONTROL_WORKER(i) ctx->workers[i].control_worker
+#define DATA_WORKER(i) ctx->workers[i].data_worker
+#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? (CONTROL_WORKER(i)) : (DATA_WORKER(i)))
+
+ *result = *op_errno = 0;
+
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ WORKER(i)->result = 0;
+ WORKER(i)->op_errno = 0;
+ }
+ }
+ if (ctx->mode == NSR_SEQ) {
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ recon_make_work(ctx,&work, id, misc);
+ if (q == NSR_RECON_QUEUE_TO_CONTROL) {
+ if (i == 0)
+ control_worker_func_0(ctx->workers[0].control_worker, work);
+ else
+ control_worker_func(ctx->workers[i].control_worker, work);
+ } else {
+ data_worker_func(ctx->workers[i].data_worker, work);
+ }
+ GF_FREE(work);
+ }
+ }
+ goto out;
+ }
+
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ recon_make_work(ctx,&work, id, misc);
+ atomic_inc(&(ctx->outstanding));
+ recon_queue_to_worker(ctx, work, i, q);
+ }
+ }
+
+ nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: waiting\n");
+ while (ctx->outstanding) {
+ pthread_yield();
+ }
+out:
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ if (WORKER(i)->result == -1) {
+ *result = -1;
+ }
+ }
+ }
+ if (*result == -1) {
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ if (WORKER(i)->op_errno == EAGAIN) {
+ *op_errno = EAGAIN;
+ break;
+ } else {
+ *op_errno = EIO;
+ }
+ }
+ }
+ }
+
+ nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno);
+ return;
+}
+
+// send INI or FINI
+static int32_t
+nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
+ uint32_t i,
+ gf_boolean_t in_use)
+{
+ uint32_t bm = 1 << i;
+ gf_boolean_t send = _gf_false;
+ int32_t status =0, op_errno = 0;
+
+ send = (ctx->workers[i].in_use != in_use);
+ ctx->workers[i].in_use = in_use;
+
+ if (!send) {
+ return 0;
+ }
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending %s to index %d\n",in_use?"INI":"FINI",i);
+
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
+ (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto err;
+
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
+ (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
+ NSR_RECON_QUEUE_TO_DATA, -1);
+ if (status == -1)
+ goto err;
+
+ /*
+ * We really need better error recovery. To activate a worker, we
+ * allocate memory and send two messages. If any of those actions
+ * fail, we should undo the others. It would probably be good to
+ * collapse the two messages into one, because it's pretty trivial to
+ * allocate a temporary structure and either link it in or free it
+ * depending on the result here.
+ */
+
+ if (in_use == _gf_false) {
+ GF_FREE(ctx->workers[i].recon_info);
+ }
+
+ return 0;
+
+err:
+ GF_FREE(ctx->workers[i].recon_info);
+ ctx->workers[i].recon_info = NULL;
+ return -1;
+}
+
+gf_boolean_t
+nsr_recon_driver_reconciliator (nsr_recon_private_t *priv)
+{
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context;
+ int32_t bm;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+ gf_boolean_t do_recon = _gf_false;
+ uint32_t start_index = ctx->workers[0].recon_info->first_index;
+ uint32_t end_index = ctx->workers[0].recon_info->last_index;
+ uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "starting reconciliation work as reconciliator \n");
+
+ // nothing to be done? signal back to the recon translator that this
+ // phase done.
+ bm = 1;
+ for (i=1; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use &&
+ (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) {
+ ctx->workers[i].recon_info->last_index = end_index;
+ ctx->workers[i].recon_info->first_index = start_index;
+ bm |= (1 << i);
+ do_recon = _gf_true;
+ }
+ }
+
+ if (!do_recon || !num) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "nothing needs to be done as resolutor \n");
+ return _gf_true;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting reconciliation window for term %d from %dto %d \n",
+ ctx->workers[0].recon_info->last_term,
+ start_index, end_index);
+ // We have set the bm in the above for loop where we go thru all nodes
+ // including this node that have seen the last term.
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting reconciliation window for term %d from %dto %d \n",
+ ctx->workers[0].recon_info->last_term,
+ start_index, end_index);
+
+
+ // from the changelogs, calculate the entries that need action and the
+ // source for each of these entries
+ compute_reconciliation_work(ctx);
+
+ // for each of the entries that need fixup, issue IO
+ for (i=start_index; i < (start_index + num); i++) {
+ nsr_reconciliator_info_t *my_recon_info =
+ ctx->workers[0].recon_info;
+ nsr_reconciliation_record_t *record =
+ &(my_recon_info->records[i - start_index]);
+
+ record->work.term = ctx->workers[0].recon_info->last_term;
+ record->work.index = i;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing index %d\n",i);
+ if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) ||
+ (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) {
+ // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): If there
+ // are only pseudo_holes in others, it is best effort.
+ // Just pick from the first node that has it and
+ // proceed.
+ // 2nd case (RECON_WORK_HOLE_TO_FILL): this node has
+ // either a HOLE or PSUEDO_HOLE and some one else has a
+ // FILL(source). analyse the changelog to check if data
+ // needs to be read or if the log has all the data
+ // required
+
+ if (recon_check_changelog(&record->rec)) {
+ bm = (1 << record->work.source);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reading data from source %d\n",record->work.source);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "got data from source %d\n",record->work.source);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing local data as part of reconciliation\n");
+
+ bm = 1;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing local data as part of reconciliation\n");
+
+ } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) {
+ // this node has a pseudo_hole and some others have just
+ // that too. Just convert this to FILL. let others
+ // blindly pick it from here.
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing this record as a fill\n");
+ bm = 1;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing this record as a fill\n");
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reconciliation work as reconciliator \n");
+
+ // tbd - mark this term golden in the reconciliator
+ return _gf_true;
+}
+
+gf_boolean_t
+nsr_recon_driver_resolutor (nsr_recon_private_t *priv)
+{
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context;
+ int32_t bm;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+ // This node's last term is filled when it gets a message from the
+ // leader to act as a reconciliator.
+ uint32_t recon_index = ctx->reconciliator_index;
+ nsr_reconciliator_info_t *my_info =
+ ctx->workers[0].recon_info;
+ nsr_reconciliator_info_t *his_info =
+ ctx->workers[recon_index].recon_info;
+ uint32_t my_last_term = my_info->last_term;
+ uint32_t to_do_term = his_info->last_term;
+ uint32_t my_start_index = 1, my_end_index = 1;
+ uint32_t his_start_index = 1, his_end_index = 1;
+ uint32_t num = 0;
+ gf_boolean_t fl = _gf_true;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "starting resolutor work with reconciliator as %d from term %d to term %d \n",
+ recon_index, my_last_term, to_do_term);
+
+ do {
+
+ if (!fl) {
+ (his_info->last_term)++;
+ (my_info->last_term)++;
+ } else {
+ his_info->last_term = my_last_term;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term);
+
+ // Get reconciliator's term information for that term
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting info from reconciliator for term %d \n", my_info->last_term);
+ bm = (1 << recon_index);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_GIVEN_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting info from reconciliator for term %d \n", my_info->last_term);
+
+
+ // empty term
+ if (!his_info->commited_ops) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term);
+ // TBD - mark the term golden
+ fl = _gf_false;
+ continue;
+ }
+
+ // calculate the resolution window boundary. for the last term
+ // this node saw, we compare the resolution window of this and
+ // reconciliator. for the rest of the nodes, we just accept the
+ // reconciliator info.
+ if (fl) {
+ my_start_index = my_info->first_index;
+ my_end_index = my_info->last_index;
+ his_start_index = his_info->first_index;
+ his_end_index = his_info->last_index;
+ my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index;
+ my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index;
+ } else {
+ my_info->first_index = his_info->first_index;
+ my_info->last_index = his_info->last_index;
+ my_info->commited_ops = his_info->commited_ops;
+ }
+ if (my_info->first_index == 0)
+ my_info->first_index = 1;
+ num = (my_info->last_index - my_info->first_index) + 1;
+
+
+ // Get the logs from the reconciliator (and this node for this
+ // term)
+ if (fl)
+ bm = ((1 << recon_index) | 1);
+ else
+ bm = (1 << recon_index);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting reconciliation window for term %d from %d to %d \n",
+ my_info->last_term,
+ my_info->first_index, my_info->last_index);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting reconciliation window for term %d from %d to %d \n",
+ my_info->last_term,
+ my_info->first_index, my_info->last_index);
+
+ // from the changelogs, calculate the entries that need action
+ compute_resolution_work(ctx, my_info, his_info, !fl);
+
+
+ // for each of the entries that need fixup, issue IO
+ for (i=my_info->first_index; i < (my_info->first_index + num); i++) {
+ nsr_reconciliation_record_t *record =
+ &(my_info->records[i - my_info->first_index]);
+
+ record->work.term = my_info->last_term;
+ record->work.index = i;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing index %d\n",i);
+ if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) ||
+ (record->work.type == NSR_RECON_WORK_UNDO_FILL)) {
+ if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) &&
+ recon_check_changelog(&record->rec)) ||
+ ((record->work.type == NSR_RECON_WORK_UNDO_FILL) &&
+ recon_compute_undo(&record->rec))) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reading data from source %d\n",recon_index);
+ bm = (1 << recon_index);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reading data from source %d\n",recon_index);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing local data as part of resolutor\n");
+
+ bm = 1;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing local data as part of resolutor\n");
+ }
+ }
+ fl = _gf_false;
+
+ // tbd - mark this term golden in the reconciliator
+ } while (my_last_term++ != to_do_term);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished resolutor work \n");
+ return _gf_true;
+}
+
+gf_boolean_t
+nsr_recon_driver_leader (nsr_recon_private_t *priv)
+{
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context;
+ int32_t bm;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+ int32_t chosen = -1;
+ int32_t last_term = -1, last_ops = -1;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting last term info from all members of this group\n");
+ // Get last term info from all members for this group
+ send_and_wait(&status, &op_errno, -1,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_LAST_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ return _gf_false;
+
+
+ // compare all the info received and choose the reconciliator First
+ // choose all with latest term
+ for (i=0; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use) {
+ if (ctx->workers[i].recon_info->last_term > last_term) {
+ last_term = ctx->workers[i].recon_info->last_term;
+ }
+ }
+ }
+ // First choose all with latest term and highest ops
+ for (i=0; i < replica_group_size; i++) {
+ if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) {
+ if (ctx->workers[i].recon_info->commited_ops > last_ops) {
+ last_ops = ctx->workers[i].recon_info->commited_ops;
+ }
+ }
+ }
+ // choose the first among the lot
+ for (i=0; i < replica_group_size; i++) {
+ if ((ctx->workers[i].in_use) &&
+ (last_term == ctx->workers[i].recon_info->last_term) &&
+ (last_ops == ctx->workers[i].recon_info->commited_ops)) {
+ chosen = i;
+ break;
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reconciliator chosen is %d\n", chosen);
+ ctx->reconciliator_index = chosen;
+ GF_ASSERT(chosen != -1);
+ if (chosen == -1) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "no reconciliatior chosen\n");
+ return _gf_false;
+ }
+
+ // send the message to reconciliator to do reconciliation with list of
+ // nodes that are part of this quorum
+ if (chosen != 0) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending reconciliation work to %d\n", chosen);
+ bm = 1 << ctx->reconciliator_index;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RECONCILIATOR_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reconciliation work to %d\n", chosen);
+ } else {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "local node is reconciliator. before set jmp\n");
+ nsr_recon_driver_reconciliator(priv);
+ }
+
+ // send message to all other nodes to sync up with the reconciliator
+ // including itself if required
+ // requires optimisation - TBD
+ if (chosen != 0) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "local node resolution needs to be done. before set jmp\n");
+ nsr_recon_driver_resolutor(priv);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending resolution work to all nodes except this node and reconciliator\n");
+ bm = ~((1 << ctx->reconciliator_index) || 1);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RESOLUTION_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reconciliation work as leader \n");
+ return _gf_true;
+}
+
+// main recon driver thread
+void *
+nsr_reconciliation_driver(void *arg)
+{
+ nsr_recon_private_t *priv = (nsr_recon_private_t *) arg;
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_per_node_worker_t *control_s, *data_s;
+ nsr_recon_driver_ctx_t **driver_ctx, *ctx;
+ int32_t bm;
+ xlator_t *this = priv->this;
+ char *con_name, *data_name;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+
+ driver_ctx = &priv->driver_thread_context;
+ (*driver_ctx) = GF_CALLOC (1,
+ sizeof (nsr_recon_driver_ctx_t),
+ gf_mt_recon_driver_ctx_t);
+ if (!driver_ctx) {
+ gf_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+ ctx = *driver_ctx;
+ ctx->this = priv->this;
+ ctx->replica_group_size = replica_group_size;
+
+ ctx->fp = recon_create_log (priv->replica_group_members[0], "nsr-driver-log");
+ if (!ctx->fp)
+ return NULL;
+
+ if ((pthread_mutex_init(&(ctx->mutex), NULL)) ||
+ (pthread_cond_init(&(ctx->cv), NULL))){
+ nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n");
+ return NULL;
+ }
+ INIT_LIST_HEAD(&(ctx->role_head.list));
+
+ ctx->workers = RD_CALLOC (replica_group_size,
+ sizeof(nsr_replica_worker_t),
+ gf_mt_recon_worker_t);
+ if (!ctx->workers) {
+ nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+ for (i=0; i < replica_group_size; i++) {
+ strcpy(ctx->workers[i].name, priv->replica_group_members[i]);
+ }
+
+ control_s = RD_CALLOC (replica_group_size,
+ sizeof(nsr_per_node_worker_t),
+ gf_mt_recon_per_node_worker_t);
+ if (!control_s) {
+ nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+
+ data_s = RD_CALLOC (replica_group_size,
+ sizeof(nsr_per_node_worker_t),
+ gf_mt_recon_per_node_worker_t);
+ if (!data_s) {
+ nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+ for (i=0; i < replica_group_size; i++) {
+ ctx->workers[i].control_worker = &control_s[i];
+ if (asprintf(&con_name,"recon-con-%u",i) < 1) {
+ return NULL;
+ }
+ ctx->workers[i].control_worker->fp = recon_create_log
+ (priv->replica_group_members[0], con_name);
+ if (!ctx->workers[i].control_worker->fp)
+ return NULL;
+ ctx->workers[i].data_worker = &data_s[i];
+ if (asprintf (&data_name,"recon-data-%u",i) <1) {
+ return NULL;
+ }
+ ctx->workers[i].data_worker->fp = recon_create_log
+ (priv->replica_group_members[0], data_name);
+ if (!ctx->workers[i].data_worker->fp)
+ return NULL;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "creating threads \n");
+ // Create the worker threads
+ // For every brick including itself there will be 2 worker threads:
+ // one for data and one for control
+ if (!create_worker_threads(priv, ctx, control_s, _gf_true,
+ (F_t) control_worker_main, replica_group_size) ||
+ !create_worker_threads(priv, ctx, data_s, _gf_false,
+ (F_t) data_worker_main, replica_group_size)) {
+ return NULL;
+ }
+
+ for (i=0; i < replica_group_size; i++) {
+ nsr_recon_get_file(priv->volname, &(ctx->workers[i]));
+ }
+
+ while (1) {
+
+ nsr_role_work_t *rr;
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "waiting for role to be queued \n");
+ pthread_mutex_lock(&(ctx->mutex));
+ while (list_empty(&(ctx->role_head.list))) {
+ pthread_cond_wait(&(ctx->cv), &(ctx->mutex));
+ }
+ pthread_mutex_unlock(&(ctx->mutex));
+
+ list_for_each_entry(rr, &(ctx->role_head.list), list) {
+ nsr_recon_driver_state_t state;
+ state = nsr_recon_driver_get_role(&status, ctx, rr);
+
+ if (status == -1) {
+ op_errno = EIO;
+ goto out;
+ }
+
+ switch (state) {
+
+ case leader:
+ if (!nsr_recon_driver_leader(priv)) {
+ goto out;
+ }
+ break;
+ case reconciliator:
+ if (!nsr_recon_driver_reconciliator(priv)) {
+ goto out;
+ }
+ break;
+ case resolutor:
+ if (!nsr_recon_driver_resolutor(priv)) {
+ goto out;
+ }
+ break;
+
+ case joiner:
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
+ // Get last term info from all members for this group
+ // which will be the leader(this node) and the node that wants to join.
+ send_and_wait(&status, &op_errno, -1,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_LAST_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
+
+
+ // send message to other node that just joined to sync up with this node which is also the leader
+ nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n");
+ bm = ~(1);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RESOLUTION_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished recon work as joiner \n");
+ break;
+
+ default:
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "bad state %d", state);
+ }
+
+
+ // free the asasociated recon_info contexts created as part of this role
+
+out:
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending end of reconciliation message \n");
+ nsr_recon_return_back(priv, ctx->term, status, op_errno);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished sending end of reconciliation message \n");
+ }
+ list_del_init (&rr->list);
+ }
+
+ return NULL;
+}
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h
new file mode 100644
index 000000000..3efb26269
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_driver.h
@@ -0,0 +1,325 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __RECON_DRIVER_H__
+#define __RECON_DRIVER_H__
+
+
+#include "api/src/glfs.h"
+
+#define MAX_HOSTNAME_LEN 32
+#define MAXIMUM_REPLICA_STRENGTH 8
+#define MAX_RECONCILIATION_WINDOW_SIZE 10000
+
+#define GLUSTERD_DEFAULT_WORKDIR "/var/lib/glusterd"
+#define GLUSTERD_VOLUME_DIR_PREFIX "vols"
+#define GLUSTERD_BRICK_INFO_DIR "bricks"
+
+/*
+ * Even with the names fixed, the non-NSR_DEBUG definitions of nsr_*_log don't
+ * work because many callers don't have "this" defined.
+ *
+ * TBD: use gf_log, fix "this" problem, eliminate extra fields and newlines.
+ */
+#define NSR_DEBUG
+
+typedef enum nsr_recon_work_req_id_t {
+ NSR_WORK_ID_GET_NONE = 0,
+ NSR_WORK_ID_GET_LAST_TERM_INFO = NSR_WORK_ID_GET_NONE + 1,
+ NSR_WORK_ID_GET_GIVEN_TERM_INFO = NSR_WORK_ID_GET_LAST_TERM_INFO + 1,
+ NSR_WORK_ID_RECONCILIATOR_DO_WORK = NSR_WORK_ID_GET_GIVEN_TERM_INFO + 1,
+ NSR_WORK_ID_RESOLUTION_DO_WORK = NSR_WORK_ID_RECONCILIATOR_DO_WORK + 1,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW = NSR_WORK_ID_RESOLUTION_DO_WORK + 1,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ = NSR_WORK_ID_GET_RECONCILATION_WINDOW + 1,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT = NSR_WORK_ID_SINGLE_RECONCILIATION_READ + 1,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH = NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT + 1,
+ NSR_WORK_ID_GET_RESOLUTION_WINDOW = NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH + 1,
+ NSR_WORK_ID_END_RECONCILIATION = NSR_WORK_ID_GET_RESOLUTION_WINDOW + 1,
+ NSR_WORK_ID_INI = NSR_WORK_ID_END_RECONCILIATION + 1,
+ NSR_WORK_ID_FINI = NSR_WORK_ID_INI + 1
+} nsr_recon_work_req_id_t;
+
+typedef enum nsr_recon_queue_type_t {
+ NSR_RECON_QUEUE_TO_CONTROL = 0,
+ NSR_RECON_QUEUE_TO_DATA =NSR_RECON_QUEUE_TO_CONTROL + 1,
+} nsr_recon_queue_type_t;
+
+typedef enum nsr_log_type_t {
+ NSR_LOG_HOLE = 0b0,
+ NSR_LOG_PSEUDO_HOLE = 0b1,
+ NSR_LOG_FILL = 0b11
+} nsr_log_type_t;
+
+typedef enum nsr_mode_t {
+ NSR_SEQ = 0,
+ NSR_USE_THREADS = 1,
+ NSR_ASYNC = 2
+} nsr_mode_t;
+
+typedef enum nsr_recon_work_type_t {
+ NSR_RECON_WORK_NONE = 0,
+ NSR_RECON_WORK_HOLE_TO_NOOP = NSR_RECON_WORK_NONE + 1,
+ NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE = NSR_RECON_WORK_HOLE_TO_NOOP + 1,
+ NSR_RECON_WORK_COMPARE_PSEUDO_HOLE = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE + 1,
+ NSR_RECON_WORK_HOLE_TO_FILL = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE + 1,
+ NSR_RECON_WORK_UNDO_FILL = NSR_RECON_WORK_HOLE_TO_FILL + 1,
+} nsr_recon_work_type_t;
+
+typedef enum nsr_recon_driver_state_t {
+ none = 0,
+ leader = 1,
+ reconciliator = 2,
+ resolutor = 3,
+ joiner = 4,
+} nsr_recon_driver_state_t;
+
+// role structure
+#pragma pack(push, 1)
+typedef struct _nsr_recon_role_s {
+ uint32_t role; // leader, reconciliator, resolutor
+ uint32_t num; // required in case state is reconciliator
+ uint32_t current_term; // current term used in case of leader
+ // In case this is reconciliator, num is set to nodes that were part
+ // of previous term.
+ // In case this is resolutor, num is set to 2.
+ // info[0] - information for this node.
+ // info[1] - information of the reconciliator.
+ // In case this is leader, num is set to this term's membership list
+ // set info.name to all members including the leader
+ struct {
+ int32_t last_term;
+ int32_t commited_ops;
+ uint32_t last_index;
+ uint32_t first_index;
+ char name[MAX_HOSTNAME_LEN];
+ } info[MAXIMUM_REPLICA_STRENGTH];
+} nsr_recon_role_t;
+#pragma pack(pop)
+
+#define ENDIAN_CONVERSION_RR(rr, is_true) \
+{ \
+ uint32_t i=0; \
+ uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \
+ if (is_true == _gf_true) rr.num = f(rr.num); \
+ rr.current_term = f(rr.current_term); \
+ for (i=0; i < rr.num; i++) { \
+ rr.info[i].last_term = f(rr.info[i].last_term); \
+ rr.info[i].commited_ops = f(rr.info[i].commited_ops); \
+ rr.info[i].last_index = f(rr.info[i].last_index); \
+ rr.info[i].first_index = f(rr.info[i].first_index); \
+ } \
+ if (is_true == _gf_false) rr.num = f(rr.num); \
+}
+
+// last term info structure
+#pragma pack(push, 1)
+typedef struct _nsr_recon_last_term_info_s {
+ int32_t last_term;
+ int32_t commited_ops;
+ uint32_t last_index;
+ uint32_t first_index;
+} nsr_recon_last_term_info_t;
+#pragma pack(pop)
+
+#define ENDIAN_CONVERSION_LT(lt, is_true) \
+{ \
+ uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \
+ lt.last_term = f(lt.last_term); \
+ lt.commited_ops = f(lt.commited_ops); \
+ lt.last_index = f(lt.last_index); \
+ lt.first_index = f(lt.first_index); \
+}
+
+// log information
+#pragma pack(push, 1)
+typedef struct _nsr_recon_log_info_s {
+ uint32_t term;
+ uint32_t first_index;
+ uint32_t last_index;
+} nsr_recon_log_info_t;
+#pragma pack(pop)
+
+#define ENDIAN_CONVERSION_LI(li, is_true) \
+{ \
+ uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \
+ li.term = f(li.term); \
+ li.first_index = f(li.first_index); \
+ li.last_index = f(li.last_index); \
+}
+
+#pragma pack(push, 1)
+typedef struct nsr_recon_record_details_s {
+ uint32_t type;
+ uint32_t op;
+ char gfid[36+1];
+ char pargfid[36+1];
+ char link_path[256]; // should it be PATH_MAX?
+ uint32_t offset;
+ uint32_t len;
+ char entry[128];
+ char newloc[128]; // for rename. can you overload link_path for this? TBD
+ mode_t mode;
+} nsr_recon_record_details_t;
+#pragma pack(pop)
+
+#define ENDIAN_CONVERSION_RD(rd, is_true) \
+{ \
+ uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \
+ rd.type = f(rd.type); \
+ rd.op = f(rd.op); \
+ rd.offset = f(rd.offset); \
+ rd.len = f(rd.len); \
+}
+
+typedef struct _nsr_role_work_s {
+ nsr_recon_role_t role;
+ uint32_t term;
+ struct list_head list;
+} nsr_role_work_t;
+
+typedef struct _nsr_recon_work_s {
+ gf_boolean_t in_use;
+ uint32_t index;
+ uint32_t req_id;
+ struct list_head list;
+} nsr_recon_work_t;
+
+typedef struct _nsr_reconciliation_work_s {
+ uint32_t term;
+ uint32_t index;
+ uint32_t type;
+ uint32_t source;
+ void *data;
+
+ uint32_t num; // used for xattr
+
+} nsr_reconciliation_work_t;
+
+typedef struct _nsr_reconciliation_record_s {
+ nsr_reconciliation_work_t work; // will store the computed work
+ nsr_recon_record_details_t rec;
+} nsr_reconciliation_record_t;
+
+typedef struct _nsr_reconciliator_info {
+ uint32_t reconcilator_index;
+ int32_t last_term;
+ int32_t commited_ops;
+ uint32_t last_index;
+ uint32_t first_index;
+ //nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE];
+ nsr_reconciliation_record_t *records;
+} nsr_reconciliator_info_t;
+
+typedef struct _nsr_per_node_worker_s {
+ char *id; // identifier
+ char vol_file[256]; //volfile that will be used by this thread
+ glfs_t *fs;
+ glfs_fd_t *aux_fd;
+ uint32_t index; // index into array of workers
+ pthread_t thread_id; // thread id
+ void * context; // thread context
+ struct _nsr_recon_driver_ctxt *driver_ctx;
+ char local; // local data worker
+ //struct list_head list; //list of work items
+ nsr_recon_work_t head;
+ pthread_mutex_t mutex; //mutex to guard the state
+ pthread_cond_t cv; //condition variable for signaling the worker thread
+ gf_boolean_t is_control;
+#if defined(NSR_DEBUG)
+ FILE *fp;
+#endif
+ int32_t result; // result of latest work
+ int32_t op_errno; // errno
+} nsr_per_node_worker_t;
+
+typedef struct _nsr_replica_worker_s {
+ char name[256];
+ nsr_per_node_worker_t *control_worker;
+ nsr_per_node_worker_t *data_worker;
+ gf_boolean_t in_use;
+ nsr_reconciliator_info_t *recon_info; // Bunch of infos kept for this reconciliation
+} nsr_replica_worker_t;
+
+typedef struct _nsr_recon_driver_ctxt {
+ xlator_t *this;
+ uint32_t replica_group_size; // number of static members of replica group
+ nsr_replica_worker_t *workers; // worker info
+ int32_t reconciliator;
+ pthread_mutex_t mutex;
+ pthread_cond_t cv;
+ nsr_role_work_t role_head;
+ volatile int32_t outstanding;
+ uint32_t reconciliator_index;
+ uint32_t term;
+ uint32_t current_term;
+ nsr_mode_t mode; // default set to seq
+#if defined(NSR_DEBUG)
+ FILE *fp;
+#endif
+} nsr_recon_driver_ctx_t;
+
+void *
+nsr_reconciliation_driver(void *);
+
+gf_boolean_t
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t term);
+
+#define atomic_inc(ptr) ((void) __sync_fetch_and_add(ptr, 1))
+#define atomic_dec(ptr) ((void) __sync_fetch_and_add(ptr, -1))
+#define atomic_fetch_and __sync_fetch_and_and
+#define atomic_fetch_or __sync_fetch_and_or
+
+#if defined(NSR_DEBUG)
+
+#define NSR_LOG_DIR "/var/log/nsr-logs"
+
+extern int nsr_debug_level;
+extern FILE *recon_create_log (char *member, char *module);
+
+extern void
+_nsr_driver_log (const char *func, int line, char *member, FILE *fp,
+ char *fmt, ...);
+
+#define nsr_driver_log(dom, levl, fmt...) do { \
+ FMT_WARN (fmt); \
+ if (levl <= nsr_debug_level) { \
+ nsr_recon_private_t *priv = ctx->this->private; \
+ _nsr_driver_log (__FUNCTION__, __LINE__, \
+ priv->replica_group_members[0], \
+ ctx->fp, \
+ ##fmt); \
+ } \
+} while (0)
+
+extern void
+_nsr_worker_log (const char *func, int line, char *member,
+ char *type, uint32_t index, FILE *fp,
+ char *fmt, ...);
+
+#define nsr_worker_log(dom, levl, fmt...) do { \
+ FMT_WARN (fmt); \
+ if (levl <= nsr_debug_level) { \
+ nsr_recon_private_t *priv; \
+ priv = ctx->driver_ctx->this->private; \
+ _nsr_worker_log (__FUNCTION__, __LINE__, \
+ priv->replica_group_members[0], \
+ ctx->is_control ? "recon-con" : \
+ "recon-data", \
+ ctx->index, ctx->fp, \
+ ##fmt); \
+ } \
+} while (0)
+
+#else
+#define nsr_driver_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
+#define nsr_worker_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
+#endif
+
+#endif /* #ifndef __RECON_DRIVER_H__ */
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
new file mode 100644
index 000000000..272c35dc2
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -0,0 +1,1010 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <sys/types.h>
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+
+#include "recon_driver.h"
+#include "recon_xlator.h"
+
+typedef struct _nsr_recon_fd_s {
+ int32_t term;
+ nsr_recon_driver_state_t state;
+ uint32_t first_index;
+ uint32_t last_index;
+ call_frame_t *frame;
+} nsr_recon_fd_t;
+
+#if defined(NSR_DEBUG)
+
+void
+_recon_main_log (const char *func, int line, char *member, FILE *fp,
+ char *fmt, ...)
+{
+ va_list ap;
+ char *buf = NULL;
+ int retval;
+
+ if (!fp) {
+ fp = recon_create_log(member,"recon-main-log");
+ if (!fp) {
+ return;
+ }
+ }
+
+ va_start(ap,fmt);
+ retval = vasprintf(&buf,fmt,ap);
+ if (buf) {
+ fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf);
+ free(buf);
+ }
+ va_end(ap);
+}
+
+#endif
+
+// Given fd, get back the NSR based fd context.
+static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd)
+{
+ uint64_t tmp = 0;
+ int32_t ret = -1;
+
+ if ((ret = fd_ctx_get(fd, this, &tmp)) != 0) {
+ return ret;
+ } else {
+ *rfd = (nsr_recon_fd_t *)tmp;
+ return 0;
+ }
+}
+
+// Add the frame in q after associating with term
+// term usage tbd
+static void put_frame(nsr_recon_private_t *priv,
+ call_frame_t *frame,
+ uint32_t term)
+{
+ xlator_t *this = priv->this;
+ recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term);
+ priv->frame = frame;
+ return;
+}
+
+// get the frame from the queue given the term
+// term usage tbd
+static void get_frame(nsr_recon_private_t *priv,
+ call_frame_t **frame,
+ uint32_t term)
+{
+ if (frame != NULL)
+ *frame = priv->frame;
+ priv->frame = NULL;
+ return;
+}
+
+// check if there are outstanding frames
+static gf_boolean_t is_frame(nsr_recon_private_t *priv)
+{
+ return((priv->frame != NULL) ? _gf_true : _gf_false);
+}
+
+#define ENTRY_SIZE 128
+
+long
+get_entry_count (char *path)
+{
+ int fd;
+ struct stat buf;
+ unsigned long entries = -1;
+ long min; /* last entry not known to be empty */
+ long max; /* first entry known to be empty */
+ long curr;
+ char entry[ENTRY_SIZE];
+ void *err_label = &&done;
+
+ fd = open(path,O_RDONLY);
+ if (fd < 0) {
+ goto *err_label;
+ }
+ err_label = &&close_fd;
+
+ if (fstat(fd,&buf) < 0) {
+ goto *err_label;
+ }
+
+ min = 0;
+ max = buf.st_size / ENTRY_SIZE;
+ printf("max = %ld\n",max);
+
+ while ((min+1) < max) {
+ curr = (min + max) / 2;
+ printf("trying entry %ld\n",curr);
+ if (lseek(fd,curr*ENTRY_SIZE,SEEK_SET) < 0) {
+ goto *err_label;
+ }
+ if (read(fd,entry,sizeof(entry)) != sizeof(entry)) {
+ goto *err_label;
+ }
+ if ((entry[0] == '_') && (entry[1] == 'P')) {
+ min = curr;
+ }
+ else {
+ max = curr;
+ }
+ }
+
+ entries = max;
+
+close_fd:
+ close(fd);
+done:
+ return entries;
+}
+
+// Get the term info for the term number specified
+void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt)
+{
+ char path[PATH_MAX];
+ long entries;
+
+ bzero(lt, sizeof(nsr_recon_last_term_info_t));
+ lt->last_term = term;
+ sprintf(path,"%s/%s%d",bp,"TERM.",term);
+ entries = get_entry_count(path);
+ if (entries > 1) {
+ /* The first entry is actually a header. */
+ lt->first_index = 1;
+ /*
+ * This seems wrong, because it means that last_index*128 will
+ * be exactly at EOF and commited_ops will be one greater than
+ * it should be. Maybe some other code makes the exact
+ * opposite mistake to compensate.
+ */
+ lt->last_index = lt->commited_ops = (int)entries;
+ }
+ recon_main_log (this->name, GF_LOG_INFO, "for term=%d got first_index=%d last_index=%d commited_ops=%d\n",
+ term, lt->first_index, lt->last_index, lt->commited_ops);
+ return;
+}
+
+// Given the term number, find the last term in the changelogs
+void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt)
+{
+ uint32_t t = term;
+ struct stat buf;
+ char path[PATH_MAX];
+ bzero(lt, sizeof(nsr_recon_last_term_info_t));
+ while(t) {
+ // journal file is of type TERM-1.jnl
+ sprintf(path,"%s/%s%d",bp,"TERM.",t);
+ if (!stat(path, &buf)) {
+ nsr_recon_libchangelog_get_this_term_info(this, bp, t, lt);
+ recon_main_log (this->name, GF_LOG_INFO, "got last term given current term %d as %d\n", term, t);
+ return;
+ }
+ t--;
+ }
+ recon_main_log (this->name, GF_LOG_INFO, "got no last term given current term %d \n", term);
+
+ return;
+}
+
+// Return back the frame stored against the term
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno)
+{
+ call_frame_t *old_frame = NULL;
+ xlator_t *this = priv->this;
+
+ get_frame(priv, &old_frame, term);
+ if (old_frame) {
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n");
+ // first return the original write for which this ack was sent
+ STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL);
+ } else {
+ recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n");
+ }
+}
+
+typedef enum records_type_t {
+ fop_gfid_pgfid_oldloc_newloc = 1,
+ fop_gfid_pgfid_entry = fop_gfid_pgfid_oldloc_newloc + 1,
+ fop_gfid = fop_gfid_pgfid_entry + 1 ,
+ fop_gfid_offset = fop_gfid + 1,
+ fop_gfid_offset_len = fop_gfid_offset + 1,
+} records_type_t;
+
+// Get the backend ./glusterfs/xx/xx/<...> path
+static void
+get_gfid_path(nsr_recon_private_t *priv, char *gfid, char *path)
+{
+ strcpy(path, priv->base_dir);
+ strcat(path, "/.glusterfs/");
+ strncat(path,gfid,2);
+ strcat(path,"/");
+ strncat(path,gfid+2,2);
+ strcat(path,"/");
+ strcat(path,gfid);
+}
+
+
+// Get the link to which backend points to
+static gf_boolean_t
+get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path)
+{
+ char lp[PATH_MAX];
+ xlator_t *this = priv->this;
+ get_gfid_path(priv,gfid, lp);
+ if (readlink(lp, path, 255) == -1) {
+ GF_ASSERT(0);
+ recon_main_log(priv->this, GF_LOG_ERROR,
+ "cannot get readlink for %s\n",lp);
+ return _gf_false;
+ }
+ return _gf_true;
+}
+
+// Get the list of changelog records given a term , first and last index.
+//
+// TBD: rewrite this hideous ball of mud in at least the following ways:
+//
+// (1) Break out the code for handling a single record into a separate
+// function, to make error handling easier and reduce "indentation
+// creep" so the code's readable.
+//
+// (2) Change all of the fop_xxx_yyy nonsense to OR together values
+// like FOP_HAS_FIELD_XXX and FOP_HAS_FIELD_YYY, to reduce code
+// duplication and facilitate the addition of new fields.
+//
+// (3) Stop making so many assumptions about the underlying formats.
+// The code as it is won't even work for the existing binary format,
+// let alone as changelog evolves over time.
+//
+// Really, 90% of this code should just GO AWAY in favor of using
+// libgfchangelog, enhanced as necessary to support our needs.
+
+/*
+ * Use this macro to skip over a field we're not using yet.
+ * NB: the body is a null statement on purpose
+ * TBD: all instances of this should be removed eventually!
+ */
+#define SKIP_FIELD do /* nothing */ ; while (*(start++) != '\0')
+
+#define SKIP_OVER
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
+{
+ // do a mmap; seek into the first and read all records till last.
+ // TBD - right now all records are pseudo holes but mark them as fills.
+ // TBD - pseudo hole to be implemented when actual fsync gets done on data.
+ char *rb = NULL, *orig = NULL;
+ char path[PATH_MAX];
+ int fd;
+ uint32_t index = 0;
+
+ recon_main_log (this->name, GF_LOG_INFO,
+ "libchangelog_get_records called for term %d index from %d to %d \n",
+ term, first, last );
+
+ orig = rb = GF_CALLOC(128, ((last - first) + 1),
+ gf_mt_recon_changelog_buf_t);
+
+ sprintf(path,"%s/%s%d",bp,"TERM.",term);
+ fd = open(path, O_RDONLY);
+ if (fd == -1) {
+ return _gf_false;
+ } else {
+ char *start = NULL;
+ nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf;
+
+ if (first == 0)
+ lseek(fd, 128, SEEK_SET);
+ else
+ lseek(fd, first * 128, SEEK_SET);
+ if (read(fd, rb, (last - first + 1) * 128) == -1) {
+ return _gf_false;
+ }
+ start = rb;
+ index = first;
+ do {
+ recon_main_log (this->name, GF_LOG_INFO,
+ "libchangelog_get_records start inspecting records at index %d \n",
+ index );
+ if (!strncmp(start, "_PRE_", 5)) {
+ uint32_t i;
+ uint32_t opcode = 0;
+ records_type_t type;
+
+ start += 5;
+ // increment by the NULLs after the PRE
+ start += 4;
+ SKIP_FIELD; // real index
+ // now we have the opcode
+ while (*start != '\0') {
+ opcode *= 10;
+ opcode += (*(start++) - '0');
+ }
+ ++start;
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "libchangelog_get_records: got opcode %d @index %d\n", opcode, index);
+ if ((opcode == GF_FOP_RENAME)) {
+ type = fop_gfid_pgfid_oldloc_newloc;
+ } else if ((opcode == GF_FOP_UNLINK) ||
+ (opcode == GF_FOP_RMDIR) ||
+ (opcode == GF_FOP_LINK) ||
+ (opcode == GF_FOP_MKDIR) ||
+ (opcode == GF_FOP_SYMLINK) ||
+ (opcode == GF_FOP_MKNOD) ||
+ (opcode == GF_FOP_CREATE)) {
+ type = fop_gfid_pgfid_entry;
+ } else if ((opcode == GF_FOP_FSETATTR) ||
+ (opcode == GF_FOP_SETATTR) ||
+ (opcode == GF_FOP_FREMOVEXATTR) ||
+ (opcode == GF_FOP_REMOVEXATTR) ||
+ (opcode == GF_FOP_SETXATTR) ||
+ (opcode == GF_FOP_FSETXATTR)) {
+ type = fop_gfid;
+ } else if ((opcode == GF_FOP_TRUNCATE) ||
+ (opcode == GF_FOP_FTRUNCATE)) {
+ type = fop_gfid_offset;
+ } else if (opcode == GF_FOP_WRITE) {
+ type = fop_gfid_offset_len;
+ } else {
+ recon_main_log (this->name,
+ GF_LOG_ERROR,
+ "libchangelog_get_records:got no proper opcode %d @index %d\n",
+ opcode, index);
+ //GF_ASSERT(0);
+ // make this as a hole.
+ // TBD - check this logic later. maybe we should raise alarm here because
+ // this means that changelog is corrupted. We are not handling changelog
+ // corruptions as of now.
+ rec->type = NSR_LOG_HOLE;
+ goto finish;
+ }
+ // TBD - handle psuedo holes once that logic is in.
+ rec->type = NSR_LOG_FILL;
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "libchangelog_get_records:got type %d at index %d \n",
+ rec->type, index);
+ rec->op = opcode;
+
+ // Now get the gfid and parse it
+ // before that increment the pointer
+ for (i=0; i < 36; i++) {
+ rec->gfid[i] = (*start);
+ start++;
+ }
+ rec->gfid[i] = '\0';
+
+ GF_ASSERT(*start == 0);
+ start ++;
+
+ if (opcode == GF_FOP_SYMLINK) {
+ i = 0;
+ do {
+ if (i >= 256) {
+ goto finish;
+ }
+ rec->link_path[i++] = *start;
+ } while (*(start++) != '\0');
+ }
+
+ i = 0;
+ // If type is fop_gfid_offset+_len, get offset
+ if ((type == fop_gfid_offset) || (type == fop_gfid_offset_len)) {
+ char offset_str[128];
+ while(*start != 0) {
+ offset_str[i++] = *start;
+ start ++;
+ }
+ offset_str[i] = '\0';
+ // get over the 0
+ start++;
+ rec->offset = strtoul(offset_str, NULL, 10);
+ recon_main_log (this->name,
+ GF_LOG_ERROR,
+ "libchangelog_get_records:got offset %d @index %d \n", rec->offset, index);
+
+ }
+ i = 0;
+ if (type == fop_gfid_offset_len) {
+ char len_str[128];
+ while(*start != 0) {
+ len_str[i++] = *start;
+ start ++;
+ }
+ len_str[i] = '\0';
+ // get over the 0
+ start++;
+ rec->len = strtoul(len_str, NULL, 10);
+ recon_main_log (this->name,
+ GF_LOG_ERROR,
+ "libchangelog_get_records:got length %d @index %d \n", rec->len, index);
+ }
+ i = 0;
+ if (type == fop_gfid_pgfid_entry) {
+ switch (opcode) {
+ case GF_FOP_CREATE:
+ case GF_FOP_MKDIR:
+ case GF_FOP_MKNOD:
+ SKIP_FIELD; // mode
+ break;
+ /* TBD: handle GF_FOP_SYMLINK target */
+ default:
+ ;
+ }
+ SKIP_FIELD; // uid
+ SKIP_FIELD; // gid
+ if (opcode == GF_FOP_MKNOD) {
+ SKIP_FIELD; // dev
+ }
+ // first get the gfid and then the path
+ for (i=0; i < 36; i++) {
+ rec->pargfid[i] = (*start);
+ start++;
+ }
+ rec->pargfid[i] = '\0';
+ GF_ASSERT(*start == '/');
+ start ++;
+
+ i = 0;
+ while(*start != 0) {
+ rec->entry[i++] = *start;
+ start ++;
+ }
+ rec->entry[i] = '\0';
+ // get over the 0
+ start++;
+ /*
+ * Having to add this as a special case
+ * is awful. See the function header
+ * comment for the real solution.
+ */
+ if (opcode == GF_FOP_CREATE) {
+ rec->mode = 0;
+ while (*start != '\0') {
+ rec->mode *= 10;
+ rec->mode += *start
+ - '0';
+ ++start;
+ }
+ ++start;
+ }
+ recon_main_log (this->name,
+ GF_LOG_ERROR,
+ "libchangelog_get_records:got entry %s @index %d \n", rec->entry, index);
+
+ }
+ i = 0;
+ if (type == fop_gfid_pgfid_oldloc_newloc) {
+
+ // first get the source and then the destination
+ // source stuff gets stored in pargfid/entry
+ for (i=0; i < 36; i++) {
+ rec->pargfid[i] = (*start);
+ start++;
+ }
+ rec->pargfid[i] = '\0';
+ GF_ASSERT(*start == '/');
+ start ++;
+
+ i=0;
+ while(*start != 0) {
+ rec->entry[i++] = *start;
+ start ++;
+ }
+ rec->entry[i] = '\0';
+ // get over the 0
+ start++;
+
+ // dst stuff gets stored in gfid/newloc
+ for (i=0; i < 36; i++) {
+ rec->gfid[i] = (*start);
+ start++;
+ }
+ rec->gfid[i] = '\0';
+ GF_ASSERT(*start == '/');
+ start ++;
+ i = 0;
+ while(*start != 0) {
+ rec->newloc[i++] = *start;
+ start ++;
+ }
+ rec->newloc[i] = '\0';
+ // get over the 0
+ start++;
+
+ }
+ ENDIAN_CONVERSION_RD((*rec), _gf_false); //htonl
+ }
+finish:
+ if (index == last)
+ break;
+ index++;
+ rb += 128;
+ start = rb;
+ rec++;
+ } while(1);
+ }
+ GF_FREE(orig);
+ close(fd);
+
+ recon_main_log (this->name, GF_LOG_INFO,
+ "libchangelog_get_records finsihed inspecting records for term %d \n",
+ term);
+ return _gf_true;
+}
+
+int32_t
+nsr_recon_open (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata)
+{
+ int32_t op_ret = 0;
+ int32_t op_errno = 0;
+ nsr_recon_fd_t *rfd = NULL;
+
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path );
+ rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_fd_t);
+ if (!rfd) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ }
+
+ op_ret = fd_ctx_set (fd, this, (uint64_t)(long)rfd);
+ if (op_ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open returns with %d for path %s \n",op_ret,loc->path );
+ STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, NULL);
+ return 0;
+}
+
+int32_t
+nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
+ struct iovec *vector, int32_t count, off_t offset,
+ uint32_t flags, struct iobref *iobref, dict_t *xdata)
+{
+ nsr_recon_fd_t *rfd = NULL;
+ nsr_recon_private_t *priv = NULL;
+ int32_t op_ret = 0;
+ int32_t op_errno = 0;
+ int32_t ret = 0;
+
+ ret = this_fd_ctx_get (fd, this, &rfd);
+ if (ret < 0) {
+ return -1;
+ }
+ priv = (nsr_recon_private_t *)this->private;
+
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset );
+ GF_ASSERT(count == 1);
+ switch (offset) {
+ // client(brick, leader) writes the role of the node
+ case nsr_recon_xlator_sector_1 :
+ {
+ nsr_recon_role_t rr;
+ memcpy((void *)&rr, (void *)vector[0].iov_base, sizeof(rr));
+ ENDIAN_CONVERSION_RR(rr, _gf_true); //ntohl
+
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role);
+ if ((rr.role != leader) &&
+ (rr.role != reconciliator) &&
+ (rr.role != resolutor) &&
+ (rr.role != joiner)) {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "EIII---nsr_recon_writev cannot set state \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ }
+
+ GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH);
+
+ // Check if already a role play is going on. If yes return with EAGAIN.
+ // Ideally we should check if we have got a higher term number while
+ // servicing a lower term number; if so abort the older one.
+ // However the abort infrastructure needs to be sketched properly; TBD.
+ if (is_frame(priv) == _gf_true) {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev set_role - already role play \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN,
+ NULL, NULL, NULL);
+ } else {
+
+ // Store the stack frame so that when the actual job gets finished
+ // we send the response back to the brick.
+ put_frame(priv, frame, rr.current_term);
+ if (nsr_recon_driver_set_role(priv->driver_thread_context,
+ &rr,
+ rr.current_term) == _gf_false) {
+ get_frame(priv, NULL, rr.current_term);
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev set_role - cannot seem to set role \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ } else {
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev set_role - set role succesfully \n");
+ }
+ }
+ break;
+ }
+ // client(reconciliator) writes how much it needs for the read
+ case nsr_recon_xlator_sector_2 :
+ {
+ nsr_recon_log_info_t li;
+ memcpy((void *)&li, (void *)vector[0].iov_base, sizeof(li));
+ ENDIAN_CONVERSION_LI(li, _gf_true); //ntohl
+
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev - setting term info for reconcilation info. term=%d, first_index=%d,start_index=%d \n",
+ li.term, li.first_index, li.last_index);
+ rfd->term = li.term;
+ rfd->last_index = li.last_index;
+ rfd->first_index = li.first_index;
+ STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
+ // client(reconciliator) writes term for which it needs info
+ case nsr_recon_xlator_sector_3 :
+ {
+ int32_t term;
+
+ memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term));
+ term = ntohl(term); //ntohl
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev - setting term info for term info. term=%d\n",
+ term);
+ rfd->term = term;
+ STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
+ // client(reconciliator) writes current term so that it gets last term info later
+ case nsr_recon_xlator_sector_4 :
+ {
+ int32_t term;
+
+ memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term));
+ term = ntohl(term); //ntohl
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev - setting term info for last term info given current term=%d\n",
+ term);
+ rfd->term = term;
+ STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev called with wrong offset\n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
+ }
+
+ return 0;
+}
+
+int
+nsr_recon_readv (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata)
+{
+ nsr_recon_fd_t *rfd = NULL;
+ int32_t op_errno = 0;
+ // copied stuff from quick-read.c and posix.c
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+ struct iovec iov = {0, };
+ int32_t ret = -1;
+ nsr_recon_private_t *priv = NULL;
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
+ if (!iobuf) {
+ op_errno = ENOMEM;
+ goto out;
+ }
+
+ iobref = iobref_new ();
+ if (!iobref) {
+ op_errno = ENOMEM;
+ goto out;
+ }
+
+ iobref_add (iobref, iobuf);
+
+ ret = this_fd_ctx_get (fd, this, &rfd);
+ if (ret < 0) {
+ op_errno = -ret;
+ goto out;
+ }
+ priv = (nsr_recon_private_t *)this->private;
+
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv called for offset %d \n",(unsigned int)offset );
+ switch (offset) {
+ // client(leader) reads from here to get info for this term on this node
+ // invole libchagelog to get the information
+ case nsr_recon_xlator_sector_3 :
+ {
+ nsr_recon_last_term_info_t lt;
+ GF_ASSERT(size == sizeof(lt));
+ nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, rfd->term, &lt);
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_readv - getting term info for term=%d, ops=%d, first=%d, last=%d\n",
+ rfd->term, lt.commited_ops, lt.first_index, lt.last_index);
+ ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl
+ memcpy(iobuf->ptr, &lt, size);
+ goto out;
+ }
+ // client(reconciliator) reads individual record information
+ case nsr_recon_xlator_sector_2 :
+ {
+ uint32_t num = (rfd->last_index - rfd->first_index + 1);
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_readv - expected size %lu got size %lu\n",
+ (num * sizeof(nsr_recon_record_details_t)), size);
+
+ GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t)));
+ bzero(iobuf->ptr, size);
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_readv - getting records for term=%d from %d to %d\n",
+ rfd->term, rfd->first_index, rfd->last_index);
+ nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+ rfd->term, rfd->first_index, rfd->last_index, iobuf->ptr);
+ goto out;
+ }
+ // read last term info
+ case nsr_recon_xlator_sector_4 :
+ {
+ nsr_recon_last_term_info_t lt;
+ GF_ASSERT(size == sizeof(lt));
+ nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, rfd->term, &lt);
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_readv - getting last term info given current term=%d. last term = %d ops=%d, first=%d, last=%d\n",
+ rfd->term, lt.last_term, lt.commited_ops, lt.first_index, lt.last_index);
+ ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl
+ memcpy(iobuf->ptr, &lt, size);
+ goto out;
+ }
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_readv called with wrong offset\n");
+ op_errno = -1;
+ break;
+ }
+ }
+
+out:
+ if (op_errno == 0) {
+ iov.iov_base = iobuf->ptr;
+ ret = iov.iov_len = size;
+ }
+
+ STACK_UNWIND_STRICT (readv, frame, ret, op_errno, &iov, 1, NULL, iobref , NULL);
+
+ if (iobref)
+ iobref_unref (iobref);
+ if (iobuf)
+ iobuf_unref (iobuf);
+ return 0;
+}
+
+int
+nsr_recon_lookup (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, dict_t *xdata)
+{
+ struct iatt buf = {0, };
+ // dirty hack to set root as regular but seems to work.
+ buf.ia_type = IA_IFREG;
+ recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_lookup called \n");
+
+ STACK_UNWIND_STRICT (lookup, frame, 0, 0, this->itable->root, &buf, NULL, NULL);
+ return 0;
+}
+
+
+int32_t
+nsr_recon_flush (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, dict_t *xdata)
+{
+ STACK_UNWIND_STRICT (flush, frame, 0, 0, NULL);
+ return 0;
+}
+
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("recon", this, out);
+
+ ret = xlator_mem_acct_init (this, gf_mt_recon_end + 1);
+
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Memory accounting init" "failed");
+ return ret;
+ }
+out:
+ return ret;
+}
+
+
+int32_t
+init (xlator_t *this)
+{
+ nsr_recon_private_t *priv = NULL;
+ char *local, *members;
+ unsigned int i=0;
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_mt_recon_private_t);
+ if (!priv) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "priv allocation error\n");
+ return -1;
+ }
+ GF_OPTION_INIT ("replica-group-size", priv->replica_group_size, uint32, err);
+ GF_OPTION_INIT ("vol-name", priv->volname, str, err);
+ if (!priv->volname) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "missing volname option (required)");
+ return -1;
+ }
+ GF_OPTION_INIT ("changelog-dir", priv->changelog_base_path, str, err);
+ if (!priv->changelog_base_path) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "missing changelog directory option (required)");
+ return -1;
+ }
+ GF_OPTION_INIT ("base-dir", priv->base_dir, str, err);
+ if (!priv->base_dir) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "missing brick base directory option (required)");
+ return -1;
+ }
+ GF_OPTION_INIT ("replica-group-members", members, str, err);
+ if (!members) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "missing membership option (required)");
+ return -1;
+ }
+ GF_OPTION_INIT ("local-member", local, str, err);
+ if (!local) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "missing local member option (required)");
+ return -1;
+ }
+
+ priv->replica_group_members = GF_CALLOC (priv->replica_group_size,
+ sizeof(char *),
+ gf_mt_recon_members_list_t);
+ priv->replica_group_members[0] = GF_CALLOC (1,
+ strlen(local),
+ gf_mt_recon_member_name_t);
+ if (!priv->replica_group_members || !(priv->replica_group_members[0])) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "str allocation error\n");
+ return -1;
+ }
+ strcpy(priv->replica_group_members[0], local);
+ for (i=1; i < priv->replica_group_size; i++) {
+ char *member;
+ if (i == 1)
+ member = strtok(members, ",");
+ else
+ member = strtok(NULL, ",");
+ priv->replica_group_members[i] = GF_CALLOC (1,
+ strlen(member) + 1, gf_mt_recon_member_name_t);
+ if (!priv->replica_group_members[i]) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "str allocation error\n");
+ return -1;
+ }
+ strcpy(priv->replica_group_members[i], member);
+ }
+
+
+ priv->this = this;
+ this->private = (void *)priv;
+
+ priv->fp = recon_create_log (priv->replica_group_members[0], "recon-main-log");
+ if (!priv->fp)
+ return -1;
+
+ recon_main_log (this->name, GF_LOG_INFO, "creating reconciliation driver \n");
+
+ if (pthread_create(&priv->thread_id, NULL, nsr_reconciliation_driver, priv)) {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "pthread creation error \n");
+ return -1;
+ }
+
+ INIT_LIST_HEAD(&(priv->list));
+
+
+ return 0;
+
+err:
+ return -1;
+}
+
+
+void
+fini (xlator_t *this)
+{
+ nsr_recon_private_t *priv = NULL;
+ void *ret = NULL;
+
+ priv = (nsr_recon_private_t *)this->private;
+
+ pthread_cancel(priv->thread_id);
+ pthread_join(priv->thread_id, &ret);
+}
+
+
+struct xlator_fops fops = {
+ .open = nsr_recon_open,
+ .readv = nsr_recon_readv,
+ .writev = nsr_recon_writev,
+ .lookup = nsr_recon_lookup,
+ .flush = nsr_recon_flush
+};
+
+struct xlator_cbks cbks = {
+};
+
+struct volume_options options[] = {
+ { .key = {"replica-group-size"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 2,
+ .max = INT_MAX,
+ .default_value = "2",
+ .description = "Number of bricks in replica group. can be derived but putting it here for testing."
+ },
+ {
+ .key = {"vol-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "volume name"
+ },
+ {
+ .key = {"local-member"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "member(brick) for which this translator is responsible."
+ },
+ {
+ .key = {"replica-group-members"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "Comma seperated member names other than local."
+ },
+ {
+ .key = {"changelog-dir"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "Base directory where per term changelogs are maintained."
+ },
+ {
+ .key = {"base-dir"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "Base directory for this brick. This should go away once we fix gfid based lookups"
+ },
+ { .key = {NULL} },
+};
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h
new file mode 100644
index 000000000..d9692a632
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.h
@@ -0,0 +1,92 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __RECON_XLATOR_H__
+#define __RECON_XLATOR_H__
+
+#include <semaphore.h>
+#include <pthread.h>
+
+enum gf_dht_mem_types_ {
+ gf_mt_recon_changelog_buf_t = gf_common_mt_end + 1,
+ gf_mt_recon_driver_ctx_t,
+ gf_mt_recon_fd_t,
+ gf_mt_recon_id_t,
+ gf_mt_recon_member_name_t,
+ gf_mt_recon_members_list_t,
+ gf_mt_recon_per_node_worker_t,
+ gf_mt_recon_private_t,
+ gf_mt_recon_reconciliator_info_t,
+ gf_mt_recon_record_t,
+ gf_mt_recon_record_details_t,
+ gf_mt_recon_role_work_t,
+ gf_mt_recon_work_t,
+ gf_mt_recon_work_data_t,
+ gf_mt_recon_worker_t,
+ gf_mt_recon_end,
+};
+
+enum nsr_recon_xlator_sector_t {
+ nsr_recon_xlator_sector_0 = 0, // to report back the status of given transaction ids
+ nsr_recon_xlator_sector_1 = 512, // to write here information about leadership changes from the brick
+ nsr_recon_xlator_sector_2 = (512 * 2), // to write here individual roles and wait for that role to be done
+ nsr_recon_xlator_sector_3 = (512 *3), // read from here to get term info for given term
+ nsr_recon_xlator_sector_4 = (512 * 4), // read from here to get last term info
+};
+
+
+typedef struct _nsr_recon_private_s {
+ xlator_t *this; //back pointer
+ unsigned int replica_group_size; // number of static members of replica group
+ char **replica_group_members; // replica group members (including itself in first slot)
+ pthread_t thread_id; // driver thread id
+ nsr_recon_driver_ctx_t *driver_thread_context; //driver thread context
+ unsigned int outstanding; // for communicating with driver thread
+ call_frame_t *frame; // old frame that is pending (just one as of now)
+ struct list_head list;
+ char *volname;
+ uint32_t txn_id;
+ char *changelog_base_path;
+ char *base_dir;
+#if defined(NSR_DEBUG)
+ FILE *fp;
+#endif
+} nsr_recon_private_t;
+
+#define atomic_cmpxchg __sync_val_compare_and_swap
+
+#if defined(NSR_DEBUG)
+
+extern void
+_recon_main_log (const char *func, int line, char *member, FILE *fp,
+ char *fmt, ...);
+
+#define recon_main_log(dom, levl, fmt...) do { \
+ FMT_WARN (fmt); \
+ if (levl <= nsr_debug_level) { \
+ nsr_recon_private_t *priv = this->private; \
+ _recon_main_log (__FUNCTION__, __LINE__, \
+ priv->replica_group_members[0], \
+ priv->fp, \
+ ##fmt); \
+ } \
+} while (0)
+
+#else
+#define recon_main_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
+#endif
+
+void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
+void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno);
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf);
+
+
+#endif /* #ifndef __RECON_XLATOR_H__ */