diff options
Diffstat (limited to 'xlators/cluster/nsr-recon')
-rw-r--r-- | xlators/cluster/nsr-recon/Makefile.am | 3 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/Makefile.am | 23 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.c | 3130 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.h | 325 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.c | 1010 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.h | 92 |
6 files changed, 4583 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-recon/Makefile.am b/xlators/cluster/nsr-recon/Makefile.am new file mode 100644 index 000000000..d471a3f92 --- /dev/null +++ b/xlators/cluster/nsr-recon/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/cluster/nsr-recon/src/Makefile.am b/xlators/cluster/nsr-recon/src/Makefile.am new file mode 100644 index 000000000..e639e4437 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/Makefile.am @@ -0,0 +1,23 @@ +xlator_LTLIBRARIES = nsr_recon.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster + +nsr_recon_la_LDFLAGS = -module -avoid-version +nsr_recon_la_SOURCES = recon_driver.c recon_xlator.c + +nsr_recon_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ + $(top_builddir)/api/src/libgfapi.la + +noinst_HEADERS = recon_driver.h recon_xlator.h + +AM_CPPFLAGS = $(GF_CPPFLAGS) \ + -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/xlators/lib/src \ + -I$(top_srcdir)/rpc/rpc-lib/src + +AM_CFLAGS = -Wall $(GF_CFLAGS) + +XLATOR_HEADER = $(top_srcdir)/libglusterfs/src/xlator.h + +CLEANFILES = + +uninstall-local: + rm -f $(DESTDIR)$(xlatordir)/nsr.so diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c new file mode 100644 index 000000000..8c7622a02 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -0,0 +1,3130 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <sys/types.h> +#include <fcntl.h> +#include <string.h> +#include <unistd.h> +#include <fnmatch.h> + + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" + + +#include "recon_driver.h" +#include "recon_xlator.h" +#include "api/src/glfs-internal.h" +#include "api/src/glfs-handles.h" + +/* TBD: move declarations here and nsr.c into a common place */ +#define NSR_TERM_XATTR "trusted.nsr.term" +#define RECON_TERM_XATTR "trusted.nsr.recon-term" +#define RECON_INDEX_XATTR "trusted.nsr.recon-index" + +/* + * Execution architecture for the NSR reconciliation driver. The driver runs + * as a seperate process in each node where the brick is. The main function of + * the driver is nsr_reconciliation_driver() (last function below) The driver + * just sits in a tight loop waiting for state changes. When a brick becomes a + * replica leader, it fences IO, contacts this process and waits for + * reconciliation to finish. + * + * The replica leader talks to other bricks in replica group which are alive + * and gets the last term info using which it decides which has the latest + * data. That brick is referred to as the "reconciliator"; leader sends a + * message to reconciliator to freeze its data (by reading any incomplete data + * from other nodes from that term if required) + * + * Once that is done leader sends a message to all nodes except the + * reconciliator to sync themselves with the reconciliator. This process is + * referred to as "resolution". + * + * Hence the reconciliation processes need to talk to each other to get a given + * term info. This is implemented using the recon translator IOs which + * implements a bare bone RPC by exposing a file interface to which + * reads/writes are done to pass control messages. This is referred to as the + * "control plane". This implementation allows the control plane to be + * implemented as a bunch of threads for each of the nodes. + * + * The reconciliation process also needs to talk to the brick process on that + * node to actually write the data as part of reconciliation/resolution. This + * is referred to as the "data plane". Again there are a bunch of threads that + * do this work. + * + * The way the worker threads are organised is that main driver context has a + * pointer to contexts for each of these thread contexts. The thread context at + * index 0 always refers to talking with local recon process/brick. So the + * control worker at index 0 will get the local changelog info and data worker + * at index 0 will talk to local brick. + * + * All the ops from the control/data planes are implemented using the glfs + * APIs. + */ + +#if defined(NSR_DEBUG) + +/* This lets us change on the fly even if NSR_DEBUG is defined. */ +int nsr_debug_level = GF_LOG_TRACE; + +FILE * +recon_create_log (char *member, char *module) +{ + char *dpath = NULL; + char *p; + char *fpath = NULL; + FILE *fp = NULL; + int fd = -1; + + (void)mkdir(NSR_LOG_DIR,0777); + (void)asprintf(&dpath,NSR_LOG_DIR"/%s",member); + if (dpath) { + for (p = dpath + strlen(NSR_LOG_DIR) + 1; *p; ++p) { + if (*p == '/') { + *p = '-'; + } + } + (void)mkdir(dpath,0777); + (void)asprintf(&fpath,"%s/%s",dpath,module); + if (fpath) { + fd = open(fpath,O_WRONLY|O_CREAT|O_APPEND|O_SYNC,0666); + if (fd >= 0) { + fp = fdopen(fd,"a"); + if (!fp) { + close(fd); + } + } + if (fp) { + if (setvbuf (fp, NULL, _IONBF, 0)) { + /* + * Might as well take advantage of it + * to log the error. + */ + fprintf (fp, + "setvbuf failed for log\n"); + fprintf (fp, + "log output may be async\n"); + fflush(fp); + } + } + free(fpath); + } + free(dpath); + } + + return fp; +} + +void +_nsr_driver_log (const char *func, int line, char *member, FILE *fp, + char *fmt, ...) +{ + va_list ap; + char *buf = NULL; + int retval; + + if (!fp) { + fp = recon_create_log(member,"nsr-driver-log"); + if (!fp) { + return; + } + } + + va_start(ap,fmt); + retval = vasprintf(&buf,fmt,ap); + if (buf) { + fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); + free(buf); + } + va_end(ap); +} + +void +_nsr_worker_log (const char *func, int line, char *member, + char *type, uint32_t index, FILE *fp, + char *fmt, ...) +{ + va_list ap; + char *buf = NULL; + int retval; + + if (!fp) { + char *name; + if (asprintf(&name,"%s-%u",type,index) < 1) { + return; + } + fp = recon_create_log (member, name); + if (!fp) { + return; + } + } + + va_start(ap,fmt); + retval = vasprintf(&buf,fmt,ap); + if (buf) { + fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); + free(buf); + } + va_end(ap); +} + +#endif + +/* + * Recon Driver Calloc + * + * We need this because all of this messing about with gfapi from within a + * translator keeps scrambling THIS (only one reason it's a terrible idea) and + * we need THIS to have a value that represents our initialization with our + * memory types. + * + * Note that the macro requires "this" to be defined in the current scope. + */ + +#define RD_CALLOC(x,y,z) ({THIS = this; GF_CALLOC(x,y,z); }) + +/* + * This function gets the size of all the extended attributes for a file. + * This is used so that caller knows how much to allocate for key-value storage. + * + * Input Arguments: + * fd - the file opened using glfs API. + * dict - passed so that NSR translator can get this from the required brick + * + * Output Arguments: + * b - pointer to the buffer where the attributes are filled up. + * key_size - the size of all keys + * val_size - the size of all values + * num - number of key/values + */ +static int32_t +get_xattr_total_size( struct glfs_fd *fd, + char **b, + uint32_t *key_size, + uint32_t *val_size, + uint32_t* num, + dict_t *dict) +{ + int32_t s = -1, ret = -1; + char *c = NULL; + + *key_size = 0; + *val_size = 0; + *num = 0; + + // First get the size of the keys + s = glfs_flistxattr_with_xdata(fd, NULL,0, dict); + if (s == -1) { + goto out; + } + *key_size = s; + + // TBD - use the regular calloc + (*b) = c = calloc(s+1,1); + + // get the keys themselves + if (glfs_flistxattr_with_xdata(fd, c, s+1, dict) == -1) { + goto out; + } + do { + int32_t r; + uint32_t len = 0; + // for each key get the size of the value + r = glfs_fgetxattr_with_xdata(fd, c, NULL, 0, dict); + if (r == -1) + goto out; + (*val_size) += r; + len = strlen(c) + 1; + c += len; + s -= len; + (*num)++; + } while(s); + ret = 0; +out: + return ret; +} + +/* + * This function gets bunch of xattr values given set of keys. + * + * Input Arguments: + * fd - the file opened using glfs API. + * keys - the bunch of keys + * size - size of values + * num - number of keys + * dict - passed so that NSR translator can get this from the required brick + * + * Output Arguments: + * buf - where the values are written one after the other (NULL seperated) + */ +static int32_t +get_xattr(struct glfs_fd *fd, + char *keys, + char *buf, + uint32_t size, + uint32_t num, + dict_t *dict) +{ + while(num--) { + int32_t r; + uint32_t len = 0; + + // copy the key + strcpy(buf, keys); + len = strlen(keys); + len++; + buf += len; + + // get the value and copy the value after incrementing buf after the key + r = glfs_fgetxattr_with_xdata(fd, keys, buf, size, dict); + + // TBD - handle error + if (r == -1) + return -1; + + // increment the key to next value + keys += len; + + // increment buf to hold the next key + buf += strlen(buf) + 1; + } + return 0; +} + +/* + * Function deletes a bunch of key values in extended attributes of a file. + * Input Arguments: + * fd - the file opened using glfs API. + * dict - passed so that NSR translator can do this from the required brick + * keys - bunch of NULL seperated key names + * num - number of keys + */ +static int32_t delete_xattr(struct glfs_fd *fd, + dict_t *dict_t, + char *keys, + uint32_t num) +{ + while(num--) { + // get the value and copy the value + // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata() + if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1) + return -1; + keys += strlen(keys) +1; + } + return 0; +} + +/* + * Given a bunch of key value pairs, fill them as xattrs for a file + * + * Input Arguments: + * fd - the file opened using glfs API. + * dict - passed so that NSR translator can do this from the required brick + * buf - buffer containing the keys-values pairs. The key value are NULL seperated. + * Each of the key-value is seperated by NULL in turn. + * num - Number of such key value pairs. + */ +static int32_t +fill_xattr(struct glfs_fd *fd, + dict_t *dict, + char *buf, + uint32_t num) +{ + char *k = buf, *val = NULL; + + while(num--) { + int32_t r; + + val = k + strlen(k) + 1; + + // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata() + r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict); + if (r == -1) + return -1; + k = val + strlen(val) + 1; + } + return 0; +} + +/* + * This function gets a file that can be used for doing glfs_init later. + * The control file is used by control thread(function) to talk to peer reconciliation process. + * The data file is used by the data thread(function) to talk to the bricks. + * The control file is of name such as con:gfs1:-mnt-a1 where "gfs1" is name of host + * and the brick path is "/mnt/a1". + * The data file is of name such as data:gfs1:-mnt-a1. + * + * Input Arguments: + * vol - name of the volume. This is used to build the full path of the control and data file + * such as /var/lib/glusterd/vols/test/bricks/gfs2:-mnt-test1-nsr-recon.vol. + * In above example the volume name is test and brick on gfs2 is on path /mnt/test1 + * + * worker - The worker for a given node. This worker has 2 threads - one on the data plane + * and one on the control plane. The worker->name is already filled with hostname:brickname + * in the function nsr_reconciliation_driver(). Use that to build the volume file. + * So if worker->name has gfs1:/mnt/a1, control file is con:gfs1:-mnt-a1 + * and data file is data:gfs1:-mnt-a1. + * All these files are under the bricks directory. TBD - move this to a NSR recon directory later. + */ +static void +nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker) +{ + char *ptr; + char tr[256]; + + // Replace the "/" to - + strcpy(tr, worker->name); + ptr = strchr (tr, '/'); + while (ptr) { + *ptr = '-'; + ptr = strchr (tr, '/'); + } + + // Build the base directory such as "/var/lib/glusterd/vols/test/bricks/" + sprintf(worker->control_worker->vol_file, + "/%s/%s/%s/%s/", + GLUSTERD_DEFAULT_WORKDIR, + GLUSTERD_VOLUME_DIR_PREFIX, + vol, + GLUSTERD_BRICK_INFO_DIR); + + strcat(worker->control_worker->vol_file, "con:"); + strcat(worker->control_worker->vol_file, tr); + + sprintf(worker->data_worker->vol_file, + "/%s/%s/%s/%s/", + GLUSTERD_DEFAULT_WORKDIR, + GLUSTERD_VOLUME_DIR_PREFIX, + vol, + GLUSTERD_BRICK_INFO_DIR); + strcat(worker->data_worker->vol_file, "data:"); + strcat(worker->data_worker->vol_file, tr); +} + +/* + * This function does all the glfs initialisation + * so that reconciliation process can talk to other recon processes/bricks + * for the control/data messages. + * This will be done everytime a worker needs to be kicked off to talk + * across any plane. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static int32_t +nsr_recon_start_work(nsr_per_node_worker_t *ctx, + gf_boolean_t control) +{ + glfs_t *fs = NULL; + xlator_t *this = ctx->driver_ctx->this; + int32_t ret = 0; + glfs_fd_t *aux_fd = NULL; // fd of auxilary log + char lf[256]; + nsr_recon_private_t *priv = NULL; + char *my_name = NULL; + char *morph_name = NULL, *ptr = NULL; + + priv = this->private; + my_name = RD_CALLOC (1, + strlen (priv->replica_group_members[0]) + 1, + gf_mt_recon_member_name_t); + strcpy (my_name, priv->replica_group_members[0]); + + nsr_worker_log(this->name, GF_LOG_INFO, + "starting work with volfile %s\n", + ctx->vol_file); + + fs = glfs_new(ctx->id); + if (!fs) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, + "cannot create gfls context for thread %s\n",ctx->id); + return -1; + } + + // For some vague reason, glfs init APIs seem to be clobbering "this". hence resetting it. + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_INFO, + "init done. setting volfile %s\n", + ctx->vol_file); + + ret = glfs_set_volfile(fs, ctx->vol_file); + if (ret != 0) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, + "cannot set volfile %s for thread %s\n",ctx->vol_file, ctx->id); + return -1; + } + + morph_name = RD_CALLOC (1, strlen (my_name) + 1, + gf_mt_recon_member_name_t); + strcpy (morph_name, my_name); + + ptr = strchr (morph_name, '/'); + while (ptr) + { + *ptr = '-'; + ptr = strchr (morph_name, '/'); + } + // TBD - convert this to right /usr/local/var/log based log files. + + sprintf(lf, NSR_LOG_DIR"/%s/%s-%"PRIu32, morph_name, + (control == _gf_true)?"glfs-con":"glfs-data", ctx->index); + ret = glfs_set_logging (fs, lf, 7); + if (ret) { + glusterfs_this_set(this); + gf_log (this->name, GF_LOG_ERROR, "glfs logging set failed (%s)", + strerror (errno)); + return -1; + } + + ret = glfs_init (fs); + if (ret != 0) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do init for thread %s with volfile %s\n",ctx->id, ctx->vol_file); + return -1; + } + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_INFO, + "setting volfile %s done\n", + ctx->vol_file); + + // If it is control thread, open the "/" as the aux_fd. + // All IOs happening via the fd will do the RPCs across the reconciliation + // processes. For some vague reason, the root seems to be open'able like a file. + // TBD - try to clean this up. (implement a virtual file???) + if (control == _gf_true) { + nsr_worker_log(this->name, GF_LOG_INFO, + "doing open for / \n"); + aux_fd = glfs_open (fs, "/", O_RDWR); + // TBD - proper error handling. Stall reconciliation if such a thing happens? + if (aux_fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "cannot open aux log file for thread %s\n",ctx->id); + return -1; + } else { + nsr_worker_log(this->name, GF_LOG_ERROR, + "---opened aux log file for thread %s\n",ctx->id); + } + ctx->aux_fd = aux_fd; + } + glusterfs_this_set(this); + ctx->fs = fs; + return 0; +} + +/* + * + * This function does the cleanup after reconciliation is done + * or before we start a new reconciliation. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static int32_t +nsr_recon_end_work(nsr_per_node_worker_t *ctx, + gf_boolean_t control) +{ + int32_t ret = 0; + xlator_t *this = ctx->driver_ctx->this; + + nsr_worker_log(this->name, GF_LOG_INFO, + "doing fini for recon worker\n"); + + ret = glfs_fini(ctx->fs); + if (ret != 0) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do fini for thread %s with volfile %s\n",ctx->id, ctx->vol_file); + return -1; + } + glusterfs_this_set(this); + ctx->fs = NULL; + if (control == _gf_true) { + glfs_close (ctx->aux_fd); + ctx->aux_fd = NULL; + } + return 0; +} + +//called in case all worker functions run as sepeerate threads +static void +init_worker(nsr_per_node_worker_t *ctx, gf_boolean_t control) +{ + pthread_mutex_init(&(ctx->mutex), NULL); + pthread_cond_init(&(ctx->cv), NULL); + INIT_LIST_HEAD(&(ctx->head.list)); +} + + +/* + * Control worker funct for getting changelog info on this node. + * calls directly functions to parse the changelog. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static void +control_worker_func_0(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + xlator_t *this = dr->this; + nsr_recon_private_t *priv = this->private; + + ctx->is_control = _gf_true; + + switch (work->req_id) { + case NSR_WORK_ID_INI: + { + break; + } + case NSR_WORK_ID_FINI: + { + break; + } + case NSR_WORK_ID_GET_LAST_TERM_INFO: + { + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + // term is stuffed inside work->index. overloading. + int32_t term = work->index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get last term info for node %d with current term %d\n",index, term); + + // TBD - handle errors + // This is called by the leader after it gets the current term. + // Makes searching easier. + nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, <); + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get last term info with current term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + break; + } + case NSR_WORK_ID_GET_GIVEN_TERM_INFO: + { + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + // term is stuffed inside work->index. overloading. + int32_t term = work->index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get term info for node %d for term %d\n",index, term); + + // TBD - handle errors + nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, <); + + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get term info for term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + } + case NSR_WORK_ID_RECONCILIATOR_DO_WORK: + { + // For local resolution, the main driver thread does it. + // SO there is no way we can have this message for this node. + + nsr_worker_log(this->name, GF_LOG_INFO, + "this message should not be sent \n"); + break; + } + case NSR_WORK_ID_RESOLUTION_DO_WORK: + { + + nsr_worker_log(this->name, GF_LOG_INFO, + "this message should not be sent \n"); + ctx->result = -1; + break; + } + case NSR_WORK_ID_GET_RECONCILATION_WINDOW: + { + nsr_reconciliator_info_t *recon_info = rw->recon_info; + // first_index and last_index at 0 indicates empty log. + // For non empty log, the first_index always starts at 1. + uint32_t num = (dr->workers[index].recon_info->last_index - + dr->workers[index].recon_info->first_index + 1); + nsr_recon_record_details_t *rd; + uint32_t i=0; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", + index, recon_info->last_term, recon_info->first_index, recon_info->last_index); + + + // TBD - handle buffer allocation errors + rd = RD_CALLOC(num, + sizeof(nsr_recon_record_details_t), + gf_mt_recon_record_details_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + + recon_info->records = RD_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_record_t); + if (recon_info->records == NULL) { + ctx->result = -1; + return; + } + + // TBD - handle errors + if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + recon_info->last_term, + recon_info->first_index, + recon_info->last_index, + rd) == _gf_false) { + ctx->result = -1; + return; + } + + // The above function writes into rd from 0 to (num -1) + // We need to take care of this whenever we deal with records + for (i=0; i < num; i++) { + ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl + memcpy(&(recon_info->records[i].rec), + &(rd[i]), + sizeof(nsr_recon_record_details_t)); + } + + GF_FREE(rd); + + nsr_worker_log(this->name, GF_LOG_INFO, + "got reconciliation window records for node %d for term %d \n", + index, recon_info->last_term); + break; + } + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "bad req id %u", work->req_id); + } + + return; +} + +// Control worker thread +static void* +control_worker_main_0(nsr_per_node_worker_t *ctx) +{ + + ctx->is_control = _gf_true; + nsr_worker_log(this->name, GF_LOG_INFO, + "starting control worker func 0\n"); + + init_worker(ctx, 1); + + while(1) + { + nsr_recon_work_t *work = NULL; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + + nsr_worker_log(this->name, GF_LOG_INFO, + "waiting for work\n"); + + pthread_mutex_lock(&ctx->mutex); + while (list_empty(&(ctx->head.list))) { + pthread_cond_wait(&ctx->cv, &ctx->mutex); + } + pthread_mutex_unlock(&ctx->mutex); + + + list_for_each_entry(work, &(ctx->head.list), list) { + nsr_worker_log(this->name, GF_LOG_INFO, + "got work with id %d\n", work->req_id); + work->in_use = _gf_false; + + // Call the main function. + control_worker_func_0(ctx, work); + + atomic_dec(&(dr->outstanding)); + break; + } + + nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); + list_del_init (&work->list); + GF_FREE(work); + nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); + } + + return NULL; +} + +static void +control_worker_do_reconciliation (nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_role_t rr; + uint32_t i=0; + uint32_t num=0; + uint32_t idx = dr->reconciliator_index; + uint32_t term = dr->workers[idx].recon_info->last_term; + + GF_ASSERT(idx == index); + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to make this index %d as reconciliator for term %d\n", index, term); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, + nsr_recon_xlator_sector_1, + SEEK_SET) == -1) { + ctx->result = -1; + return; + } + + // We have all the info for all other nodes. + // Fill all that info when sending data to that process. + for (i=0; i < dr->replica_group_size; i++) { + if ( dr->workers[i].in_use && + (dr->workers[i].recon_info->last_term == term)) { + rr.info[num].last_term = + dr->workers[i].recon_info->last_term; + rr.info[num].commited_ops = + dr->workers[i].recon_info->commited_ops; + rr.info[num].last_index = + dr->workers[i].recon_info->last_index; + rr.info[num].first_index = + dr->workers[i].recon_info->first_index; + strcpy(rr.info[num].name, + dr->workers[i].name); + } + num++; + } + rr.num = num; + rr.role = reconciliator; + ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we are bothered about + // retrying only for this case. For rest of the cases we will + // just return EIO in errno. + ctx->op_errno = errno; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "sent reconciliator info for term %d with node count as %d\n", term, num); +} + +static void +control_worker_do_resolution (nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_role_t rr; + unsigned int i=0, j=0; + unsigned int rec = dr->reconciliator_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, + nsr_recon_xlator_sector_1, + SEEK_SET) == -1) { + ctx->result = -1; + return; + } + + rr.num = 2; + + // Fill in info[0] as info for the node for which we are seeking + // resolution. Fill in info[1] as info of the reconciliator node. The + // function nsr_recon_driver_get_role() that will be called when this + // message reaches the node will look at index 1 for term information + // related to the reconciliator. + for (i=0; i < 2; i++) { + (i == 0) ? (j = index) : (j = rec); + rr.info[i].last_term = + dr->workers[j].recon_info->last_term; + rr.info[i].commited_ops = + dr->workers[j].recon_info->commited_ops; + rr.info[i].last_index = + dr->workers[j].recon_info->last_index; + rr.info[i].first_index = + dr->workers[j].recon_info->first_index; + // The name is used as the key to convert indices since the + // reconciliator index could be different across the nodes. + strcpy(rr.info[i].name, + dr->workers[j].name); + if (i == 0) { + nsr_worker_log(this->name, GF_LOG_INFO, + "this node info term=%d, ops=%d, first=%d, last=%d\n", + rr.info[i].last_term, rr.info[i].commited_ops, + rr.info[i].first_index,rr.info[i].last_index); + } else { + nsr_worker_log(this->name, GF_LOG_INFO, + "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n", + rr.info[i].last_term, rr.info[i].commited_ops, + rr.info[i].first_index,rr.info[i].last_index); + } + } + rr.role = resolutor; + ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we are bothered about + // retrying only for this case. For rest of the cases we will + // just return EIO in errno. + ctx->op_errno = errno; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); +} + +static void +control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + xlator_t *this = dr->this; + nsr_recon_log_info_t li; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + uint32_t i = 0; + uint32_t num = (dr->workers[index].recon_info->last_index - + dr->workers[index].recon_info->first_index +1); + nsr_recon_record_details_t *rd; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", + index, recon_info->last_term, recon_info->first_index, recon_info->last_index); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + + // write to node what term & indices we are interested + li.term = recon_info->last_term; + li.first_index = recon_info->first_index; + li.last_index = recon_info->last_index; + ENDIAN_CONVERSION_LI(li, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) { + ctx->result = -1; + return; + } + + // then read + rd = RD_CALLOC(num, + sizeof(nsr_recon_record_details_t), + gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + recon_info->records = RD_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_private_t); + if (recon_info->records == NULL) { + ctx->result = -1; + goto err; + } + + if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) { + ctx->result = -1; + goto err; + } + + for (i=0; i < num; i++) { + ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl + memcpy (&(recon_info->records[i].rec), &(rd[i]), + sizeof(nsr_recon_record_details_t)); + nsr_worker_log(this->name, GF_LOG_INFO, + "get_reconcilaition_window:Got %d at index %d\n", + recon_info->records[i].rec.type, + i + recon_info->first_index); + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "got reconciliation window records for node %d for term %d \n", + index, recon_info->last_term); + +err: + GF_FREE(rd); +} + +/* + * Control worker funct for getting changelog info on some other node. + * calls glfs functions to seek/read/write on aux_fd. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static void +control_worker_func(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + int32_t term = htonl(work->index); // overloading it + + ctx->is_control = _gf_true; + + switch (work->req_id){ + + case NSR_WORK_ID_INI: + nsr_worker_log(this->name, GF_LOG_INFO, + "calling nsr_recon_start_work\n"); + + // TBD - handle error in case nsr_recon_start_work gives error + if (nsr_recon_start_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished nsr_recon_start_work\n"); + break; + + case NSR_WORK_ID_FINI: + nsr_worker_log(this->name, GF_LOG_INFO, + "calling nsr_recon_end_work\n"); + + // TBD - handle error in case nsr_recon_end_work gives error + if (nsr_recon_end_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished nsr_recon_end_work\n"); + break; + + case NSR_WORK_ID_GET_LAST_TERM_INFO: + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get last term info for node %d with current term %d\n",index, work->index); + + // first write the current term term number + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } + ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get last term info with current term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + + case NSR_WORK_ID_GET_GIVEN_TERM_INFO: + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get term info for node %d for term %d\n",index, work->index); + + // first write the term number + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } + ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get term info for term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + + case NSR_WORK_ID_RECONCILIATOR_DO_WORK: + control_worker_do_reconciliation(ctx,work); + break; + + case NSR_WORK_ID_RESOLUTION_DO_WORK: + control_worker_do_resolution(ctx,work); + break; + + case NSR_WORK_ID_GET_RECONCILATION_WINDOW: + control_worker_get_window(ctx,work); + break; + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "bad work type %d", work->req_id); + } + + return; +} + +// Control worker thread +static void* +control_worker_main(nsr_per_node_worker_t *ctx) +{ + unsigned int index = ctx->index; + + ctx->is_control = _gf_true; + nsr_worker_log(this->name, GF_LOG_INFO, + "starting control worker func\n"); + + // if this is for local processing, call the changelog parsing calls directly + if (index == 0) { + control_worker_main_0(ctx); + return NULL; + } + + init_worker(ctx, 1); + + + while(1) + { + nsr_recon_work_t *work = NULL; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + + nsr_worker_log(this->name, GF_LOG_INFO, + "waiting for work\n"); + + pthread_mutex_lock(&ctx->mutex); + while (list_empty(&(ctx->head.list))) { + pthread_cond_wait(&ctx->cv, &ctx->mutex); + } + pthread_mutex_unlock(&ctx->mutex); + + + list_for_each_entry(work, &(ctx->head.list), list) { + nsr_worker_log(this->name, GF_LOG_INFO, + "got work with id %d\n", work->req_id); + work->in_use = _gf_false; + control_worker_func(ctx,work); + atomic_dec(&(dr->outstanding)); + break; + } + nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); + list_del_init (&work->list); + GF_FREE(work); + nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); + } + + return NULL; +} + +/* + * This function gets called if this process is chosen as the reconciliator + * for this replica group. It would have already got the records for the last term + * for the indices that are required (from the first HOLE to last index) from + * all other nodes that also witnessed that term. COmpare all the records and + * compute the work required. + * + * Input arguments + * ctx - driver context. All recon work is stored in workers[0].recon_info + */ +static void +compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx) +{ + uint32_t i=0, j=0; + nsr_reconciliator_info_t *my_recon = ctx->workers[0].recon_info; + uint32_t num = (my_recon->last_index - my_recon->first_index + 1); + + for (i=0; i < num; i++) { + nsr_log_type_t orig, new; + unsigned int src = 0; + orig = new = my_recon->records[i].rec.type; + nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; + // index 0 means this node. Look at all other nodes. + for (j=1; j < ctx->replica_group_size; j++) { + if (ctx->workers[j].in_use) { + nsr_log_type_t pr = ctx->workers[j].recon_info->records[i].work.type; + if ((new != pr) && (pr > new)) { + src = j; + new = (new | pr); + } + } + } + // TBD - compare data if new and orig are all FILLs. (can detect changelog corruption) + // Right now we compare if both orig and new are psuedo holes since + // only that is of interest to us. + if (orig != new) { + if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_PSEUDO_HOLE)) + tw = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE; + else if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_FILL)) + tw = NSR_RECON_WORK_HOLE_TO_FILL; + else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_PSEUDO_HOLE)) + tw = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE; + else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_FILL)) + tw = NSR_RECON_WORK_HOLE_TO_FILL; + } + if (tw != NSR_RECON_WORK_NONE) { + my_recon->records[i].work.type = tw; + my_recon->records[i].work.source = src; + // Overwrite the record + memcpy(&(my_recon->records[i].rec), + &(ctx->workers[src].recon_info->records[i].rec), + sizeof(nsr_recon_record_details_t)); + } + } + return; +} + +static int32_t +nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, + uint32_t i, + gf_boolean_t in_use); + +/* + * Write the role and associated information to the node. + * This gets called from recon xlator indicating node is either + * leader, reconciliator or should do resolution. + */ +gf_boolean_t +nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, + nsr_recon_role_t *rr, + uint32_t term) +{ + nsr_role_work_t *rw; + xlator_t *this = ctx->this; + + nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n"); + rw = RD_CALLOC(1, sizeof (nsr_role_work_t), gf_mt_recon_role_work_t); + memcpy(&rw->role, rr, sizeof(nsr_recon_role_t)); + rw->term = term; + INIT_LIST_HEAD(&(rw->list)); + pthread_mutex_lock(&(ctx->mutex)); + list_add_tail(&rw->list, &ctx->role_head.list); + pthread_cond_signal(&(ctx->cv)); + pthread_mutex_unlock(&(ctx->mutex)); + nsr_driver_log(this->name, GF_LOG_INFO, "set role returns \n"); + return _gf_true; +} + +/* + * First we undo the last role to make sure we clean up. + * + * Input arguments + * ctx - driver context. + * rr - Role information. + * If leader, the thread now sends the list of all nodes that are part of + * the current replica group. Use that to find out the activate the + * required worker threads. + * If reconciliator, the leader node would have sent information about + * all nodes which saw last term as the reconciliator. + * If resolution to be done, then rr.info[0] will have this node's info + * which the leader would have got earlier. rr[1].info will have the + * info regarding the reconciliator. + * term - leader's term that is causing this role + */ +nsr_recon_driver_state_t +nsr_recon_driver_get_role(int32_t *status, + nsr_recon_driver_ctx_t *ctx, + nsr_role_work_t *rw) +{ + uint8_t i=0, j=0; + nsr_recon_role_t *rr = &(rw->role); + nsr_reconciliator_info_t *tmp; + xlator_t *this = ctx->this; + + // First make all the threads uninitialise + for (i = 0; i < ctx->replica_group_size; i++) { + if (nsr_recon_in_use(ctx, i, _gf_false) == -1) { + *status = -1; + return 0; + } + } + + switch (rr->role) { + case leader: + case joiner: + + // First set info this node + tmp = RD_CALLOC (1, sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + ctx->workers[0].recon_info = tmp; + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } + ctx->current_term = rr->current_term; + + // Find rest of the nodes + for (i=1; i < ctx->replica_group_size; i++) { + for (j=0 ; /* nothing */; j++) { + if (j >= rr->num) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + ctx->workers[i].name); + break; + } + if (strcmp(ctx->workers[i].name, + rr->info[j].name)) { + continue; + } + nsr_driver_log (this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as %s. found other server %s\n", + (rr->role == leader) ? "leader" + : "joiner", + ctx->workers[i].name); + + // Allocate this here. This will get later + // filled when the leader tries to get last term + // information from all the nodes + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + ctx->workers[i].recon_info = tmp; + if (nsr_recon_in_use(ctx, i, _gf_true) == -1) { + *status = -1; + return 0; + } + break; + } + } + // If leader, reconciliator has to be chosen. + // If joiner, we are the reconciliator. + if (rr->role == leader) + ctx->reconciliator_index = -1; + else + ctx->reconciliator_index = 0; + break; + + case reconciliator: + ctx->reconciliator_index = 0; + // Copy information about all the other members which had the + // same term + for (i=0; i < rr->num; i++) { + for (j=0; /* nothing */; j++) { + if (j >= ctx->replica_group_size) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + rr->info[i].name); + break; + } + if (strcmp(rr->info[i].name, + ctx->workers[j].name)) { + continue; + } + nsr_driver_log(this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n", + ctx->workers[j].name); + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + tmp->last_term = rr->info[i].last_term; + tmp->commited_ops = rr->info[i].commited_ops; + tmp->last_index = rr->info[i].last_index; + tmp->first_index = rr->info[i].first_index; + ctx->workers[j].recon_info = tmp; + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } + break; + } + } + break; + + case resolutor: + for (j=0; /* nothing */; j++) { + // info[1] has the information regarding the + // reconciliator + if (j >= ctx->replica_group_size) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + rr->info[1].name); + break; + } + if (strcmp(rr->info[1].name, + ctx->workers[j].name)) { + continue; + } + nsr_driver_log(this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n", + ctx->workers[j].name); + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + tmp->last_term = rr->info[1].last_term; + tmp->commited_ops = rr->info[1].commited_ops; + tmp->last_index = rr->info[1].last_index; + tmp->first_index = rr->info[1].first_index; + ctx->reconciliator_index = j; + ctx->workers[j].recon_info = tmp; + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } + GF_ASSERT(ctx->reconciliator_index != 0); + break; + } + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + // info[0] has all info for this node + tmp->last_term = rr->info[0].last_term; + tmp->commited_ops = rr->info[0].commited_ops; + tmp->last_index = rr->info[0].last_index; + tmp->first_index = rr->info[0].first_index; + ctx->workers[0].recon_info = tmp; + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } + } + + ctx->term = rw->term; + + *status = 0; + return rr->role; +} + + +/* + * This function gets called if this process is chosen to sync itself with + * the reconciliator. + * + * Input arguments + * ctx - driver context. + * my_info - local changelog info that has all the local records for indices that require work + * his_info - reconciliator's info that has all the golden copies + * invalidate - if set to true, then do not consult local records + */ + +static void +compute_resolution_work(nsr_recon_driver_ctx_t *ctx, + nsr_reconciliator_info_t *my_info, + nsr_reconciliator_info_t *his_info, + gf_boolean_t invalidate) +{ + uint32_t i=0; + uint32_t num = (my_info->last_index - my_info->first_index + 1); + xlator_t *this = ctx->this; + + if (invalidate) { + if (my_info->records) { + GF_FREE(my_info->records); + } + my_info->records = RD_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_record_t); + } + + for (i=0; i < num; i++) { + nsr_log_type_t orig, new; + nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; + orig = my_info->records[i].rec.type; + if (invalidate) + orig = NSR_LOG_HOLE; + new = his_info->records[i].rec.type; + // TBD - we can never have PSUEDO_HOLE in reconciliator's info + // We should have taken care of that during reconciliation. + // Put an assert to validate that. + if (new != orig) { + if ((orig != NSR_LOG_FILL) && (new == NSR_LOG_FILL)) + tw = NSR_RECON_WORK_HOLE_TO_FILL; + else if ((orig != NSR_LOG_HOLE) && (new == NSR_LOG_HOLE)) + tw = NSR_RECON_WORK_UNDO_FILL; + } + // copy the records anyway + my_info->records[i].work.type = tw; + my_info->records[i].work.source = ctx->reconciliator_index; + memcpy(&(my_info->records[i].rec), + &(his_info->records[i].rec), + sizeof(nsr_recon_record_details_t)); + } + return; +} + + +// Create an glfs object +static struct glfs_object * +create_obj(nsr_per_node_worker_t *ctx, char *gfid_str) +{ + struct glfs_object *obj = NULL; + uuid_t gfid; + + uuid_parse(gfid_str, gfid); + + obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); + if (obj == NULL) { + GF_ASSERT(obj != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + return NULL; + } + return obj; +} + +/* + * Function to apply the actual record onto the local brick. + * prior to this we should have read all the data from the + * brick that has the data. + * + * Input parameters: + * ctx - per node worker context that has the fs for communicating to brick + * ri - Reconciliation record that needs fixup + * dict - So that NSR server translator on brick applis fixup only on this brick + * and the changelog translator consumes term and index. + */ + +static gf_boolean_t +apply_record(nsr_per_node_worker_t *ctx, + nsr_reconciliation_record_t *ri, + dict_t * dict) +{ + struct glfs_fd *fd = NULL; + struct glfs_object *obj = NULL; + struct glfs_object *to_obj = NULL; + gf_boolean_t retval = _gf_false; + + if (ri->rec.op == GF_FOP_WRITE) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "DOing write for file %s @offset %d for len %d\n", + ri->rec.gfid, ri->rec.offset, ri->rec.len); + + // The file has got deleted on the source. Hence just ignore + // this. + // TBD - get a way to just stuff the log entry without writing + // the data so that changelogs remain identical. + if (ri->work.data == NULL) { + return _gf_true; + } + + if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) + goto err; + + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "lseek for file %s failed at offset %d\n", + ri->rec.gfid, ri->rec.offset); + goto err; + } + if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "write for file %s failed for bytes %d\n", + ri->rec.gfid, ri->rec.len); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished DOing write for gfid %s @offset %d for len %d\n", + ri->rec.gfid, ri->rec.offset, ri->rec.len); + + } else if (ri->rec.op == GF_FOP_FTRUNCATE) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "DOing truncate for file %s @offset %d \n", + ri->rec.gfid, ri->rec.offset); + + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "trunctae for file %s failed @offset %d\n", + ri->rec.gfid,ri->rec.offset ); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished DOing truncate for gfid %s @offset %d \n", + ri->rec.gfid, ri->rec.offset); + + } else if ((ri->rec.op == GF_FOP_FREMOVEXATTR) || + (ri->rec.op == GF_FOP_REMOVEXATTR) || + (ri->rec.op == GF_FOP_SETXATTR) || + (ri->rec.op == GF_FOP_FSETXATTR)) { + + uint32_t k_s = 0, v_s = 0; + char *t_b= NULL; + uint32_t num = 0; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing set extended attr for file %s \n", + ri->rec.gfid); + + // The file has got deleted on the source. Hence just ignore + // this. TBD - get a way to just stuff the log entry without + // writing the data so that changelogs remain identical. + if (ri->work.data == NULL) { + return _gf_true; + } + + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); + else + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + + if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { + if (t_b) free(t_b); + nsr_worker_log(this->name, GF_LOG_ERROR, + "list of xattr of %s failed\n", ri->rec.gfid); + goto err; + } + + if (delete_xattr(fd, dict, t_b, num) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "deleting xattrs failed\n"); + goto err; + } + + // Set one special dict flag to indicate the opcode so that + // the opcode gets set to this + if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "setting opcode to %d failed\n",ri->rec.op); + goto err; + } + + if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "filling xattrs failed\n"); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finsihed Doing set extended attr for %s \n", + ri->rec.gfid); + + } else if (ri->rec.op == GF_FOP_CREATE) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing create for file %s \n", + ri->rec.gfid); + + // TBD - add mode and flags later + uuid_parse(ri->rec.gfid, gfid); + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + + nsr_worker_log (this->name, GF_LOG_INFO, + "creating with mode 0%o", ri->rec.mode); + if (glfs_h_creat_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, ri->rec.mode, NULL, gfid, dict) == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing create for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing create for file %s \n", + ri->rec.entry); + + } else if (ri->rec.op == GF_FOP_MKNOD) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing mknod for file %s \n", + ri->rec.entry); + + // TBD - add mode and flags later + uuid_parse(ri->rec.gfid, gfid); + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + + if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing mknod for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing mknod for file %s \n", + ri->rec.entry); + + } else if (ri->rec.op == GF_FOP_MKDIR) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing mkdir for dir %s \n", + ri->rec.gfid); + + // TBD - add mode and flags later + uuid_parse(ri->rec.gfid, gfid); + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + + if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing mkdir for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing mkdir for file %s \n", + ri->rec.entry); + + } else if ((ri->rec.op == GF_FOP_RMDIR) || (ri->rec.op == GF_FOP_UNLINK)) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing rmdir/ublink for dir %s \n", + ri->rec.entry); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing rmdir/unlink for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing rmdir/unlink for file %s \n", + ri->rec.entry); + + } else if (ri->rec.op == GF_FOP_SYMLINK) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing symlink for file %s to file %s \n", + ri->rec.entry, ri->rec.link_path); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + uuid_parse(ri->rec.gfid, gfid); + + if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failed to Doing symlink for file %s to file %s \n", + ri->rec.entry, ri->rec.link_path); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing symlink for file %s to file %s \n", + ri->rec.entry, ri->rec.link_path); + + } else if (ri->rec.op == GF_FOP_LINK) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing hard link for file %s to file %s \n", + ri->rec.entry, ri->rec.gfid); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failed to Doing hard link for file %s to file %s \n", + ri->rec.entry, ri->rec.gfid); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finsihed doing hard link for file %s to file %s \n", + ri->rec.entry, ri->rec.gfid); + + } else if (ri->rec.op == GF_FOP_RENAME) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing rename for file %s to file %s \n", + ri->rec.entry, ri->rec.newloc); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failed to Doing rename for file %s to file %s \n", + ri->rec.entry, ri->rec.newloc); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finsihed doing renam for file %s to file %s \n", + ri->rec.entry, ri->rec.newloc); + + + } else if ((ri->rec.op == GF_FOP_SETATTR) || (ri->rec.op == GF_FOP_FSETATTR)) { + + struct iatt iatt = {0, }; + int valid = 0; + int ret = -1; + + // TBD - do the actual settings once we do that + // right now we just set the mode so that changelog gets filled + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing attr for file %s \n", + ri->rec.gfid); + + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); + else + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + + iatt.ia_prot = ia_prot_from_st_mode(777); + valid = GF_SET_ATTR_MODE; + + + // Set one special dict flag to indicate the opcode so that + // the opcode gets set to this + if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "setting opcode to %d failed\n",ri->rec.op); + goto err; + } + + ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict); + if (ret == -1) { + nsr_worker_log(this->name, GF_LOG_INFO, + "failed Doing attr for file %s \n", + ri->rec.gfid); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing attr for file %s \n", + ri->rec.gfid); + + } + + retval = _gf_true; + +err: + if (fd) { + /* + * It's not clear that we should be passing the same dict to + * glfs_close that was passed to us for glfs_open, but that's + * the prior behavior so let's preserve it for now. + */ + if (glfs_close_with_xdata(fd, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + } + } + if (obj) { + /* + * AFAICT fd operations do not borrow this reference, so we + * still need to drop it ourselves. + */ + glfs_h_close(obj); + } + if (to_obj) { + /* + * AFAICT fd operations do not borrow this reference, so we + * still need to drop it ourselves. + */ + glfs_h_close(to_obj); + } + return retval; +} + +//return back opcodes that requires reading from source +static gf_boolean_t +recon_check_changelog(nsr_recon_record_details_t *rd) +{ + return((rd->op == GF_FOP_WRITE) || + (rd->op == GF_FOP_FSETATTR) || + (rd-> op == GF_FOP_SETATTR) || + (rd->op == GF_FOP_FREMOVEXATTR) || + (rd->op == GF_FOP_SETXATTR) || + (rd->op == GF_FOP_FSETXATTR) || + (rd->op == GF_FOP_SYMLINK)); + +} + +// TBD +static gf_boolean_t +recon_compute_undo(nsr_recon_record_details_t *rd) +{ + return(_gf_false); +} + + +/* + * Function that talks to the brick for data tranfer. + * + * Input arguments: + * ctx - worker context + * work - pointer to work object + */ +static void +data_worker_func(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + xlator_t *this = dr->this; + nsr_reconciliation_record_t *ri = NULL; + nsr_recon_record_details_t *rd = NULL; + int wip = 0; + dict_t * dict = NULL; + struct glfs_fd *fd = NULL; + struct glfs_object *obj = NULL; + uuid_t gfid; + uint32_t k_s = 0, v_s = 0; + char *t_b= NULL; + uint32_t num=0; + + switch (work->req_id){ + case NSR_WORK_ID_INI: + nsr_worker_log(this->name, GF_LOG_INFO, + "started data ini \n"); + + if (nsr_recon_start_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished data ini \n"); + break; + case NSR_WORK_ID_FINI: + nsr_worker_log(this->name, GF_LOG_INFO, + "started data fini \n"); + + if (nsr_recon_end_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished data fini \n"); + break; + case NSR_WORK_ID_SINGLE_RECONCILIATION_READ: + // first_index always starts with 1 but records starts at 0. + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + dict = dict_new (); + if (!dict) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + + switch (rd->op) { + case GF_FOP_WRITE: + + // record already copied. + // copy data to this node's info. + + uuid_parse(ri->rec.gfid, gfid); + + nsr_worker_log(this->name, GF_LOG_INFO, + "started recon read for file %s at offset %d at len %d\n", + ri->rec.gfid, rd->offset, rd->len); + + obj = glfs_h_create_from_handle (ctx->fs, gfid, + GFAPI_HANDLE_LENGTH, + NULL); + if (obj == NULL) { + GF_ASSERT(obj != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + break; + } + + // The file has probably got deleted. + fd = glfs_h_open_with_xdata (ctx->fs, obj, O_RDONLY, + dict); + if (fd == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "opening of file failed\n"); + break; + } + + if (glfs_lseek_with_xdata (fd, rd->offset, SEEK_SET, + dict) != rd->offset) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "lseek of file failed to offset %d\n", + rd->offset); + break; + } + + ri->work.data = RD_CALLOC (rd->len , sizeof(char), + gf_mt_recon_work_data_t); + if (glfs_read_with_xdata (fd, ri->work.data, rd->len, + 0, dict) != rd->len) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "read of file failed to offset %d for bytes %d\n", + rd->offset, rd->len); + break; + } + break; + + case GF_FOP_FTRUNCATE: + case GF_FOP_SYMLINK: + case GF_FOP_RMDIR: + case GF_FOP_UNLINK: + case GF_FOP_MKNOD: + case GF_FOP_CREATE: + case GF_FOP_LINK: + case GF_FOP_MKDIR: + case GF_FOP_RENAME: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unimplemented fop %u\n", rd->op); + break; + + case GF_FOP_FREMOVEXATTR: + case GF_FOP_REMOVEXATTR: + case GF_FOP_SETXATTR: + case GF_FOP_FSETXATTR: + + uuid_parse(ri->rec.gfid, gfid); + + + // This is for all the set attribute/extended + // attributes commands. Get all the attributes from + // the source and fill it in the buffer as a NULL + // seperated key and value which are in turn seperated + // by NULL. + + nsr_worker_log(this->name, GF_LOG_INFO, + "doing getattr for gfid %s \n", + ri->rec.gfid); + + obj = glfs_h_create_from_handle (ctx->fs, gfid, + GFAPI_HANDLE_LENGTH, + NULL); + if (obj == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + break; + } + + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata (ctx->fs, obj, + dict); + else + fd = glfs_h_open_with_xdata (ctx->fs, obj, + O_RDONLY, dict); + + if (fd == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "opening of file failed\n"); + break; + } + + if (get_xattr_total_size (fd, &t_b, &k_s, &v_s, &num, + dict) == -1) { + if (t_b) free(t_b); + nsr_worker_log(this->name, GF_LOG_ERROR, + "list of xattr of gfid %s failed\n", + rd->gfid); + break; + } + ri->work.data = RD_CALLOC ((k_s + v_s) , sizeof(char), + gf_mt_recon_work_data_t); + if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "get xattr of gfid %s failed\n", rd->gfid); + break; + } + ri->work.num = num; + nsr_worker_log(this->name, GF_LOG_INFO, + "finished getattr for gfid %s \n", + ri->rec.gfid); + free(t_b); + break; + + case GF_FOP_FSETATTR: + case GF_FOP_SETATTR: + //TBD - to get the actual attrbutes and fill + // mode, uid, gid, size, atime, mtime + nsr_worker_log (this->name, GF_LOG_ERROR, + "unimplemented fop %u\n", rd->op); + break; + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unrecognized fop %u\n", rd->op); + + } + nsr_worker_log(this->name, GF_LOG_INFO, + "finished recon read for gfid %s at offset %d for %d bytes \n", + rd->gfid, rd->offset, rd->len); + break; + + case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT: + // first_index always starts with 1 but records starts at 0. + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + nsr_worker_log(this->name, GF_LOG_INFO, + "got recon commit for index %d that has gfid %s \n", + wip, rd->gfid); + dict = dict_new (); + if (!dict) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (apply_record(ctx, ri, dict) == _gf_false) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "apply_record fails\n"); + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished recon commit for gfid %s \n", + rd->gfid); + break; + + case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: + dict = dict_new (); + if (!dict) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + + // Increment work index with the start index + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + glfs_fsync_with_xdata(fd, dict); + break; + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unrecognized request id %u\n", work->req_id); + } + + if (fd) { + glfs_close_with_xdata(fd, dict); + } + if (obj) { + glfs_h_close(obj); + } + if (dict) { + dict_unref(dict); + } +} + +// thread for doing data work +static void * +data_worker_main(nsr_per_node_worker_t *ctx) +{ + nsr_worker_log(this->name, GF_LOG_INFO, + "starting data worker func\n"); + init_worker(ctx, 0); + + while(1) { + nsr_recon_work_t *work = NULL; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + + nsr_worker_log(this->name, GF_LOG_INFO, + "waiting for work\n"); + + pthread_mutex_lock(&(ctx->mutex)); + while (list_empty(&(ctx->head.list))) { + pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); + } + pthread_mutex_unlock(&(ctx->mutex)); + list_for_each_entry(work, &(ctx->head.list), list) { + nsr_worker_log(this->name, GF_LOG_INFO, + "got work with id %d\n",work->req_id); + work->in_use = _gf_false; + data_worker_func(ctx, work); + atomic_dec(&(dr->outstanding)); + break; + } + nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); + list_del_init (&work->list); + GF_FREE(work); + nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); + } + + return NULL; +} + + +//make recon work +static void +recon_make_work(nsr_recon_driver_ctx_t *ctx, + nsr_recon_work_t **work, + nsr_recon_work_req_id_t req_id, + int32_t i) +{ + xlator_t *this = ctx->this; + + // TBD - change this to get from a static pool + // This cannot fail + (*work) = RD_CALLOC (1, sizeof (nsr_recon_work_t), gf_mt_recon_work_t); + (*work)->req_id = req_id; + (*work)->index = i; + (*work)->in_use = _gf_true; + INIT_LIST_HEAD(&((*work)->list)); + return; +} + +// Schedule a work object to a worker thread. +static void +recon_queue_to_worker(nsr_recon_driver_ctx_t *ctx, + nsr_recon_work_t *work, + unsigned int id, + nsr_recon_queue_type_t type) +{ + nsr_per_node_worker_t *worker; + if (type == NSR_RECON_QUEUE_TO_CONTROL) { + worker = ctx->workers[id].control_worker; + nsr_driver_log(this->name, GF_LOG_INFO, + "queueing work to control index %d\n",id); + } else { + worker= ctx->workers[id].data_worker; + nsr_driver_log(this->name, GF_LOG_INFO, + "queueing work to data index %d\n",id); + } + pthread_mutex_lock(&worker->mutex); + list_add_tail(&work->list, &worker->head.list); + pthread_cond_signal(&worker->cv); + pthread_mutex_unlock(&worker->mutex); + return; +} + +typedef void * (*F_t) (void *); + +// In case mode is set to NSR_USE_THREADS, create worker threads. +static gf_boolean_t +create_worker_threads(nsr_recon_private_t *priv, + nsr_recon_driver_ctx_t *ctx, + nsr_per_node_worker_t *w, + gf_boolean_t control_or_data, + F_t f, + uint32_t num) +{ + uint32_t i; + nsr_per_node_worker_t *worker = w; + xlator_t *this = ctx->this; + + for (i=0; i < num; i++) { + worker->id = RD_CALLOC(1, 10, gf_mt_recon_id_t); + if (!worker->id) { + nsr_driver_log (priv->this->name, GF_LOG_ERROR, "memory allocation error \n"); + return _gf_false; + } + sprintf(worker->id,"recon_%d", i); + worker->driver_ctx = ctx ; + + if (ctx->mode == NSR_USE_THREADS) { + if (pthread_create(&worker->thread_id, NULL, f, worker)) { + nsr_driver_log (ctx->this->name, GF_LOG_ERROR, "control work thread creation error \n"); + return _gf_false; + } + } + worker->index = i; + worker++; + } + return _gf_true; +} + +/* + * In case of thread, send the work item; else call the function directly. + * + * Input arguments: + * bm - bitmap containing indices of nodes we want to send work + * num - number of such indices + * ctx - driver context from where we derive per worker context + * id - request ID + * q - control or data + * misc - used to overload such as index. + */ +static void +send_and_wait(int32_t *result, + int32_t *op_errno, + int32_t bm, + uint32_t num, + nsr_recon_driver_ctx_t *ctx, + nsr_recon_work_req_id_t id, + nsr_recon_queue_type_t q, + int32_t misc) +{ + uint32_t i = 0; + nsr_recon_work_t *work; + +#define CONTROL_WORKER(i) ctx->workers[i].control_worker +#define DATA_WORKER(i) ctx->workers[i].data_worker +#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? (CONTROL_WORKER(i)) : (DATA_WORKER(i))) + + *result = *op_errno = 0; + + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + WORKER(i)->result = 0; + WORKER(i)->op_errno = 0; + } + } + if (ctx->mode == NSR_SEQ) { + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + recon_make_work(ctx,&work, id, misc); + if (q == NSR_RECON_QUEUE_TO_CONTROL) { + if (i == 0) + control_worker_func_0(ctx->workers[0].control_worker, work); + else + control_worker_func(ctx->workers[i].control_worker, work); + } else { + data_worker_func(ctx->workers[i].data_worker, work); + } + GF_FREE(work); + } + } + goto out; + } + + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + recon_make_work(ctx,&work, id, misc); + atomic_inc(&(ctx->outstanding)); + recon_queue_to_worker(ctx, work, i, q); + } + } + + nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: waiting\n"); + while (ctx->outstanding) { + pthread_yield(); + } +out: + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + if (WORKER(i)->result == -1) { + *result = -1; + } + } + } + if (*result == -1) { + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + if (WORKER(i)->op_errno == EAGAIN) { + *op_errno = EAGAIN; + break; + } else { + *op_errno = EIO; + } + } + } + } + + nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno); + return; +} + +// send INI or FINI +static int32_t +nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, + uint32_t i, + gf_boolean_t in_use) +{ + uint32_t bm = 1 << i; + gf_boolean_t send = _gf_false; + int32_t status =0, op_errno = 0; + + send = (ctx->workers[i].in_use != in_use); + ctx->workers[i].in_use = in_use; + + if (!send) { + return 0; + } + nsr_driver_log (this->name, GF_LOG_INFO, + "sending %s to index %d\n",in_use?"INI":"FINI",i); + + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, + (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto err; + + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, + (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + NSR_RECON_QUEUE_TO_DATA, -1); + if (status == -1) + goto err; + + /* + * We really need better error recovery. To activate a worker, we + * allocate memory and send two messages. If any of those actions + * fail, we should undo the others. It would probably be good to + * collapse the two messages into one, because it's pretty trivial to + * allocate a temporary structure and either link it in or free it + * depending on the result here. + */ + + if (in_use == _gf_false) { + GF_FREE(ctx->workers[i].recon_info); + } + + return 0; + +err: + GF_FREE(ctx->workers[i].recon_info); + ctx->workers[i].recon_info = NULL; + return -1; +} + +gf_boolean_t +nsr_recon_driver_reconciliator (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + gf_boolean_t do_recon = _gf_false; + uint32_t start_index = ctx->workers[0].recon_info->first_index; + uint32_t end_index = ctx->workers[0].recon_info->last_index; + uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1); + + nsr_driver_log (this->name, GF_LOG_INFO, + "starting reconciliation work as reconciliator \n"); + + // nothing to be done? signal back to the recon translator that this + // phase done. + bm = 1; + for (i=1; i < replica_group_size; i++) { + if (ctx->workers[i].in_use && + (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) { + ctx->workers[i].recon_info->last_index = end_index; + ctx->workers[i].recon_info->first_index = start_index; + bm |= (1 << i); + do_recon = _gf_true; + } + } + + if (!do_recon || !num) { + nsr_driver_log (this->name, GF_LOG_INFO, + "nothing needs to be done as resolutor \n"); + return _gf_true; + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting reconciliation window for term %d from %dto %d \n", + ctx->workers[0].recon_info->last_term, + start_index, end_index); + // We have set the bm in the above for loop where we go thru all nodes + // including this node that have seen the last term. + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_RECONCILATION_WINDOW, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting reconciliation window for term %d from %dto %d \n", + ctx->workers[0].recon_info->last_term, + start_index, end_index); + + + // from the changelogs, calculate the entries that need action and the + // source for each of these entries + compute_reconciliation_work(ctx); + + // for each of the entries that need fixup, issue IO + for (i=start_index; i < (start_index + num); i++) { + nsr_reconciliator_info_t *my_recon_info = + ctx->workers[0].recon_info; + nsr_reconciliation_record_t *record = + &(my_recon_info->records[i - start_index]); + + record->work.term = ctx->workers[0].recon_info->last_term; + record->work.index = i; + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing index %d\n",i); + if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) || + (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) { + // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): If there + // are only pseudo_holes in others, it is best effort. + // Just pick from the first node that has it and + // proceed. + // 2nd case (RECON_WORK_HOLE_TO_FILL): this node has + // either a HOLE or PSUEDO_HOLE and some one else has a + // FILL(source). analyse the changelog to check if data + // needs to be read or if the log has all the data + // required + + if (recon_check_changelog(&record->rec)) { + bm = (1 << record->work.source); + nsr_driver_log (this->name, GF_LOG_INFO, + "reading data from source %d\n",record->work.source); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "got data from source %d\n",record->work.source); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing local data as part of reconciliation\n"); + + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing local data as part of reconciliation\n"); + + } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) { + // this node has a pseudo_hole and some others have just + // that too. Just convert this to FILL. let others + // blindly pick it from here. + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing this record as a fill\n"); + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing this record as a fill\n"); + } + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work as reconciliator \n"); + + // tbd - mark this term golden in the reconciliator + return _gf_true; +} + +gf_boolean_t +nsr_recon_driver_resolutor (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + // This node's last term is filled when it gets a message from the + // leader to act as a reconciliator. + uint32_t recon_index = ctx->reconciliator_index; + nsr_reconciliator_info_t *my_info = + ctx->workers[0].recon_info; + nsr_reconciliator_info_t *his_info = + ctx->workers[recon_index].recon_info; + uint32_t my_last_term = my_info->last_term; + uint32_t to_do_term = his_info->last_term; + uint32_t my_start_index = 1, my_end_index = 1; + uint32_t his_start_index = 1, his_end_index = 1; + uint32_t num = 0; + gf_boolean_t fl = _gf_true; + + nsr_driver_log (this->name, GF_LOG_INFO, + "starting resolutor work with reconciliator as %d from term %d to term %d \n", + recon_index, my_last_term, to_do_term); + + do { + + if (!fl) { + (his_info->last_term)++; + (my_info->last_term)++; + } else { + his_info->last_term = my_last_term; + } + + nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term); + + // Get reconciliator's term information for that term + nsr_driver_log (this->name, GF_LOG_INFO, + "getting info from reconciliator for term %d \n", my_info->last_term); + bm = (1 << recon_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_GIVEN_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting info from reconciliator for term %d \n", my_info->last_term); + + + // empty term + if (!his_info->commited_ops) { + nsr_driver_log (this->name, GF_LOG_INFO, + "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term); + // TBD - mark the term golden + fl = _gf_false; + continue; + } + + // calculate the resolution window boundary. for the last term + // this node saw, we compare the resolution window of this and + // reconciliator. for the rest of the nodes, we just accept the + // reconciliator info. + if (fl) { + my_start_index = my_info->first_index; + my_end_index = my_info->last_index; + his_start_index = his_info->first_index; + his_end_index = his_info->last_index; + my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index; + my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index; + } else { + my_info->first_index = his_info->first_index; + my_info->last_index = his_info->last_index; + my_info->commited_ops = his_info->commited_ops; + } + if (my_info->first_index == 0) + my_info->first_index = 1; + num = (my_info->last_index - my_info->first_index) + 1; + + + // Get the logs from the reconciliator (and this node for this + // term) + if (fl) + bm = ((1 << recon_index) | 1); + else + bm = (1 << recon_index); + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting reconciliation window for term %d from %d to %d \n", + my_info->last_term, + my_info->first_index, my_info->last_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_RECONCILATION_WINDOW, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting reconciliation window for term %d from %d to %d \n", + my_info->last_term, + my_info->first_index, my_info->last_index); + + // from the changelogs, calculate the entries that need action + compute_resolution_work(ctx, my_info, his_info, !fl); + + + // for each of the entries that need fixup, issue IO + for (i=my_info->first_index; i < (my_info->first_index + num); i++) { + nsr_reconciliation_record_t *record = + &(my_info->records[i - my_info->first_index]); + + record->work.term = my_info->last_term; + record->work.index = i; + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing index %d\n",i); + if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) || + (record->work.type == NSR_RECON_WORK_UNDO_FILL)) { + if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) && + recon_check_changelog(&record->rec)) || + ((record->work.type == NSR_RECON_WORK_UNDO_FILL) && + recon_compute_undo(&record->rec))) { + nsr_driver_log (this->name, GF_LOG_INFO, + "reading data from source %d\n",recon_index); + bm = (1 << recon_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reading data from source %d\n",recon_index); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing local data as part of resolutor\n"); + + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing local data as part of resolutor\n"); + } + } + fl = _gf_false; + + // tbd - mark this term golden in the reconciliator + } while (my_last_term++ != to_do_term); + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished resolutor work \n"); + return _gf_true; +} + +gf_boolean_t +nsr_recon_driver_leader (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + int32_t chosen = -1; + int32_t last_term = -1, last_ops = -1; + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting last term info from all members of this group\n"); + // Get last term info from all members for this group + send_and_wait(&status, &op_errno, -1, + replica_group_size, + ctx, + NSR_WORK_ID_GET_LAST_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + return _gf_false; + + + // compare all the info received and choose the reconciliator First + // choose all with latest term + for (i=0; i < replica_group_size; i++) { + if (ctx->workers[i].in_use) { + if (ctx->workers[i].recon_info->last_term > last_term) { + last_term = ctx->workers[i].recon_info->last_term; + } + } + } + // First choose all with latest term and highest ops + for (i=0; i < replica_group_size; i++) { + if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) { + if (ctx->workers[i].recon_info->commited_ops > last_ops) { + last_ops = ctx->workers[i].recon_info->commited_ops; + } + } + } + // choose the first among the lot + for (i=0; i < replica_group_size; i++) { + if ((ctx->workers[i].in_use) && + (last_term == ctx->workers[i].recon_info->last_term) && + (last_ops == ctx->workers[i].recon_info->commited_ops)) { + chosen = i; + break; + } + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "reconciliator chosen is %d\n", chosen); + ctx->reconciliator_index = chosen; + GF_ASSERT(chosen != -1); + if (chosen == -1) { + nsr_driver_log (this->name, GF_LOG_INFO, + "no reconciliatior chosen\n"); + return _gf_false; + } + + // send the message to reconciliator to do reconciliation with list of + // nodes that are part of this quorum + if (chosen != 0) { + nsr_driver_log (this->name, GF_LOG_INFO, + "sending reconciliation work to %d\n", chosen); + bm = 1 << ctx->reconciliator_index; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RECONCILIATOR_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work to %d\n", chosen); + } else { + nsr_driver_log (this->name, GF_LOG_INFO, + "local node is reconciliator. before set jmp\n"); + nsr_recon_driver_reconciliator(priv); + } + + // send message to all other nodes to sync up with the reconciliator + // including itself if required + // requires optimisation - TBD + if (chosen != 0) { + nsr_driver_log (this->name, GF_LOG_INFO, + "local node resolution needs to be done. before set jmp\n"); + nsr_recon_driver_resolutor(priv); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "sending resolution work to all nodes except this node and reconciliator\n"); + bm = ~((1 << ctx->reconciliator_index) || 1); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RESOLUTION_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work as leader \n"); + return _gf_true; +} + +// main recon driver thread +void * +nsr_reconciliation_driver(void *arg) +{ + nsr_recon_private_t *priv = (nsr_recon_private_t *) arg; + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_per_node_worker_t *control_s, *data_s; + nsr_recon_driver_ctx_t **driver_ctx, *ctx; + int32_t bm; + xlator_t *this = priv->this; + char *con_name, *data_name; + int32_t status = 0; + int32_t op_errno = 0; + + driver_ctx = &priv->driver_thread_context; + (*driver_ctx) = GF_CALLOC (1, + sizeof (nsr_recon_driver_ctx_t), + gf_mt_recon_driver_ctx_t); + if (!driver_ctx) { + gf_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + ctx = *driver_ctx; + ctx->this = priv->this; + ctx->replica_group_size = replica_group_size; + + ctx->fp = recon_create_log (priv->replica_group_members[0], "nsr-driver-log"); + if (!ctx->fp) + return NULL; + + if ((pthread_mutex_init(&(ctx->mutex), NULL)) || + (pthread_cond_init(&(ctx->cv), NULL))){ + nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n"); + return NULL; + } + INIT_LIST_HEAD(&(ctx->role_head.list)); + + ctx->workers = RD_CALLOC (replica_group_size, + sizeof(nsr_replica_worker_t), + gf_mt_recon_worker_t); + if (!ctx->workers) { + nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + for (i=0; i < replica_group_size; i++) { + strcpy(ctx->workers[i].name, priv->replica_group_members[i]); + } + + control_s = RD_CALLOC (replica_group_size, + sizeof(nsr_per_node_worker_t), + gf_mt_recon_per_node_worker_t); + if (!control_s) { + nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + + data_s = RD_CALLOC (replica_group_size, + sizeof(nsr_per_node_worker_t), + gf_mt_recon_per_node_worker_t); + if (!data_s) { + nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + for (i=0; i < replica_group_size; i++) { + ctx->workers[i].control_worker = &control_s[i]; + if (asprintf(&con_name,"recon-con-%u",i) < 1) { + return NULL; + } + ctx->workers[i].control_worker->fp = recon_create_log + (priv->replica_group_members[0], con_name); + if (!ctx->workers[i].control_worker->fp) + return NULL; + ctx->workers[i].data_worker = &data_s[i]; + if (asprintf (&data_name,"recon-data-%u",i) <1) { + return NULL; + } + ctx->workers[i].data_worker->fp = recon_create_log + (priv->replica_group_members[0], data_name); + if (!ctx->workers[i].data_worker->fp) + return NULL; + } + + nsr_driver_log (this->name, GF_LOG_INFO, "creating threads \n"); + // Create the worker threads + // For every brick including itself there will be 2 worker threads: + // one for data and one for control + if (!create_worker_threads(priv, ctx, control_s, _gf_true, + (F_t) control_worker_main, replica_group_size) || + !create_worker_threads(priv, ctx, data_s, _gf_false, + (F_t) data_worker_main, replica_group_size)) { + return NULL; + } + + for (i=0; i < replica_group_size; i++) { + nsr_recon_get_file(priv->volname, &(ctx->workers[i])); + } + + while (1) { + + nsr_role_work_t *rr; + + nsr_driver_log (this->name, GF_LOG_INFO, "waiting for role to be queued \n"); + pthread_mutex_lock(&(ctx->mutex)); + while (list_empty(&(ctx->role_head.list))) { + pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); + } + pthread_mutex_unlock(&(ctx->mutex)); + + list_for_each_entry(rr, &(ctx->role_head.list), list) { + nsr_recon_driver_state_t state; + state = nsr_recon_driver_get_role(&status, ctx, rr); + + if (status == -1) { + op_errno = EIO; + goto out; + } + + switch (state) { + + case leader: + if (!nsr_recon_driver_leader(priv)) { + goto out; + } + break; + case reconciliator: + if (!nsr_recon_driver_reconciliator(priv)) { + goto out; + } + break; + case resolutor: + if (!nsr_recon_driver_resolutor(priv)) { + goto out; + } + break; + + case joiner: + + nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); + // Get last term info from all members for this group + // which will be the leader(this node) and the node that wants to join. + send_and_wait(&status, &op_errno, -1, + replica_group_size, + ctx, + NSR_WORK_ID_GET_LAST_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + goto out; + + + // send message to other node that just joined to sync up with this node which is also the leader + nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n"); + bm = ~(1); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RESOLUTION_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished recon work as joiner \n"); + break; + + default: + nsr_driver_log (this->name, GF_LOG_ERROR, + "bad state %d", state); + } + + + // free the asasociated recon_info contexts created as part of this role + +out: + nsr_driver_log (this->name, GF_LOG_INFO, + "sending end of reconciliation message \n"); + nsr_recon_return_back(priv, ctx->term, status, op_errno); + nsr_driver_log (this->name, GF_LOG_INFO, + "finished sending end of reconciliation message \n"); + } + list_del_init (&rr->list); + } + + return NULL; +} diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h new file mode 100644 index 000000000..3efb26269 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_driver.h @@ -0,0 +1,325 @@ +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __RECON_DRIVER_H__ +#define __RECON_DRIVER_H__ + + +#include "api/src/glfs.h" + +#define MAX_HOSTNAME_LEN 32 +#define MAXIMUM_REPLICA_STRENGTH 8 +#define MAX_RECONCILIATION_WINDOW_SIZE 10000 + +#define GLUSTERD_DEFAULT_WORKDIR "/var/lib/glusterd" +#define GLUSTERD_VOLUME_DIR_PREFIX "vols" +#define GLUSTERD_BRICK_INFO_DIR "bricks" + +/* + * Even with the names fixed, the non-NSR_DEBUG definitions of nsr_*_log don't + * work because many callers don't have "this" defined. + * + * TBD: use gf_log, fix "this" problem, eliminate extra fields and newlines. + */ +#define NSR_DEBUG + +typedef enum nsr_recon_work_req_id_t { + NSR_WORK_ID_GET_NONE = 0, + NSR_WORK_ID_GET_LAST_TERM_INFO = NSR_WORK_ID_GET_NONE + 1, + NSR_WORK_ID_GET_GIVEN_TERM_INFO = NSR_WORK_ID_GET_LAST_TERM_INFO + 1, + NSR_WORK_ID_RECONCILIATOR_DO_WORK = NSR_WORK_ID_GET_GIVEN_TERM_INFO + 1, + NSR_WORK_ID_RESOLUTION_DO_WORK = NSR_WORK_ID_RECONCILIATOR_DO_WORK + 1, + NSR_WORK_ID_GET_RECONCILATION_WINDOW = NSR_WORK_ID_RESOLUTION_DO_WORK + 1, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ = NSR_WORK_ID_GET_RECONCILATION_WINDOW + 1, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT = NSR_WORK_ID_SINGLE_RECONCILIATION_READ + 1, + NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH = NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT + 1, + NSR_WORK_ID_GET_RESOLUTION_WINDOW = NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH + 1, + NSR_WORK_ID_END_RECONCILIATION = NSR_WORK_ID_GET_RESOLUTION_WINDOW + 1, + NSR_WORK_ID_INI = NSR_WORK_ID_END_RECONCILIATION + 1, + NSR_WORK_ID_FINI = NSR_WORK_ID_INI + 1 +} nsr_recon_work_req_id_t; + +typedef enum nsr_recon_queue_type_t { + NSR_RECON_QUEUE_TO_CONTROL = 0, + NSR_RECON_QUEUE_TO_DATA =NSR_RECON_QUEUE_TO_CONTROL + 1, +} nsr_recon_queue_type_t; + +typedef enum nsr_log_type_t { + NSR_LOG_HOLE = 0b0, + NSR_LOG_PSEUDO_HOLE = 0b1, + NSR_LOG_FILL = 0b11 +} nsr_log_type_t; + +typedef enum nsr_mode_t { + NSR_SEQ = 0, + NSR_USE_THREADS = 1, + NSR_ASYNC = 2 +} nsr_mode_t; + +typedef enum nsr_recon_work_type_t { + NSR_RECON_WORK_NONE = 0, + NSR_RECON_WORK_HOLE_TO_NOOP = NSR_RECON_WORK_NONE + 1, + NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE = NSR_RECON_WORK_HOLE_TO_NOOP + 1, + NSR_RECON_WORK_COMPARE_PSEUDO_HOLE = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE + 1, + NSR_RECON_WORK_HOLE_TO_FILL = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE + 1, + NSR_RECON_WORK_UNDO_FILL = NSR_RECON_WORK_HOLE_TO_FILL + 1, +} nsr_recon_work_type_t; + +typedef enum nsr_recon_driver_state_t { + none = 0, + leader = 1, + reconciliator = 2, + resolutor = 3, + joiner = 4, +} nsr_recon_driver_state_t; + +// role structure +#pragma pack(push, 1) +typedef struct _nsr_recon_role_s { + uint32_t role; // leader, reconciliator, resolutor + uint32_t num; // required in case state is reconciliator + uint32_t current_term; // current term used in case of leader + // In case this is reconciliator, num is set to nodes that were part + // of previous term. + // In case this is resolutor, num is set to 2. + // info[0] - information for this node. + // info[1] - information of the reconciliator. + // In case this is leader, num is set to this term's membership list + // set info.name to all members including the leader + struct { + int32_t last_term; + int32_t commited_ops; + uint32_t last_index; + uint32_t first_index; + char name[MAX_HOSTNAME_LEN]; + } info[MAXIMUM_REPLICA_STRENGTH]; +} nsr_recon_role_t; +#pragma pack(pop) + +#define ENDIAN_CONVERSION_RR(rr, is_true) \ +{ \ + uint32_t i=0; \ + uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \ + if (is_true == _gf_true) rr.num = f(rr.num); \ + rr.current_term = f(rr.current_term); \ + for (i=0; i < rr.num; i++) { \ + rr.info[i].last_term = f(rr.info[i].last_term); \ + rr.info[i].commited_ops = f(rr.info[i].commited_ops); \ + rr.info[i].last_index = f(rr.info[i].last_index); \ + rr.info[i].first_index = f(rr.info[i].first_index); \ + } \ + if (is_true == _gf_false) rr.num = f(rr.num); \ +} + +// last term info structure +#pragma pack(push, 1) +typedef struct _nsr_recon_last_term_info_s { + int32_t last_term; + int32_t commited_ops; + uint32_t last_index; + uint32_t first_index; +} nsr_recon_last_term_info_t; +#pragma pack(pop) + +#define ENDIAN_CONVERSION_LT(lt, is_true) \ +{ \ + uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \ + lt.last_term = f(lt.last_term); \ + lt.commited_ops = f(lt.commited_ops); \ + lt.last_index = f(lt.last_index); \ + lt.first_index = f(lt.first_index); \ +} + +// log information +#pragma pack(push, 1) +typedef struct _nsr_recon_log_info_s { + uint32_t term; + uint32_t first_index; + uint32_t last_index; +} nsr_recon_log_info_t; +#pragma pack(pop) + +#define ENDIAN_CONVERSION_LI(li, is_true) \ +{ \ + uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \ + li.term = f(li.term); \ + li.first_index = f(li.first_index); \ + li.last_index = f(li.last_index); \ +} + +#pragma pack(push, 1) +typedef struct nsr_recon_record_details_s { + uint32_t type; + uint32_t op; + char gfid[36+1]; + char pargfid[36+1]; + char link_path[256]; // should it be PATH_MAX? + uint32_t offset; + uint32_t len; + char entry[128]; + char newloc[128]; // for rename. can you overload link_path for this? TBD + mode_t mode; +} nsr_recon_record_details_t; +#pragma pack(pop) + +#define ENDIAN_CONVERSION_RD(rd, is_true) \ +{ \ + uint32_t (*f)(uint32_t) = ((is_true == _gf_true) ? ntohl : htonl); \ + rd.type = f(rd.type); \ + rd.op = f(rd.op); \ + rd.offset = f(rd.offset); \ + rd.len = f(rd.len); \ +} + +typedef struct _nsr_role_work_s { + nsr_recon_role_t role; + uint32_t term; + struct list_head list; +} nsr_role_work_t; + +typedef struct _nsr_recon_work_s { + gf_boolean_t in_use; + uint32_t index; + uint32_t req_id; + struct list_head list; +} nsr_recon_work_t; + +typedef struct _nsr_reconciliation_work_s { + uint32_t term; + uint32_t index; + uint32_t type; + uint32_t source; + void *data; + + uint32_t num; // used for xattr + +} nsr_reconciliation_work_t; + +typedef struct _nsr_reconciliation_record_s { + nsr_reconciliation_work_t work; // will store the computed work + nsr_recon_record_details_t rec; +} nsr_reconciliation_record_t; + +typedef struct _nsr_reconciliator_info { + uint32_t reconcilator_index; + int32_t last_term; + int32_t commited_ops; + uint32_t last_index; + uint32_t first_index; + //nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE]; + nsr_reconciliation_record_t *records; +} nsr_reconciliator_info_t; + +typedef struct _nsr_per_node_worker_s { + char *id; // identifier + char vol_file[256]; //volfile that will be used by this thread + glfs_t *fs; + glfs_fd_t *aux_fd; + uint32_t index; // index into array of workers + pthread_t thread_id; // thread id + void * context; // thread context + struct _nsr_recon_driver_ctxt *driver_ctx; + char local; // local data worker + //struct list_head list; //list of work items + nsr_recon_work_t head; + pthread_mutex_t mutex; //mutex to guard the state + pthread_cond_t cv; //condition variable for signaling the worker thread + gf_boolean_t is_control; +#if defined(NSR_DEBUG) + FILE *fp; +#endif + int32_t result; // result of latest work + int32_t op_errno; // errno +} nsr_per_node_worker_t; + +typedef struct _nsr_replica_worker_s { + char name[256]; + nsr_per_node_worker_t *control_worker; + nsr_per_node_worker_t *data_worker; + gf_boolean_t in_use; + nsr_reconciliator_info_t *recon_info; // Bunch of infos kept for this reconciliation +} nsr_replica_worker_t; + +typedef struct _nsr_recon_driver_ctxt { + xlator_t *this; + uint32_t replica_group_size; // number of static members of replica group + nsr_replica_worker_t *workers; // worker info + int32_t reconciliator; + pthread_mutex_t mutex; + pthread_cond_t cv; + nsr_role_work_t role_head; + volatile int32_t outstanding; + uint32_t reconciliator_index; + uint32_t term; + uint32_t current_term; + nsr_mode_t mode; // default set to seq +#if defined(NSR_DEBUG) + FILE *fp; +#endif +} nsr_recon_driver_ctx_t; + +void * +nsr_reconciliation_driver(void *); + +gf_boolean_t +nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t term); + +#define atomic_inc(ptr) ((void) __sync_fetch_and_add(ptr, 1)) +#define atomic_dec(ptr) ((void) __sync_fetch_and_add(ptr, -1)) +#define atomic_fetch_and __sync_fetch_and_and +#define atomic_fetch_or __sync_fetch_and_or + +#if defined(NSR_DEBUG) + +#define NSR_LOG_DIR "/var/log/nsr-logs" + +extern int nsr_debug_level; +extern FILE *recon_create_log (char *member, char *module); + +extern void +_nsr_driver_log (const char *func, int line, char *member, FILE *fp, + char *fmt, ...); + +#define nsr_driver_log(dom, levl, fmt...) do { \ + FMT_WARN (fmt); \ + if (levl <= nsr_debug_level) { \ + nsr_recon_private_t *priv = ctx->this->private; \ + _nsr_driver_log (__FUNCTION__, __LINE__, \ + priv->replica_group_members[0], \ + ctx->fp, \ + ##fmt); \ + } \ +} while (0) + +extern void +_nsr_worker_log (const char *func, int line, char *member, + char *type, uint32_t index, FILE *fp, + char *fmt, ...); + +#define nsr_worker_log(dom, levl, fmt...) do { \ + FMT_WARN (fmt); \ + if (levl <= nsr_debug_level) { \ + nsr_recon_private_t *priv; \ + priv = ctx->driver_ctx->this->private; \ + _nsr_worker_log (__FUNCTION__, __LINE__, \ + priv->replica_group_members[0], \ + ctx->is_control ? "recon-con" : \ + "recon-data", \ + ctx->index, ctx->fp, \ + ##fmt); \ + } \ +} while (0) + +#else +#define nsr_driver_log(dom, levl, fmt...) gf_log(dom, levl, fmt) +#define nsr_worker_log(dom, levl, fmt...) gf_log(dom, levl, fmt) +#endif + +#endif /* #ifndef __RECON_DRIVER_H__ */ diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c new file mode 100644 index 000000000..272c35dc2 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_xlator.c @@ -0,0 +1,1010 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <sys/types.h> +#include <fcntl.h> +#include <string.h> +#include <unistd.h> + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" + +#include "recon_driver.h" +#include "recon_xlator.h" + +typedef struct _nsr_recon_fd_s { + int32_t term; + nsr_recon_driver_state_t state; + uint32_t first_index; + uint32_t last_index; + call_frame_t *frame; +} nsr_recon_fd_t; + +#if defined(NSR_DEBUG) + +void +_recon_main_log (const char *func, int line, char *member, FILE *fp, + char *fmt, ...) +{ + va_list ap; + char *buf = NULL; + int retval; + + if (!fp) { + fp = recon_create_log(member,"recon-main-log"); + if (!fp) { + return; + } + } + + va_start(ap,fmt); + retval = vasprintf(&buf,fmt,ap); + if (buf) { + fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); + free(buf); + } + va_end(ap); +} + +#endif + +// Given fd, get back the NSR based fd context. +static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd) +{ + uint64_t tmp = 0; + int32_t ret = -1; + + if ((ret = fd_ctx_get(fd, this, &tmp)) != 0) { + return ret; + } else { + *rfd = (nsr_recon_fd_t *)tmp; + return 0; + } +} + +// Add the frame in q after associating with term +// term usage tbd +static void put_frame(nsr_recon_private_t *priv, + call_frame_t *frame, + uint32_t term) +{ + xlator_t *this = priv->this; + recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term); + priv->frame = frame; + return; +} + +// get the frame from the queue given the term +// term usage tbd +static void get_frame(nsr_recon_private_t *priv, + call_frame_t **frame, + uint32_t term) +{ + if (frame != NULL) + *frame = priv->frame; + priv->frame = NULL; + return; +} + +// check if there are outstanding frames +static gf_boolean_t is_frame(nsr_recon_private_t *priv) +{ + return((priv->frame != NULL) ? _gf_true : _gf_false); +} + +#define ENTRY_SIZE 128 + +long +get_entry_count (char *path) +{ + int fd; + struct stat buf; + unsigned long entries = -1; + long min; /* last entry not known to be empty */ + long max; /* first entry known to be empty */ + long curr; + char entry[ENTRY_SIZE]; + void *err_label = &&done; + + fd = open(path,O_RDONLY); + if (fd < 0) { + goto *err_label; + } + err_label = &&close_fd; + + if (fstat(fd,&buf) < 0) { + goto *err_label; + } + + min = 0; + max = buf.st_size / ENTRY_SIZE; + printf("max = %ld\n",max); + + while ((min+1) < max) { + curr = (min + max) / 2; + printf("trying entry %ld\n",curr); + if (lseek(fd,curr*ENTRY_SIZE,SEEK_SET) < 0) { + goto *err_label; + } + if (read(fd,entry,sizeof(entry)) != sizeof(entry)) { + goto *err_label; + } + if ((entry[0] == '_') && (entry[1] == 'P')) { + min = curr; + } + else { + max = curr; + } + } + + entries = max; + +close_fd: + close(fd); +done: + return entries; +} + +// Get the term info for the term number specified +void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) +{ + char path[PATH_MAX]; + long entries; + + bzero(lt, sizeof(nsr_recon_last_term_info_t)); + lt->last_term = term; + sprintf(path,"%s/%s%d",bp,"TERM.",term); + entries = get_entry_count(path); + if (entries > 1) { + /* The first entry is actually a header. */ + lt->first_index = 1; + /* + * This seems wrong, because it means that last_index*128 will + * be exactly at EOF and commited_ops will be one greater than + * it should be. Maybe some other code makes the exact + * opposite mistake to compensate. + */ + lt->last_index = lt->commited_ops = (int)entries; + } + recon_main_log (this->name, GF_LOG_INFO, "for term=%d got first_index=%d last_index=%d commited_ops=%d\n", + term, lt->first_index, lt->last_index, lt->commited_ops); + return; +} + +// Given the term number, find the last term in the changelogs +void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) +{ + uint32_t t = term; + struct stat buf; + char path[PATH_MAX]; + bzero(lt, sizeof(nsr_recon_last_term_info_t)); + while(t) { + // journal file is of type TERM-1.jnl + sprintf(path,"%s/%s%d",bp,"TERM.",t); + if (!stat(path, &buf)) { + nsr_recon_libchangelog_get_this_term_info(this, bp, t, lt); + recon_main_log (this->name, GF_LOG_INFO, "got last term given current term %d as %d\n", term, t); + return; + } + t--; + } + recon_main_log (this->name, GF_LOG_INFO, "got no last term given current term %d \n", term); + + return; +} + +// Return back the frame stored against the term +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno) +{ + call_frame_t *old_frame = NULL; + xlator_t *this = priv->this; + + get_frame(priv, &old_frame, term); + if (old_frame) { + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n"); + // first return the original write for which this ack was sent + STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n"); + } +} + +typedef enum records_type_t { + fop_gfid_pgfid_oldloc_newloc = 1, + fop_gfid_pgfid_entry = fop_gfid_pgfid_oldloc_newloc + 1, + fop_gfid = fop_gfid_pgfid_entry + 1 , + fop_gfid_offset = fop_gfid + 1, + fop_gfid_offset_len = fop_gfid_offset + 1, +} records_type_t; + +// Get the backend ./glusterfs/xx/xx/<...> path +static void +get_gfid_path(nsr_recon_private_t *priv, char *gfid, char *path) +{ + strcpy(path, priv->base_dir); + strcat(path, "/.glusterfs/"); + strncat(path,gfid,2); + strcat(path,"/"); + strncat(path,gfid+2,2); + strcat(path,"/"); + strcat(path,gfid); +} + + +// Get the link to which backend points to +static gf_boolean_t +get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path) +{ + char lp[PATH_MAX]; + xlator_t *this = priv->this; + get_gfid_path(priv,gfid, lp); + if (readlink(lp, path, 255) == -1) { + GF_ASSERT(0); + recon_main_log(priv->this, GF_LOG_ERROR, + "cannot get readlink for %s\n",lp); + return _gf_false; + } + return _gf_true; +} + +// Get the list of changelog records given a term , first and last index. +// +// TBD: rewrite this hideous ball of mud in at least the following ways: +// +// (1) Break out the code for handling a single record into a separate +// function, to make error handling easier and reduce "indentation +// creep" so the code's readable. +// +// (2) Change all of the fop_xxx_yyy nonsense to OR together values +// like FOP_HAS_FIELD_XXX and FOP_HAS_FIELD_YYY, to reduce code +// duplication and facilitate the addition of new fields. +// +// (3) Stop making so many assumptions about the underlying formats. +// The code as it is won't even work for the existing binary format, +// let alone as changelog evolves over time. +// +// Really, 90% of this code should just GO AWAY in favor of using +// libgfchangelog, enhanced as necessary to support our needs. + +/* + * Use this macro to skip over a field we're not using yet. + * NB: the body is a null statement on purpose + * TBD: all instances of this should be removed eventually! + */ +#define SKIP_FIELD do /* nothing */ ; while (*(start++) != '\0') + +#define SKIP_OVER +gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) +{ + // do a mmap; seek into the first and read all records till last. + // TBD - right now all records are pseudo holes but mark them as fills. + // TBD - pseudo hole to be implemented when actual fsync gets done on data. + char *rb = NULL, *orig = NULL; + char path[PATH_MAX]; + int fd; + uint32_t index = 0; + + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records called for term %d index from %d to %d \n", + term, first, last ); + + orig = rb = GF_CALLOC(128, ((last - first) + 1), + gf_mt_recon_changelog_buf_t); + + sprintf(path,"%s/%s%d",bp,"TERM.",term); + fd = open(path, O_RDONLY); + if (fd == -1) { + return _gf_false; + } else { + char *start = NULL; + nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; + + if (first == 0) + lseek(fd, 128, SEEK_SET); + else + lseek(fd, first * 128, SEEK_SET); + if (read(fd, rb, (last - first + 1) * 128) == -1) { + return _gf_false; + } + start = rb; + index = first; + do { + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records start inspecting records at index %d \n", + index ); + if (!strncmp(start, "_PRE_", 5)) { + uint32_t i; + uint32_t opcode = 0; + records_type_t type; + + start += 5; + // increment by the NULLs after the PRE + start += 4; + SKIP_FIELD; // real index + // now we have the opcode + while (*start != '\0') { + opcode *= 10; + opcode += (*(start++) - '0'); + } + ++start; + recon_main_log (this->name, GF_LOG_ERROR, + "libchangelog_get_records: got opcode %d @index %d\n", opcode, index); + if ((opcode == GF_FOP_RENAME)) { + type = fop_gfid_pgfid_oldloc_newloc; + } else if ((opcode == GF_FOP_UNLINK) || + (opcode == GF_FOP_RMDIR) || + (opcode == GF_FOP_LINK) || + (opcode == GF_FOP_MKDIR) || + (opcode == GF_FOP_SYMLINK) || + (opcode == GF_FOP_MKNOD) || + (opcode == GF_FOP_CREATE)) { + type = fop_gfid_pgfid_entry; + } else if ((opcode == GF_FOP_FSETATTR) || + (opcode == GF_FOP_SETATTR) || + (opcode == GF_FOP_FREMOVEXATTR) || + (opcode == GF_FOP_REMOVEXATTR) || + (opcode == GF_FOP_SETXATTR) || + (opcode == GF_FOP_FSETXATTR)) { + type = fop_gfid; + } else if ((opcode == GF_FOP_TRUNCATE) || + (opcode == GF_FOP_FTRUNCATE)) { + type = fop_gfid_offset; + } else if (opcode == GF_FOP_WRITE) { + type = fop_gfid_offset_len; + } else { + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got no proper opcode %d @index %d\n", + opcode, index); + //GF_ASSERT(0); + // make this as a hole. + // TBD - check this logic later. maybe we should raise alarm here because + // this means that changelog is corrupted. We are not handling changelog + // corruptions as of now. + rec->type = NSR_LOG_HOLE; + goto finish; + } + // TBD - handle psuedo holes once that logic is in. + rec->type = NSR_LOG_FILL; + recon_main_log (this->name, GF_LOG_ERROR, + "libchangelog_get_records:got type %d at index %d \n", + rec->type, index); + rec->op = opcode; + + // Now get the gfid and parse it + // before that increment the pointer + for (i=0; i < 36; i++) { + rec->gfid[i] = (*start); + start++; + } + rec->gfid[i] = '\0'; + + GF_ASSERT(*start == 0); + start ++; + + if (opcode == GF_FOP_SYMLINK) { + i = 0; + do { + if (i >= 256) { + goto finish; + } + rec->link_path[i++] = *start; + } while (*(start++) != '\0'); + } + + i = 0; + // If type is fop_gfid_offset+_len, get offset + if ((type == fop_gfid_offset) || (type == fop_gfid_offset_len)) { + char offset_str[128]; + while(*start != 0) { + offset_str[i++] = *start; + start ++; + } + offset_str[i] = '\0'; + // get over the 0 + start++; + rec->offset = strtoul(offset_str, NULL, 10); + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got offset %d @index %d \n", rec->offset, index); + + } + i = 0; + if (type == fop_gfid_offset_len) { + char len_str[128]; + while(*start != 0) { + len_str[i++] = *start; + start ++; + } + len_str[i] = '\0'; + // get over the 0 + start++; + rec->len = strtoul(len_str, NULL, 10); + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got length %d @index %d \n", rec->len, index); + } + i = 0; + if (type == fop_gfid_pgfid_entry) { + switch (opcode) { + case GF_FOP_CREATE: + case GF_FOP_MKDIR: + case GF_FOP_MKNOD: + SKIP_FIELD; // mode + break; + /* TBD: handle GF_FOP_SYMLINK target */ + default: + ; + } + SKIP_FIELD; // uid + SKIP_FIELD; // gid + if (opcode == GF_FOP_MKNOD) { + SKIP_FIELD; // dev + } + // first get the gfid and then the path + for (i=0; i < 36; i++) { + rec->pargfid[i] = (*start); + start++; + } + rec->pargfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + + i = 0; + while(*start != 0) { + rec->entry[i++] = *start; + start ++; + } + rec->entry[i] = '\0'; + // get over the 0 + start++; + /* + * Having to add this as a special case + * is awful. See the function header + * comment for the real solution. + */ + if (opcode == GF_FOP_CREATE) { + rec->mode = 0; + while (*start != '\0') { + rec->mode *= 10; + rec->mode += *start + - '0'; + ++start; + } + ++start; + } + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got entry %s @index %d \n", rec->entry, index); + + } + i = 0; + if (type == fop_gfid_pgfid_oldloc_newloc) { + + // first get the source and then the destination + // source stuff gets stored in pargfid/entry + for (i=0; i < 36; i++) { + rec->pargfid[i] = (*start); + start++; + } + rec->pargfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + + i=0; + while(*start != 0) { + rec->entry[i++] = *start; + start ++; + } + rec->entry[i] = '\0'; + // get over the 0 + start++; + + // dst stuff gets stored in gfid/newloc + for (i=0; i < 36; i++) { + rec->gfid[i] = (*start); + start++; + } + rec->gfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + i = 0; + while(*start != 0) { + rec->newloc[i++] = *start; + start ++; + } + rec->newloc[i] = '\0'; + // get over the 0 + start++; + + } + ENDIAN_CONVERSION_RD((*rec), _gf_false); //htonl + } +finish: + if (index == last) + break; + index++; + rb += 128; + start = rb; + rec++; + } while(1); + } + GF_FREE(orig); + close(fd); + + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records finsihed inspecting records for term %d \n", + term); + return _gf_true; +} + +int32_t +nsr_recon_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) +{ + int32_t op_ret = 0; + int32_t op_errno = 0; + nsr_recon_fd_t *rfd = NULL; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path ); + rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_fd_t); + if (!rfd) { + op_ret = -1; + op_errno = ENOMEM; + } + + op_ret = fd_ctx_set (fd, this, (uint64_t)(long)rfd); + if (op_ret) { + op_ret = -1; + op_errno = EINVAL; + } + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open returns with %d for path %s \n",op_ret,loc->path ); + STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, NULL); + return 0; +} + +int32_t +nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t offset, + uint32_t flags, struct iobref *iobref, dict_t *xdata) +{ + nsr_recon_fd_t *rfd = NULL; + nsr_recon_private_t *priv = NULL; + int32_t op_ret = 0; + int32_t op_errno = 0; + int32_t ret = 0; + + ret = this_fd_ctx_get (fd, this, &rfd); + if (ret < 0) { + return -1; + } + priv = (nsr_recon_private_t *)this->private; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset ); + GF_ASSERT(count == 1); + switch (offset) { + // client(brick, leader) writes the role of the node + case nsr_recon_xlator_sector_1 : + { + nsr_recon_role_t rr; + memcpy((void *)&rr, (void *)vector[0].iov_base, sizeof(rr)); + ENDIAN_CONVERSION_RR(rr, _gf_true); //ntohl + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role); + if ((rr.role != leader) && + (rr.role != reconciliator) && + (rr.role != resolutor) && + (rr.role != joiner)) { + recon_main_log (this->name, GF_LOG_ERROR, + "EIII---nsr_recon_writev cannot set state \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } + + GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH); + + // Check if already a role play is going on. If yes return with EAGAIN. + // Ideally we should check if we have got a higher term number while + // servicing a lower term number; if so abort the older one. + // However the abort infrastructure needs to be sketched properly; TBD. + if (is_frame(priv) == _gf_true) { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - already role play \n"); + STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN, + NULL, NULL, NULL); + } else { + + // Store the stack frame so that when the actual job gets finished + // we send the response back to the brick. + put_frame(priv, frame, rr.current_term); + if (nsr_recon_driver_set_role(priv->driver_thread_context, + &rr, + rr.current_term) == _gf_false) { + get_frame(priv, NULL, rr.current_term); + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - cannot seem to set role \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev set_role - set role succesfully \n"); + } + } + break; + } + // client(reconciliator) writes how much it needs for the read + case nsr_recon_xlator_sector_2 : + { + nsr_recon_log_info_t li; + memcpy((void *)&li, (void *)vector[0].iov_base, sizeof(li)); + ENDIAN_CONVERSION_LI(li, _gf_true); //ntohl + + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for reconcilation info. term=%d, first_index=%d,start_index=%d \n", + li.term, li.first_index, li.last_index); + rfd->term = li.term; + rfd->last_index = li.last_index; + rfd->first_index = li.first_index; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(reconciliator) writes term for which it needs info + case nsr_recon_xlator_sector_3 : + { + int32_t term; + + memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); + term = ntohl(term); //ntohl + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for term info. term=%d\n", + term); + rfd->term = term; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(reconciliator) writes current term so that it gets last term info later + case nsr_recon_xlator_sector_4 : + { + int32_t term; + + memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); + term = ntohl(term); //ntohl + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for last term info given current term=%d\n", + term); + rfd->term = term; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev called with wrong offset\n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + break; + } + } + + return 0; +} + +int +nsr_recon_readv (call_frame_t *frame, xlator_t *this, + fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata) +{ + nsr_recon_fd_t *rfd = NULL; + int32_t op_errno = 0; + // copied stuff from quick-read.c and posix.c + struct iobuf *iobuf = NULL; + struct iobref *iobref = NULL; + struct iovec iov = {0, }; + int32_t ret = -1; + nsr_recon_private_t *priv = NULL; + + iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); + if (!iobuf) { + op_errno = ENOMEM; + goto out; + } + + iobref = iobref_new (); + if (!iobref) { + op_errno = ENOMEM; + goto out; + } + + iobref_add (iobref, iobuf); + + ret = this_fd_ctx_get (fd, this, &rfd); + if (ret < 0) { + op_errno = -ret; + goto out; + } + priv = (nsr_recon_private_t *)this->private; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv called for offset %d \n",(unsigned int)offset ); + switch (offset) { + // client(leader) reads from here to get info for this term on this node + // invole libchagelog to get the information + case nsr_recon_xlator_sector_3 : + { + nsr_recon_last_term_info_t lt; + GF_ASSERT(size == sizeof(lt)); + nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, rfd->term, <); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting term info for term=%d, ops=%d, first=%d, last=%d\n", + rfd->term, lt.commited_ops, lt.first_index, lt.last_index); + ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl + memcpy(iobuf->ptr, <, size); + goto out; + } + // client(reconciliator) reads individual record information + case nsr_recon_xlator_sector_2 : + { + uint32_t num = (rfd->last_index - rfd->first_index + 1); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - expected size %lu got size %lu\n", + (num * sizeof(nsr_recon_record_details_t)), size); + + GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t))); + bzero(iobuf->ptr, size); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting records for term=%d from %d to %d\n", + rfd->term, rfd->first_index, rfd->last_index); + nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + rfd->term, rfd->first_index, rfd->last_index, iobuf->ptr); + goto out; + } + // read last term info + case nsr_recon_xlator_sector_4 : + { + nsr_recon_last_term_info_t lt; + GF_ASSERT(size == sizeof(lt)); + nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, rfd->term, <); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting last term info given current term=%d. last term = %d ops=%d, first=%d, last=%d\n", + rfd->term, lt.last_term, lt.commited_ops, lt.first_index, lt.last_index); + ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl + memcpy(iobuf->ptr, <, size); + goto out; + } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_readv called with wrong offset\n"); + op_errno = -1; + break; + } + } + +out: + if (op_errno == 0) { + iov.iov_base = iobuf->ptr; + ret = iov.iov_len = size; + } + + STACK_UNWIND_STRICT (readv, frame, ret, op_errno, &iov, 1, NULL, iobref , NULL); + + if (iobref) + iobref_unref (iobref); + if (iobuf) + iobuf_unref (iobuf); + return 0; +} + +int +nsr_recon_lookup (call_frame_t *frame, xlator_t *this, + loc_t *loc, dict_t *xdata) +{ + struct iatt buf = {0, }; + // dirty hack to set root as regular but seems to work. + buf.ia_type = IA_IFREG; + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_lookup called \n"); + + STACK_UNWIND_STRICT (lookup, frame, 0, 0, this->itable->root, &buf, NULL, NULL); + return 0; +} + + +int32_t +nsr_recon_flush (call_frame_t *frame, xlator_t *this, + fd_t *fd, dict_t *xdata) +{ + STACK_UNWIND_STRICT (flush, frame, 0, 0, NULL); + return 0; +} + + +int32_t +mem_acct_init (xlator_t *this) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("recon", this, out); + + ret = xlator_mem_acct_init (this, gf_mt_recon_end + 1); + + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "Memory accounting init" "failed"); + return ret; + } +out: + return ret; +} + + +int32_t +init (xlator_t *this) +{ + nsr_recon_private_t *priv = NULL; + char *local, *members; + unsigned int i=0; + + priv = GF_CALLOC (1, sizeof (*priv), gf_mt_recon_private_t); + if (!priv) { + gf_log (this->name, GF_LOG_ERROR, + "priv allocation error\n"); + return -1; + } + GF_OPTION_INIT ("replica-group-size", priv->replica_group_size, uint32, err); + GF_OPTION_INIT ("vol-name", priv->volname, str, err); + if (!priv->volname) { + gf_log (this->name, GF_LOG_ERROR, + "missing volname option (required)"); + return -1; + } + GF_OPTION_INIT ("changelog-dir", priv->changelog_base_path, str, err); + if (!priv->changelog_base_path) { + gf_log (this->name, GF_LOG_ERROR, + "missing changelog directory option (required)"); + return -1; + } + GF_OPTION_INIT ("base-dir", priv->base_dir, str, err); + if (!priv->base_dir) { + gf_log (this->name, GF_LOG_ERROR, + "missing brick base directory option (required)"); + return -1; + } + GF_OPTION_INIT ("replica-group-members", members, str, err); + if (!members) { + gf_log (this->name, GF_LOG_ERROR, + "missing membership option (required)"); + return -1; + } + GF_OPTION_INIT ("local-member", local, str, err); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "missing local member option (required)"); + return -1; + } + + priv->replica_group_members = GF_CALLOC (priv->replica_group_size, + sizeof(char *), + gf_mt_recon_members_list_t); + priv->replica_group_members[0] = GF_CALLOC (1, + strlen(local), + gf_mt_recon_member_name_t); + if (!priv->replica_group_members || !(priv->replica_group_members[0])) { + gf_log (this->name, GF_LOG_ERROR, + "str allocation error\n"); + return -1; + } + strcpy(priv->replica_group_members[0], local); + for (i=1; i < priv->replica_group_size; i++) { + char *member; + if (i == 1) + member = strtok(members, ","); + else + member = strtok(NULL, ","); + priv->replica_group_members[i] = GF_CALLOC (1, + strlen(member) + 1, gf_mt_recon_member_name_t); + if (!priv->replica_group_members[i]) { + gf_log (this->name, GF_LOG_ERROR, + "str allocation error\n"); + return -1; + } + strcpy(priv->replica_group_members[i], member); + } + + + priv->this = this; + this->private = (void *)priv; + + priv->fp = recon_create_log (priv->replica_group_members[0], "recon-main-log"); + if (!priv->fp) + return -1; + + recon_main_log (this->name, GF_LOG_INFO, "creating reconciliation driver \n"); + + if (pthread_create(&priv->thread_id, NULL, nsr_reconciliation_driver, priv)) { + recon_main_log (this->name, GF_LOG_ERROR, + "pthread creation error \n"); + return -1; + } + + INIT_LIST_HEAD(&(priv->list)); + + + return 0; + +err: + return -1; +} + + +void +fini (xlator_t *this) +{ + nsr_recon_private_t *priv = NULL; + void *ret = NULL; + + priv = (nsr_recon_private_t *)this->private; + + pthread_cancel(priv->thread_id); + pthread_join(priv->thread_id, &ret); +} + + +struct xlator_fops fops = { + .open = nsr_recon_open, + .readv = nsr_recon_readv, + .writev = nsr_recon_writev, + .lookup = nsr_recon_lookup, + .flush = nsr_recon_flush +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { + { .key = {"replica-group-size"}, + .type = GF_OPTION_TYPE_INT, + .min = 2, + .max = INT_MAX, + .default_value = "2", + .description = "Number of bricks in replica group. can be derived but putting it here for testing." + }, + { + .key = {"vol-name"}, + .type = GF_OPTION_TYPE_STR, + .description = "volume name" + }, + { + .key = {"local-member"}, + .type = GF_OPTION_TYPE_STR, + .description = "member(brick) for which this translator is responsible." + }, + { + .key = {"replica-group-members"}, + .type = GF_OPTION_TYPE_STR, + .description = "Comma seperated member names other than local." + }, + { + .key = {"changelog-dir"}, + .type = GF_OPTION_TYPE_STR, + .description = "Base directory where per term changelogs are maintained." + }, + { + .key = {"base-dir"}, + .type = GF_OPTION_TYPE_STR, + .description = "Base directory for this brick. This should go away once we fix gfid based lookups" + }, + { .key = {NULL} }, +}; diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h new file mode 100644 index 000000000..d9692a632 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_xlator.h @@ -0,0 +1,92 @@ +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __RECON_XLATOR_H__ +#define __RECON_XLATOR_H__ + +#include <semaphore.h> +#include <pthread.h> + +enum gf_dht_mem_types_ { + gf_mt_recon_changelog_buf_t = gf_common_mt_end + 1, + gf_mt_recon_driver_ctx_t, + gf_mt_recon_fd_t, + gf_mt_recon_id_t, + gf_mt_recon_member_name_t, + gf_mt_recon_members_list_t, + gf_mt_recon_per_node_worker_t, + gf_mt_recon_private_t, + gf_mt_recon_reconciliator_info_t, + gf_mt_recon_record_t, + gf_mt_recon_record_details_t, + gf_mt_recon_role_work_t, + gf_mt_recon_work_t, + gf_mt_recon_work_data_t, + gf_mt_recon_worker_t, + gf_mt_recon_end, +}; + +enum nsr_recon_xlator_sector_t { + nsr_recon_xlator_sector_0 = 0, // to report back the status of given transaction ids + nsr_recon_xlator_sector_1 = 512, // to write here information about leadership changes from the brick + nsr_recon_xlator_sector_2 = (512 * 2), // to write here individual roles and wait for that role to be done + nsr_recon_xlator_sector_3 = (512 *3), // read from here to get term info for given term + nsr_recon_xlator_sector_4 = (512 * 4), // read from here to get last term info +}; + + +typedef struct _nsr_recon_private_s { + xlator_t *this; //back pointer + unsigned int replica_group_size; // number of static members of replica group + char **replica_group_members; // replica group members (including itself in first slot) + pthread_t thread_id; // driver thread id + nsr_recon_driver_ctx_t *driver_thread_context; //driver thread context + unsigned int outstanding; // for communicating with driver thread + call_frame_t *frame; // old frame that is pending (just one as of now) + struct list_head list; + char *volname; + uint32_t txn_id; + char *changelog_base_path; + char *base_dir; +#if defined(NSR_DEBUG) + FILE *fp; +#endif +} nsr_recon_private_t; + +#define atomic_cmpxchg __sync_val_compare_and_swap + +#if defined(NSR_DEBUG) + +extern void +_recon_main_log (const char *func, int line, char *member, FILE *fp, + char *fmt, ...); + +#define recon_main_log(dom, levl, fmt...) do { \ + FMT_WARN (fmt); \ + if (levl <= nsr_debug_level) { \ + nsr_recon_private_t *priv = this->private; \ + _recon_main_log (__FUNCTION__, __LINE__, \ + priv->replica_group_members[0], \ + priv->fp, \ + ##fmt); \ + } \ +} while (0) + +#else +#define recon_main_log(dom, levl, fmt...) gf_log(dom, levl, fmt) +#endif + +void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt); +void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt); +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno); +gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf); + + +#endif /* #ifndef __RECON_XLATOR_H__ */ |