diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_driver.c')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.c | 3130 |
1 files changed, 3130 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c new file mode 100644 index 000000000..8c7622a02 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -0,0 +1,3130 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <sys/types.h> +#include <fcntl.h> +#include <string.h> +#include <unistd.h> +#include <fnmatch.h> + + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" + + +#include "recon_driver.h" +#include "recon_xlator.h" +#include "api/src/glfs-internal.h" +#include "api/src/glfs-handles.h" + +/* TBD: move declarations here and nsr.c into a common place */ +#define NSR_TERM_XATTR "trusted.nsr.term" +#define RECON_TERM_XATTR "trusted.nsr.recon-term" +#define RECON_INDEX_XATTR "trusted.nsr.recon-index" + +/* + * Execution architecture for the NSR reconciliation driver. The driver runs + * as a seperate process in each node where the brick is. The main function of + * the driver is nsr_reconciliation_driver() (last function below) The driver + * just sits in a tight loop waiting for state changes. When a brick becomes a + * replica leader, it fences IO, contacts this process and waits for + * reconciliation to finish. + * + * The replica leader talks to other bricks in replica group which are alive + * and gets the last term info using which it decides which has the latest + * data. That brick is referred to as the "reconciliator"; leader sends a + * message to reconciliator to freeze its data (by reading any incomplete data + * from other nodes from that term if required) + * + * Once that is done leader sends a message to all nodes except the + * reconciliator to sync themselves with the reconciliator. This process is + * referred to as "resolution". + * + * Hence the reconciliation processes need to talk to each other to get a given + * term info. This is implemented using the recon translator IOs which + * implements a bare bone RPC by exposing a file interface to which + * reads/writes are done to pass control messages. This is referred to as the + * "control plane". This implementation allows the control plane to be + * implemented as a bunch of threads for each of the nodes. + * + * The reconciliation process also needs to talk to the brick process on that + * node to actually write the data as part of reconciliation/resolution. This + * is referred to as the "data plane". Again there are a bunch of threads that + * do this work. + * + * The way the worker threads are organised is that main driver context has a + * pointer to contexts for each of these thread contexts. The thread context at + * index 0 always refers to talking with local recon process/brick. So the + * control worker at index 0 will get the local changelog info and data worker + * at index 0 will talk to local brick. + * + * All the ops from the control/data planes are implemented using the glfs + * APIs. + */ + +#if defined(NSR_DEBUG) + +/* This lets us change on the fly even if NSR_DEBUG is defined. */ +int nsr_debug_level = GF_LOG_TRACE; + +FILE * +recon_create_log (char *member, char *module) +{ + char *dpath = NULL; + char *p; + char *fpath = NULL; + FILE *fp = NULL; + int fd = -1; + + (void)mkdir(NSR_LOG_DIR,0777); + (void)asprintf(&dpath,NSR_LOG_DIR"/%s",member); + if (dpath) { + for (p = dpath + strlen(NSR_LOG_DIR) + 1; *p; ++p) { + if (*p == '/') { + *p = '-'; + } + } + (void)mkdir(dpath,0777); + (void)asprintf(&fpath,"%s/%s",dpath,module); + if (fpath) { + fd = open(fpath,O_WRONLY|O_CREAT|O_APPEND|O_SYNC,0666); + if (fd >= 0) { + fp = fdopen(fd,"a"); + if (!fp) { + close(fd); + } + } + if (fp) { + if (setvbuf (fp, NULL, _IONBF, 0)) { + /* + * Might as well take advantage of it + * to log the error. + */ + fprintf (fp, + "setvbuf failed for log\n"); + fprintf (fp, + "log output may be async\n"); + fflush(fp); + } + } + free(fpath); + } + free(dpath); + } + + return fp; +} + +void +_nsr_driver_log (const char *func, int line, char *member, FILE *fp, + char *fmt, ...) +{ + va_list ap; + char *buf = NULL; + int retval; + + if (!fp) { + fp = recon_create_log(member,"nsr-driver-log"); + if (!fp) { + return; + } + } + + va_start(ap,fmt); + retval = vasprintf(&buf,fmt,ap); + if (buf) { + fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); + free(buf); + } + va_end(ap); +} + +void +_nsr_worker_log (const char *func, int line, char *member, + char *type, uint32_t index, FILE *fp, + char *fmt, ...) +{ + va_list ap; + char *buf = NULL; + int retval; + + if (!fp) { + char *name; + if (asprintf(&name,"%s-%u",type,index) < 1) { + return; + } + fp = recon_create_log (member, name); + if (!fp) { + return; + } + } + + va_start(ap,fmt); + retval = vasprintf(&buf,fmt,ap); + if (buf) { + fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); + free(buf); + } + va_end(ap); +} + +#endif + +/* + * Recon Driver Calloc + * + * We need this because all of this messing about with gfapi from within a + * translator keeps scrambling THIS (only one reason it's a terrible idea) and + * we need THIS to have a value that represents our initialization with our + * memory types. + * + * Note that the macro requires "this" to be defined in the current scope. + */ + +#define RD_CALLOC(x,y,z) ({THIS = this; GF_CALLOC(x,y,z); }) + +/* + * This function gets the size of all the extended attributes for a file. + * This is used so that caller knows how much to allocate for key-value storage. + * + * Input Arguments: + * fd - the file opened using glfs API. + * dict - passed so that NSR translator can get this from the required brick + * + * Output Arguments: + * b - pointer to the buffer where the attributes are filled up. + * key_size - the size of all keys + * val_size - the size of all values + * num - number of key/values + */ +static int32_t +get_xattr_total_size( struct glfs_fd *fd, + char **b, + uint32_t *key_size, + uint32_t *val_size, + uint32_t* num, + dict_t *dict) +{ + int32_t s = -1, ret = -1; + char *c = NULL; + + *key_size = 0; + *val_size = 0; + *num = 0; + + // First get the size of the keys + s = glfs_flistxattr_with_xdata(fd, NULL,0, dict); + if (s == -1) { + goto out; + } + *key_size = s; + + // TBD - use the regular calloc + (*b) = c = calloc(s+1,1); + + // get the keys themselves + if (glfs_flistxattr_with_xdata(fd, c, s+1, dict) == -1) { + goto out; + } + do { + int32_t r; + uint32_t len = 0; + // for each key get the size of the value + r = glfs_fgetxattr_with_xdata(fd, c, NULL, 0, dict); + if (r == -1) + goto out; + (*val_size) += r; + len = strlen(c) + 1; + c += len; + s -= len; + (*num)++; + } while(s); + ret = 0; +out: + return ret; +} + +/* + * This function gets bunch of xattr values given set of keys. + * + * Input Arguments: + * fd - the file opened using glfs API. + * keys - the bunch of keys + * size - size of values + * num - number of keys + * dict - passed so that NSR translator can get this from the required brick + * + * Output Arguments: + * buf - where the values are written one after the other (NULL seperated) + */ +static int32_t +get_xattr(struct glfs_fd *fd, + char *keys, + char *buf, + uint32_t size, + uint32_t num, + dict_t *dict) +{ + while(num--) { + int32_t r; + uint32_t len = 0; + + // copy the key + strcpy(buf, keys); + len = strlen(keys); + len++; + buf += len; + + // get the value and copy the value after incrementing buf after the key + r = glfs_fgetxattr_with_xdata(fd, keys, buf, size, dict); + + // TBD - handle error + if (r == -1) + return -1; + + // increment the key to next value + keys += len; + + // increment buf to hold the next key + buf += strlen(buf) + 1; + } + return 0; +} + +/* + * Function deletes a bunch of key values in extended attributes of a file. + * Input Arguments: + * fd - the file opened using glfs API. + * dict - passed so that NSR translator can do this from the required brick + * keys - bunch of NULL seperated key names + * num - number of keys + */ +static int32_t delete_xattr(struct glfs_fd *fd, + dict_t *dict_t, + char *keys, + uint32_t num) +{ + while(num--) { + // get the value and copy the value + // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata() + if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1) + return -1; + keys += strlen(keys) +1; + } + return 0; +} + +/* + * Given a bunch of key value pairs, fill them as xattrs for a file + * + * Input Arguments: + * fd - the file opened using glfs API. + * dict - passed so that NSR translator can do this from the required brick + * buf - buffer containing the keys-values pairs. The key value are NULL seperated. + * Each of the key-value is seperated by NULL in turn. + * num - Number of such key value pairs. + */ +static int32_t +fill_xattr(struct glfs_fd *fd, + dict_t *dict, + char *buf, + uint32_t num) +{ + char *k = buf, *val = NULL; + + while(num--) { + int32_t r; + + val = k + strlen(k) + 1; + + // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata() + r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict); + if (r == -1) + return -1; + k = val + strlen(val) + 1; + } + return 0; +} + +/* + * This function gets a file that can be used for doing glfs_init later. + * The control file is used by control thread(function) to talk to peer reconciliation process. + * The data file is used by the data thread(function) to talk to the bricks. + * The control file is of name such as con:gfs1:-mnt-a1 where "gfs1" is name of host + * and the brick path is "/mnt/a1". + * The data file is of name such as data:gfs1:-mnt-a1. + * + * Input Arguments: + * vol - name of the volume. This is used to build the full path of the control and data file + * such as /var/lib/glusterd/vols/test/bricks/gfs2:-mnt-test1-nsr-recon.vol. + * In above example the volume name is test and brick on gfs2 is on path /mnt/test1 + * + * worker - The worker for a given node. This worker has 2 threads - one on the data plane + * and one on the control plane. The worker->name is already filled with hostname:brickname + * in the function nsr_reconciliation_driver(). Use that to build the volume file. + * So if worker->name has gfs1:/mnt/a1, control file is con:gfs1:-mnt-a1 + * and data file is data:gfs1:-mnt-a1. + * All these files are under the bricks directory. TBD - move this to a NSR recon directory later. + */ +static void +nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker) +{ + char *ptr; + char tr[256]; + + // Replace the "/" to - + strcpy(tr, worker->name); + ptr = strchr (tr, '/'); + while (ptr) { + *ptr = '-'; + ptr = strchr (tr, '/'); + } + + // Build the base directory such as "/var/lib/glusterd/vols/test/bricks/" + sprintf(worker->control_worker->vol_file, + "/%s/%s/%s/%s/", + GLUSTERD_DEFAULT_WORKDIR, + GLUSTERD_VOLUME_DIR_PREFIX, + vol, + GLUSTERD_BRICK_INFO_DIR); + + strcat(worker->control_worker->vol_file, "con:"); + strcat(worker->control_worker->vol_file, tr); + + sprintf(worker->data_worker->vol_file, + "/%s/%s/%s/%s/", + GLUSTERD_DEFAULT_WORKDIR, + GLUSTERD_VOLUME_DIR_PREFIX, + vol, + GLUSTERD_BRICK_INFO_DIR); + strcat(worker->data_worker->vol_file, "data:"); + strcat(worker->data_worker->vol_file, tr); +} + +/* + * This function does all the glfs initialisation + * so that reconciliation process can talk to other recon processes/bricks + * for the control/data messages. + * This will be done everytime a worker needs to be kicked off to talk + * across any plane. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static int32_t +nsr_recon_start_work(nsr_per_node_worker_t *ctx, + gf_boolean_t control) +{ + glfs_t *fs = NULL; + xlator_t *this = ctx->driver_ctx->this; + int32_t ret = 0; + glfs_fd_t *aux_fd = NULL; // fd of auxilary log + char lf[256]; + nsr_recon_private_t *priv = NULL; + char *my_name = NULL; + char *morph_name = NULL, *ptr = NULL; + + priv = this->private; + my_name = RD_CALLOC (1, + strlen (priv->replica_group_members[0]) + 1, + gf_mt_recon_member_name_t); + strcpy (my_name, priv->replica_group_members[0]); + + nsr_worker_log(this->name, GF_LOG_INFO, + "starting work with volfile %s\n", + ctx->vol_file); + + fs = glfs_new(ctx->id); + if (!fs) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, + "cannot create gfls context for thread %s\n",ctx->id); + return -1; + } + + // For some vague reason, glfs init APIs seem to be clobbering "this". hence resetting it. + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_INFO, + "init done. setting volfile %s\n", + ctx->vol_file); + + ret = glfs_set_volfile(fs, ctx->vol_file); + if (ret != 0) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, + "cannot set volfile %s for thread %s\n",ctx->vol_file, ctx->id); + return -1; + } + + morph_name = RD_CALLOC (1, strlen (my_name) + 1, + gf_mt_recon_member_name_t); + strcpy (morph_name, my_name); + + ptr = strchr (morph_name, '/'); + while (ptr) + { + *ptr = '-'; + ptr = strchr (morph_name, '/'); + } + // TBD - convert this to right /usr/local/var/log based log files. + + sprintf(lf, NSR_LOG_DIR"/%s/%s-%"PRIu32, morph_name, + (control == _gf_true)?"glfs-con":"glfs-data", ctx->index); + ret = glfs_set_logging (fs, lf, 7); + if (ret) { + glusterfs_this_set(this); + gf_log (this->name, GF_LOG_ERROR, "glfs logging set failed (%s)", + strerror (errno)); + return -1; + } + + ret = glfs_init (fs); + if (ret != 0) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do init for thread %s with volfile %s\n",ctx->id, ctx->vol_file); + return -1; + } + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_INFO, + "setting volfile %s done\n", + ctx->vol_file); + + // If it is control thread, open the "/" as the aux_fd. + // All IOs happening via the fd will do the RPCs across the reconciliation + // processes. For some vague reason, the root seems to be open'able like a file. + // TBD - try to clean this up. (implement a virtual file???) + if (control == _gf_true) { + nsr_worker_log(this->name, GF_LOG_INFO, + "doing open for / \n"); + aux_fd = glfs_open (fs, "/", O_RDWR); + // TBD - proper error handling. Stall reconciliation if such a thing happens? + if (aux_fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "cannot open aux log file for thread %s\n",ctx->id); + return -1; + } else { + nsr_worker_log(this->name, GF_LOG_ERROR, + "---opened aux log file for thread %s\n",ctx->id); + } + ctx->aux_fd = aux_fd; + } + glusterfs_this_set(this); + ctx->fs = fs; + return 0; +} + +/* + * + * This function does the cleanup after reconciliation is done + * or before we start a new reconciliation. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static int32_t +nsr_recon_end_work(nsr_per_node_worker_t *ctx, + gf_boolean_t control) +{ + int32_t ret = 0; + xlator_t *this = ctx->driver_ctx->this; + + nsr_worker_log(this->name, GF_LOG_INFO, + "doing fini for recon worker\n"); + + ret = glfs_fini(ctx->fs); + if (ret != 0) { + glusterfs_this_set(this); + nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do fini for thread %s with volfile %s\n",ctx->id, ctx->vol_file); + return -1; + } + glusterfs_this_set(this); + ctx->fs = NULL; + if (control == _gf_true) { + glfs_close (ctx->aux_fd); + ctx->aux_fd = NULL; + } + return 0; +} + +//called in case all worker functions run as sepeerate threads +static void +init_worker(nsr_per_node_worker_t *ctx, gf_boolean_t control) +{ + pthread_mutex_init(&(ctx->mutex), NULL); + pthread_cond_init(&(ctx->cv), NULL); + INIT_LIST_HEAD(&(ctx->head.list)); +} + + +/* + * Control worker funct for getting changelog info on this node. + * calls directly functions to parse the changelog. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static void +control_worker_func_0(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + xlator_t *this = dr->this; + nsr_recon_private_t *priv = this->private; + + ctx->is_control = _gf_true; + + switch (work->req_id) { + case NSR_WORK_ID_INI: + { + break; + } + case NSR_WORK_ID_FINI: + { + break; + } + case NSR_WORK_ID_GET_LAST_TERM_INFO: + { + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + // term is stuffed inside work->index. overloading. + int32_t term = work->index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get last term info for node %d with current term %d\n",index, term); + + // TBD - handle errors + // This is called by the leader after it gets the current term. + // Makes searching easier. + nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, <); + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get last term info with current term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + break; + } + case NSR_WORK_ID_GET_GIVEN_TERM_INFO: + { + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + // term is stuffed inside work->index. overloading. + int32_t term = work->index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get term info for node %d for term %d\n",index, term); + + // TBD - handle errors + nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, <); + + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get term info for term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + } + case NSR_WORK_ID_RECONCILIATOR_DO_WORK: + { + // For local resolution, the main driver thread does it. + // SO there is no way we can have this message for this node. + + nsr_worker_log(this->name, GF_LOG_INFO, + "this message should not be sent \n"); + break; + } + case NSR_WORK_ID_RESOLUTION_DO_WORK: + { + + nsr_worker_log(this->name, GF_LOG_INFO, + "this message should not be sent \n"); + ctx->result = -1; + break; + } + case NSR_WORK_ID_GET_RECONCILATION_WINDOW: + { + nsr_reconciliator_info_t *recon_info = rw->recon_info; + // first_index and last_index at 0 indicates empty log. + // For non empty log, the first_index always starts at 1. + uint32_t num = (dr->workers[index].recon_info->last_index - + dr->workers[index].recon_info->first_index + 1); + nsr_recon_record_details_t *rd; + uint32_t i=0; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", + index, recon_info->last_term, recon_info->first_index, recon_info->last_index); + + + // TBD - handle buffer allocation errors + rd = RD_CALLOC(num, + sizeof(nsr_recon_record_details_t), + gf_mt_recon_record_details_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + + recon_info->records = RD_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_record_t); + if (recon_info->records == NULL) { + ctx->result = -1; + return; + } + + // TBD - handle errors + if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + recon_info->last_term, + recon_info->first_index, + recon_info->last_index, + rd) == _gf_false) { + ctx->result = -1; + return; + } + + // The above function writes into rd from 0 to (num -1) + // We need to take care of this whenever we deal with records + for (i=0; i < num; i++) { + ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl + memcpy(&(recon_info->records[i].rec), + &(rd[i]), + sizeof(nsr_recon_record_details_t)); + } + + GF_FREE(rd); + + nsr_worker_log(this->name, GF_LOG_INFO, + "got reconciliation window records for node %d for term %d \n", + index, recon_info->last_term); + break; + } + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "bad req id %u", work->req_id); + } + + return; +} + +// Control worker thread +static void* +control_worker_main_0(nsr_per_node_worker_t *ctx) +{ + + ctx->is_control = _gf_true; + nsr_worker_log(this->name, GF_LOG_INFO, + "starting control worker func 0\n"); + + init_worker(ctx, 1); + + while(1) + { + nsr_recon_work_t *work = NULL; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + + nsr_worker_log(this->name, GF_LOG_INFO, + "waiting for work\n"); + + pthread_mutex_lock(&ctx->mutex); + while (list_empty(&(ctx->head.list))) { + pthread_cond_wait(&ctx->cv, &ctx->mutex); + } + pthread_mutex_unlock(&ctx->mutex); + + + list_for_each_entry(work, &(ctx->head.list), list) { + nsr_worker_log(this->name, GF_LOG_INFO, + "got work with id %d\n", work->req_id); + work->in_use = _gf_false; + + // Call the main function. + control_worker_func_0(ctx, work); + + atomic_dec(&(dr->outstanding)); + break; + } + + nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); + list_del_init (&work->list); + GF_FREE(work); + nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); + } + + return NULL; +} + +static void +control_worker_do_reconciliation (nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_role_t rr; + uint32_t i=0; + uint32_t num=0; + uint32_t idx = dr->reconciliator_index; + uint32_t term = dr->workers[idx].recon_info->last_term; + + GF_ASSERT(idx == index); + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to make this index %d as reconciliator for term %d\n", index, term); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, + nsr_recon_xlator_sector_1, + SEEK_SET) == -1) { + ctx->result = -1; + return; + } + + // We have all the info for all other nodes. + // Fill all that info when sending data to that process. + for (i=0; i < dr->replica_group_size; i++) { + if ( dr->workers[i].in_use && + (dr->workers[i].recon_info->last_term == term)) { + rr.info[num].last_term = + dr->workers[i].recon_info->last_term; + rr.info[num].commited_ops = + dr->workers[i].recon_info->commited_ops; + rr.info[num].last_index = + dr->workers[i].recon_info->last_index; + rr.info[num].first_index = + dr->workers[i].recon_info->first_index; + strcpy(rr.info[num].name, + dr->workers[i].name); + } + num++; + } + rr.num = num; + rr.role = reconciliator; + ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we are bothered about + // retrying only for this case. For rest of the cases we will + // just return EIO in errno. + ctx->op_errno = errno; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "sent reconciliator info for term %d with node count as %d\n", term, num); +} + +static void +control_worker_do_resolution (nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_role_t rr; + unsigned int i=0, j=0; + unsigned int rec = dr->reconciliator_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, + nsr_recon_xlator_sector_1, + SEEK_SET) == -1) { + ctx->result = -1; + return; + } + + rr.num = 2; + + // Fill in info[0] as info for the node for which we are seeking + // resolution. Fill in info[1] as info of the reconciliator node. The + // function nsr_recon_driver_get_role() that will be called when this + // message reaches the node will look at index 1 for term information + // related to the reconciliator. + for (i=0; i < 2; i++) { + (i == 0) ? (j = index) : (j = rec); + rr.info[i].last_term = + dr->workers[j].recon_info->last_term; + rr.info[i].commited_ops = + dr->workers[j].recon_info->commited_ops; + rr.info[i].last_index = + dr->workers[j].recon_info->last_index; + rr.info[i].first_index = + dr->workers[j].recon_info->first_index; + // The name is used as the key to convert indices since the + // reconciliator index could be different across the nodes. + strcpy(rr.info[i].name, + dr->workers[j].name); + if (i == 0) { + nsr_worker_log(this->name, GF_LOG_INFO, + "this node info term=%d, ops=%d, first=%d, last=%d\n", + rr.info[i].last_term, rr.info[i].commited_ops, + rr.info[i].first_index,rr.info[i].last_index); + } else { + nsr_worker_log(this->name, GF_LOG_INFO, + "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n", + rr.info[i].last_term, rr.info[i].commited_ops, + rr.info[i].first_index,rr.info[i].last_index); + } + } + rr.role = resolutor; + ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we are bothered about + // retrying only for this case. For rest of the cases we will + // just return EIO in errno. + ctx->op_errno = errno; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); +} + +static void +control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + xlator_t *this = dr->this; + nsr_recon_log_info_t li; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + uint32_t i = 0; + uint32_t num = (dr->workers[index].recon_info->last_index - + dr->workers[index].recon_info->first_index +1); + nsr_recon_record_details_t *rd; + + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", + index, recon_info->last_term, recon_info->first_index, recon_info->last_index); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + + // write to node what term & indices we are interested + li.term = recon_info->last_term; + li.first_index = recon_info->first_index; + li.last_index = recon_info->last_index; + ENDIAN_CONVERSION_LI(li, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) { + ctx->result = -1; + return; + } + + // then read + rd = RD_CALLOC(num, + sizeof(nsr_recon_record_details_t), + gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + recon_info->records = RD_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_private_t); + if (recon_info->records == NULL) { + ctx->result = -1; + goto err; + } + + if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) { + ctx->result = -1; + goto err; + } + + for (i=0; i < num; i++) { + ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl + memcpy (&(recon_info->records[i].rec), &(rd[i]), + sizeof(nsr_recon_record_details_t)); + nsr_worker_log(this->name, GF_LOG_INFO, + "get_reconcilaition_window:Got %d at index %d\n", + recon_info->records[i].rec.type, + i + recon_info->first_index); + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "got reconciliation window records for node %d for term %d \n", + index, recon_info->last_term); + +err: + GF_FREE(rd); +} + +/* + * Control worker funct for getting changelog info on some other node. + * calls glfs functions to seek/read/write on aux_fd. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static void +control_worker_func(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + int32_t term = htonl(work->index); // overloading it + + ctx->is_control = _gf_true; + + switch (work->req_id){ + + case NSR_WORK_ID_INI: + nsr_worker_log(this->name, GF_LOG_INFO, + "calling nsr_recon_start_work\n"); + + // TBD - handle error in case nsr_recon_start_work gives error + if (nsr_recon_start_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished nsr_recon_start_work\n"); + break; + + case NSR_WORK_ID_FINI: + nsr_worker_log(this->name, GF_LOG_INFO, + "calling nsr_recon_end_work\n"); + + // TBD - handle error in case nsr_recon_end_work gives error + if (nsr_recon_end_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished nsr_recon_end_work\n"); + break; + + case NSR_WORK_ID_GET_LAST_TERM_INFO: + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get last term info for node %d with current term %d\n",index, work->index); + + // first write the current term term number + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } + ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get last term info with current term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + + case NSR_WORK_ID_GET_GIVEN_TERM_INFO: + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get term info for node %d for term %d\n",index, work->index); + + // first write the term number + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } + ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get term info for term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + + case NSR_WORK_ID_RECONCILIATOR_DO_WORK: + control_worker_do_reconciliation(ctx,work); + break; + + case NSR_WORK_ID_RESOLUTION_DO_WORK: + control_worker_do_resolution(ctx,work); + break; + + case NSR_WORK_ID_GET_RECONCILATION_WINDOW: + control_worker_get_window(ctx,work); + break; + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "bad work type %d", work->req_id); + } + + return; +} + +// Control worker thread +static void* +control_worker_main(nsr_per_node_worker_t *ctx) +{ + unsigned int index = ctx->index; + + ctx->is_control = _gf_true; + nsr_worker_log(this->name, GF_LOG_INFO, + "starting control worker func\n"); + + // if this is for local processing, call the changelog parsing calls directly + if (index == 0) { + control_worker_main_0(ctx); + return NULL; + } + + init_worker(ctx, 1); + + + while(1) + { + nsr_recon_work_t *work = NULL; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + + nsr_worker_log(this->name, GF_LOG_INFO, + "waiting for work\n"); + + pthread_mutex_lock(&ctx->mutex); + while (list_empty(&(ctx->head.list))) { + pthread_cond_wait(&ctx->cv, &ctx->mutex); + } + pthread_mutex_unlock(&ctx->mutex); + + + list_for_each_entry(work, &(ctx->head.list), list) { + nsr_worker_log(this->name, GF_LOG_INFO, + "got work with id %d\n", work->req_id); + work->in_use = _gf_false; + control_worker_func(ctx,work); + atomic_dec(&(dr->outstanding)); + break; + } + nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); + list_del_init (&work->list); + GF_FREE(work); + nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); + } + + return NULL; +} + +/* + * This function gets called if this process is chosen as the reconciliator + * for this replica group. It would have already got the records for the last term + * for the indices that are required (from the first HOLE to last index) from + * all other nodes that also witnessed that term. COmpare all the records and + * compute the work required. + * + * Input arguments + * ctx - driver context. All recon work is stored in workers[0].recon_info + */ +static void +compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx) +{ + uint32_t i=0, j=0; + nsr_reconciliator_info_t *my_recon = ctx->workers[0].recon_info; + uint32_t num = (my_recon->last_index - my_recon->first_index + 1); + + for (i=0; i < num; i++) { + nsr_log_type_t orig, new; + unsigned int src = 0; + orig = new = my_recon->records[i].rec.type; + nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; + // index 0 means this node. Look at all other nodes. + for (j=1; j < ctx->replica_group_size; j++) { + if (ctx->workers[j].in_use) { + nsr_log_type_t pr = ctx->workers[j].recon_info->records[i].work.type; + if ((new != pr) && (pr > new)) { + src = j; + new = (new | pr); + } + } + } + // TBD - compare data if new and orig are all FILLs. (can detect changelog corruption) + // Right now we compare if both orig and new are psuedo holes since + // only that is of interest to us. + if (orig != new) { + if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_PSEUDO_HOLE)) + tw = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE; + else if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_FILL)) + tw = NSR_RECON_WORK_HOLE_TO_FILL; + else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_PSEUDO_HOLE)) + tw = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE; + else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_FILL)) + tw = NSR_RECON_WORK_HOLE_TO_FILL; + } + if (tw != NSR_RECON_WORK_NONE) { + my_recon->records[i].work.type = tw; + my_recon->records[i].work.source = src; + // Overwrite the record + memcpy(&(my_recon->records[i].rec), + &(ctx->workers[src].recon_info->records[i].rec), + sizeof(nsr_recon_record_details_t)); + } + } + return; +} + +static int32_t +nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, + uint32_t i, + gf_boolean_t in_use); + +/* + * Write the role and associated information to the node. + * This gets called from recon xlator indicating node is either + * leader, reconciliator or should do resolution. + */ +gf_boolean_t +nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, + nsr_recon_role_t *rr, + uint32_t term) +{ + nsr_role_work_t *rw; + xlator_t *this = ctx->this; + + nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n"); + rw = RD_CALLOC(1, sizeof (nsr_role_work_t), gf_mt_recon_role_work_t); + memcpy(&rw->role, rr, sizeof(nsr_recon_role_t)); + rw->term = term; + INIT_LIST_HEAD(&(rw->list)); + pthread_mutex_lock(&(ctx->mutex)); + list_add_tail(&rw->list, &ctx->role_head.list); + pthread_cond_signal(&(ctx->cv)); + pthread_mutex_unlock(&(ctx->mutex)); + nsr_driver_log(this->name, GF_LOG_INFO, "set role returns \n"); + return _gf_true; +} + +/* + * First we undo the last role to make sure we clean up. + * + * Input arguments + * ctx - driver context. + * rr - Role information. + * If leader, the thread now sends the list of all nodes that are part of + * the current replica group. Use that to find out the activate the + * required worker threads. + * If reconciliator, the leader node would have sent information about + * all nodes which saw last term as the reconciliator. + * If resolution to be done, then rr.info[0] will have this node's info + * which the leader would have got earlier. rr[1].info will have the + * info regarding the reconciliator. + * term - leader's term that is causing this role + */ +nsr_recon_driver_state_t +nsr_recon_driver_get_role(int32_t *status, + nsr_recon_driver_ctx_t *ctx, + nsr_role_work_t *rw) +{ + uint8_t i=0, j=0; + nsr_recon_role_t *rr = &(rw->role); + nsr_reconciliator_info_t *tmp; + xlator_t *this = ctx->this; + + // First make all the threads uninitialise + for (i = 0; i < ctx->replica_group_size; i++) { + if (nsr_recon_in_use(ctx, i, _gf_false) == -1) { + *status = -1; + return 0; + } + } + + switch (rr->role) { + case leader: + case joiner: + + // First set info this node + tmp = RD_CALLOC (1, sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + ctx->workers[0].recon_info = tmp; + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } + ctx->current_term = rr->current_term; + + // Find rest of the nodes + for (i=1; i < ctx->replica_group_size; i++) { + for (j=0 ; /* nothing */; j++) { + if (j >= rr->num) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + ctx->workers[i].name); + break; + } + if (strcmp(ctx->workers[i].name, + rr->info[j].name)) { + continue; + } + nsr_driver_log (this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as %s. found other server %s\n", + (rr->role == leader) ? "leader" + : "joiner", + ctx->workers[i].name); + + // Allocate this here. This will get later + // filled when the leader tries to get last term + // information from all the nodes + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + ctx->workers[i].recon_info = tmp; + if (nsr_recon_in_use(ctx, i, _gf_true) == -1) { + *status = -1; + return 0; + } + break; + } + } + // If leader, reconciliator has to be chosen. + // If joiner, we are the reconciliator. + if (rr->role == leader) + ctx->reconciliator_index = -1; + else + ctx->reconciliator_index = 0; + break; + + case reconciliator: + ctx->reconciliator_index = 0; + // Copy information about all the other members which had the + // same term + for (i=0; i < rr->num; i++) { + for (j=0; /* nothing */; j++) { + if (j >= ctx->replica_group_size) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + rr->info[i].name); + break; + } + if (strcmp(rr->info[i].name, + ctx->workers[j].name)) { + continue; + } + nsr_driver_log(this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n", + ctx->workers[j].name); + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + tmp->last_term = rr->info[i].last_term; + tmp->commited_ops = rr->info[i].commited_ops; + tmp->last_index = rr->info[i].last_index; + tmp->first_index = rr->info[i].first_index; + ctx->workers[j].recon_info = tmp; + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } + break; + } + } + break; + + case resolutor: + for (j=0; /* nothing */; j++) { + // info[1] has the information regarding the + // reconciliator + if (j >= ctx->replica_group_size) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + rr->info[1].name); + break; + } + if (strcmp(rr->info[1].name, + ctx->workers[j].name)) { + continue; + } + nsr_driver_log(this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n", + ctx->workers[j].name); + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + tmp->last_term = rr->info[1].last_term; + tmp->commited_ops = rr->info[1].commited_ops; + tmp->last_index = rr->info[1].last_index; + tmp->first_index = rr->info[1].first_index; + ctx->reconciliator_index = j; + ctx->workers[j].recon_info = tmp; + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } + GF_ASSERT(ctx->reconciliator_index != 0); + break; + } + tmp = RD_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_reconciliator_info_t); + if (!tmp) { + *status = -1; + return 0; + } + // info[0] has all info for this node + tmp->last_term = rr->info[0].last_term; + tmp->commited_ops = rr->info[0].commited_ops; + tmp->last_index = rr->info[0].last_index; + tmp->first_index = rr->info[0].first_index; + ctx->workers[0].recon_info = tmp; + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } + } + + ctx->term = rw->term; + + *status = 0; + return rr->role; +} + + +/* + * This function gets called if this process is chosen to sync itself with + * the reconciliator. + * + * Input arguments + * ctx - driver context. + * my_info - local changelog info that has all the local records for indices that require work + * his_info - reconciliator's info that has all the golden copies + * invalidate - if set to true, then do not consult local records + */ + +static void +compute_resolution_work(nsr_recon_driver_ctx_t *ctx, + nsr_reconciliator_info_t *my_info, + nsr_reconciliator_info_t *his_info, + gf_boolean_t invalidate) +{ + uint32_t i=0; + uint32_t num = (my_info->last_index - my_info->first_index + 1); + xlator_t *this = ctx->this; + + if (invalidate) { + if (my_info->records) { + GF_FREE(my_info->records); + } + my_info->records = RD_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_record_t); + } + + for (i=0; i < num; i++) { + nsr_log_type_t orig, new; + nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; + orig = my_info->records[i].rec.type; + if (invalidate) + orig = NSR_LOG_HOLE; + new = his_info->records[i].rec.type; + // TBD - we can never have PSUEDO_HOLE in reconciliator's info + // We should have taken care of that during reconciliation. + // Put an assert to validate that. + if (new != orig) { + if ((orig != NSR_LOG_FILL) && (new == NSR_LOG_FILL)) + tw = NSR_RECON_WORK_HOLE_TO_FILL; + else if ((orig != NSR_LOG_HOLE) && (new == NSR_LOG_HOLE)) + tw = NSR_RECON_WORK_UNDO_FILL; + } + // copy the records anyway + my_info->records[i].work.type = tw; + my_info->records[i].work.source = ctx->reconciliator_index; + memcpy(&(my_info->records[i].rec), + &(his_info->records[i].rec), + sizeof(nsr_recon_record_details_t)); + } + return; +} + + +// Create an glfs object +static struct glfs_object * +create_obj(nsr_per_node_worker_t *ctx, char *gfid_str) +{ + struct glfs_object *obj = NULL; + uuid_t gfid; + + uuid_parse(gfid_str, gfid); + + obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); + if (obj == NULL) { + GF_ASSERT(obj != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + return NULL; + } + return obj; +} + +/* + * Function to apply the actual record onto the local brick. + * prior to this we should have read all the data from the + * brick that has the data. + * + * Input parameters: + * ctx - per node worker context that has the fs for communicating to brick + * ri - Reconciliation record that needs fixup + * dict - So that NSR server translator on brick applis fixup only on this brick + * and the changelog translator consumes term and index. + */ + +static gf_boolean_t +apply_record(nsr_per_node_worker_t *ctx, + nsr_reconciliation_record_t *ri, + dict_t * dict) +{ + struct glfs_fd *fd = NULL; + struct glfs_object *obj = NULL; + struct glfs_object *to_obj = NULL; + gf_boolean_t retval = _gf_false; + + if (ri->rec.op == GF_FOP_WRITE) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "DOing write for file %s @offset %d for len %d\n", + ri->rec.gfid, ri->rec.offset, ri->rec.len); + + // The file has got deleted on the source. Hence just ignore + // this. + // TBD - get a way to just stuff the log entry without writing + // the data so that changelogs remain identical. + if (ri->work.data == NULL) { + return _gf_true; + } + + if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) + goto err; + + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "lseek for file %s failed at offset %d\n", + ri->rec.gfid, ri->rec.offset); + goto err; + } + if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "write for file %s failed for bytes %d\n", + ri->rec.gfid, ri->rec.len); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished DOing write for gfid %s @offset %d for len %d\n", + ri->rec.gfid, ri->rec.offset, ri->rec.len); + + } else if (ri->rec.op == GF_FOP_FTRUNCATE) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "DOing truncate for file %s @offset %d \n", + ri->rec.gfid, ri->rec.offset); + + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "trunctae for file %s failed @offset %d\n", + ri->rec.gfid,ri->rec.offset ); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished DOing truncate for gfid %s @offset %d \n", + ri->rec.gfid, ri->rec.offset); + + } else if ((ri->rec.op == GF_FOP_FREMOVEXATTR) || + (ri->rec.op == GF_FOP_REMOVEXATTR) || + (ri->rec.op == GF_FOP_SETXATTR) || + (ri->rec.op == GF_FOP_FSETXATTR)) { + + uint32_t k_s = 0, v_s = 0; + char *t_b= NULL; + uint32_t num = 0; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing set extended attr for file %s \n", + ri->rec.gfid); + + // The file has got deleted on the source. Hence just ignore + // this. TBD - get a way to just stuff the log entry without + // writing the data so that changelogs remain identical. + if (ri->work.data == NULL) { + return _gf_true; + } + + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); + else + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + + if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { + if (t_b) free(t_b); + nsr_worker_log(this->name, GF_LOG_ERROR, + "list of xattr of %s failed\n", ri->rec.gfid); + goto err; + } + + if (delete_xattr(fd, dict, t_b, num) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "deleting xattrs failed\n"); + goto err; + } + + // Set one special dict flag to indicate the opcode so that + // the opcode gets set to this + if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "setting opcode to %d failed\n",ri->rec.op); + goto err; + } + + if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "filling xattrs failed\n"); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finsihed Doing set extended attr for %s \n", + ri->rec.gfid); + + } else if (ri->rec.op == GF_FOP_CREATE) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing create for file %s \n", + ri->rec.gfid); + + // TBD - add mode and flags later + uuid_parse(ri->rec.gfid, gfid); + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + + nsr_worker_log (this->name, GF_LOG_INFO, + "creating with mode 0%o", ri->rec.mode); + if (glfs_h_creat_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, ri->rec.mode, NULL, gfid, dict) == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing create for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing create for file %s \n", + ri->rec.entry); + + } else if (ri->rec.op == GF_FOP_MKNOD) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing mknod for file %s \n", + ri->rec.entry); + + // TBD - add mode and flags later + uuid_parse(ri->rec.gfid, gfid); + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + + if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing mknod for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing mknod for file %s \n", + ri->rec.entry); + + } else if (ri->rec.op == GF_FOP_MKDIR) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing mkdir for dir %s \n", + ri->rec.gfid); + + // TBD - add mode and flags later + uuid_parse(ri->rec.gfid, gfid); + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + + if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing mkdir for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing mkdir for file %s \n", + ri->rec.entry); + + } else if ((ri->rec.op == GF_FOP_RMDIR) || (ri->rec.op == GF_FOP_UNLINK)) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing rmdir/ublink for dir %s \n", + ri->rec.entry); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failure for Doing rmdir/unlink for file %s\n", + ri->rec.entry); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing rmdir/unlink for file %s \n", + ri->rec.entry); + + } else if (ri->rec.op == GF_FOP_SYMLINK) { + + uuid_t gfid; + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing symlink for file %s to file %s \n", + ri->rec.entry, ri->rec.link_path); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + uuid_parse(ri->rec.gfid, gfid); + + if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failed to Doing symlink for file %s to file %s \n", + ri->rec.entry, ri->rec.link_path); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finished Doing symlink for file %s to file %s \n", + ri->rec.entry, ri->rec.link_path); + + } else if (ri->rec.op == GF_FOP_LINK) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing hard link for file %s to file %s \n", + ri->rec.entry, ri->rec.gfid); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failed to Doing hard link for file %s to file %s \n", + ri->rec.entry, ri->rec.gfid); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finsihed doing hard link for file %s to file %s \n", + ri->rec.entry, ri->rec.gfid); + + } else if (ri->rec.op == GF_FOP_RENAME) { + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing rename for file %s to file %s \n", + ri->rec.entry, ri->rec.newloc); + + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "Failed to Doing rename for file %s to file %s \n", + ri->rec.entry, ri->rec.newloc); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Finsihed doing renam for file %s to file %s \n", + ri->rec.entry, ri->rec.newloc); + + + } else if ((ri->rec.op == GF_FOP_SETATTR) || (ri->rec.op == GF_FOP_FSETATTR)) { + + struct iatt iatt = {0, }; + int valid = 0; + int ret = -1; + + // TBD - do the actual settings once we do that + // right now we just set the mode so that changelog gets filled + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing attr for file %s \n", + ri->rec.gfid); + + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } + + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); + else + fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); + if (fd == NULL) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "open for file %s failed\n", + ri->rec.gfid); + goto err; + } + + iatt.ia_prot = ia_prot_from_st_mode(777); + valid = GF_SET_ATTR_MODE; + + + // Set one special dict flag to indicate the opcode so that + // the opcode gets set to this + if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "setting opcode to %d failed\n",ri->rec.op); + goto err; + } + + ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict); + if (ret == -1) { + nsr_worker_log(this->name, GF_LOG_INFO, + "failed Doing attr for file %s \n", + ri->rec.gfid); + goto err; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "Doing attr for file %s \n", + ri->rec.gfid); + + } + + retval = _gf_true; + +err: + if (fd) { + /* + * It's not clear that we should be passing the same dict to + * glfs_close that was passed to us for glfs_open, but that's + * the prior behavior so let's preserve it for now. + */ + if (glfs_close_with_xdata(fd, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + } + } + if (obj) { + /* + * AFAICT fd operations do not borrow this reference, so we + * still need to drop it ourselves. + */ + glfs_h_close(obj); + } + if (to_obj) { + /* + * AFAICT fd operations do not borrow this reference, so we + * still need to drop it ourselves. + */ + glfs_h_close(to_obj); + } + return retval; +} + +//return back opcodes that requires reading from source +static gf_boolean_t +recon_check_changelog(nsr_recon_record_details_t *rd) +{ + return((rd->op == GF_FOP_WRITE) || + (rd->op == GF_FOP_FSETATTR) || + (rd-> op == GF_FOP_SETATTR) || + (rd->op == GF_FOP_FREMOVEXATTR) || + (rd->op == GF_FOP_SETXATTR) || + (rd->op == GF_FOP_FSETXATTR) || + (rd->op == GF_FOP_SYMLINK)); + +} + +// TBD +static gf_boolean_t +recon_compute_undo(nsr_recon_record_details_t *rd) +{ + return(_gf_false); +} + + +/* + * Function that talks to the brick for data tranfer. + * + * Input arguments: + * ctx - worker context + * work - pointer to work object + */ +static void +data_worker_func(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + xlator_t *this = dr->this; + nsr_reconciliation_record_t *ri = NULL; + nsr_recon_record_details_t *rd = NULL; + int wip = 0; + dict_t * dict = NULL; + struct glfs_fd *fd = NULL; + struct glfs_object *obj = NULL; + uuid_t gfid; + uint32_t k_s = 0, v_s = 0; + char *t_b= NULL; + uint32_t num=0; + + switch (work->req_id){ + case NSR_WORK_ID_INI: + nsr_worker_log(this->name, GF_LOG_INFO, + "started data ini \n"); + + if (nsr_recon_start_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished data ini \n"); + break; + case NSR_WORK_ID_FINI: + nsr_worker_log(this->name, GF_LOG_INFO, + "started data fini \n"); + + if (nsr_recon_end_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished data fini \n"); + break; + case NSR_WORK_ID_SINGLE_RECONCILIATION_READ: + // first_index always starts with 1 but records starts at 0. + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + dict = dict_new (); + if (!dict) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + + switch (rd->op) { + case GF_FOP_WRITE: + + // record already copied. + // copy data to this node's info. + + uuid_parse(ri->rec.gfid, gfid); + + nsr_worker_log(this->name, GF_LOG_INFO, + "started recon read for file %s at offset %d at len %d\n", + ri->rec.gfid, rd->offset, rd->len); + + obj = glfs_h_create_from_handle (ctx->fs, gfid, + GFAPI_HANDLE_LENGTH, + NULL); + if (obj == NULL) { + GF_ASSERT(obj != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + break; + } + + // The file has probably got deleted. + fd = glfs_h_open_with_xdata (ctx->fs, obj, O_RDONLY, + dict); + if (fd == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "opening of file failed\n"); + break; + } + + if (glfs_lseek_with_xdata (fd, rd->offset, SEEK_SET, + dict) != rd->offset) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "lseek of file failed to offset %d\n", + rd->offset); + break; + } + + ri->work.data = RD_CALLOC (rd->len , sizeof(char), + gf_mt_recon_work_data_t); + if (glfs_read_with_xdata (fd, ri->work.data, rd->len, + 0, dict) != rd->len) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "read of file failed to offset %d for bytes %d\n", + rd->offset, rd->len); + break; + } + break; + + case GF_FOP_FTRUNCATE: + case GF_FOP_SYMLINK: + case GF_FOP_RMDIR: + case GF_FOP_UNLINK: + case GF_FOP_MKNOD: + case GF_FOP_CREATE: + case GF_FOP_LINK: + case GF_FOP_MKDIR: + case GF_FOP_RENAME: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unimplemented fop %u\n", rd->op); + break; + + case GF_FOP_FREMOVEXATTR: + case GF_FOP_REMOVEXATTR: + case GF_FOP_SETXATTR: + case GF_FOP_FSETXATTR: + + uuid_parse(ri->rec.gfid, gfid); + + + // This is for all the set attribute/extended + // attributes commands. Get all the attributes from + // the source and fill it in the buffer as a NULL + // seperated key and value which are in turn seperated + // by NULL. + + nsr_worker_log(this->name, GF_LOG_INFO, + "doing getattr for gfid %s \n", + ri->rec.gfid); + + obj = glfs_h_create_from_handle (ctx->fs, gfid, + GFAPI_HANDLE_LENGTH, + NULL); + if (obj == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + break; + } + + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata (ctx->fs, obj, + dict); + else + fd = glfs_h_open_with_xdata (ctx->fs, obj, + O_RDONLY, dict); + + if (fd == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "opening of file failed\n"); + break; + } + + if (get_xattr_total_size (fd, &t_b, &k_s, &v_s, &num, + dict) == -1) { + if (t_b) free(t_b); + nsr_worker_log(this->name, GF_LOG_ERROR, + "list of xattr of gfid %s failed\n", + rd->gfid); + break; + } + ri->work.data = RD_CALLOC ((k_s + v_s) , sizeof(char), + gf_mt_recon_work_data_t); + if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "get xattr of gfid %s failed\n", rd->gfid); + break; + } + ri->work.num = num; + nsr_worker_log(this->name, GF_LOG_INFO, + "finished getattr for gfid %s \n", + ri->rec.gfid); + free(t_b); + break; + + case GF_FOP_FSETATTR: + case GF_FOP_SETATTR: + //TBD - to get the actual attrbutes and fill + // mode, uid, gid, size, atime, mtime + nsr_worker_log (this->name, GF_LOG_ERROR, + "unimplemented fop %u\n", rd->op); + break; + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unrecognized fop %u\n", rd->op); + + } + nsr_worker_log(this->name, GF_LOG_INFO, + "finished recon read for gfid %s at offset %d for %d bytes \n", + rd->gfid, rd->offset, rd->len); + break; + + case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT: + // first_index always starts with 1 but records starts at 0. + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + nsr_worker_log(this->name, GF_LOG_INFO, + "got recon commit for index %d that has gfid %s \n", + wip, rd->gfid); + dict = dict_new (); + if (!dict) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (apply_record(ctx, ri, dict) == _gf_false) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "apply_record fails\n"); + } + + nsr_worker_log(this->name, GF_LOG_INFO, + "finished recon commit for gfid %s \n", + rd->gfid); + break; + + case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: + dict = dict_new (); + if (!dict) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + + // Increment work index with the start index + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + glfs_fsync_with_xdata(fd, dict); + break; + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unrecognized request id %u\n", work->req_id); + } + + if (fd) { + glfs_close_with_xdata(fd, dict); + } + if (obj) { + glfs_h_close(obj); + } + if (dict) { + dict_unref(dict); + } +} + +// thread for doing data work +static void * +data_worker_main(nsr_per_node_worker_t *ctx) +{ + nsr_worker_log(this->name, GF_LOG_INFO, + "starting data worker func\n"); + init_worker(ctx, 0); + + while(1) { + nsr_recon_work_t *work = NULL; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + + nsr_worker_log(this->name, GF_LOG_INFO, + "waiting for work\n"); + + pthread_mutex_lock(&(ctx->mutex)); + while (list_empty(&(ctx->head.list))) { + pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); + } + pthread_mutex_unlock(&(ctx->mutex)); + list_for_each_entry(work, &(ctx->head.list), list) { + nsr_worker_log(this->name, GF_LOG_INFO, + "got work with id %d\n",work->req_id); + work->in_use = _gf_false; + data_worker_func(ctx, work); + atomic_dec(&(dr->outstanding)); + break; + } + nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); + list_del_init (&work->list); + GF_FREE(work); + nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); + } + + return NULL; +} + + +//make recon work +static void +recon_make_work(nsr_recon_driver_ctx_t *ctx, + nsr_recon_work_t **work, + nsr_recon_work_req_id_t req_id, + int32_t i) +{ + xlator_t *this = ctx->this; + + // TBD - change this to get from a static pool + // This cannot fail + (*work) = RD_CALLOC (1, sizeof (nsr_recon_work_t), gf_mt_recon_work_t); + (*work)->req_id = req_id; + (*work)->index = i; + (*work)->in_use = _gf_true; + INIT_LIST_HEAD(&((*work)->list)); + return; +} + +// Schedule a work object to a worker thread. +static void +recon_queue_to_worker(nsr_recon_driver_ctx_t *ctx, + nsr_recon_work_t *work, + unsigned int id, + nsr_recon_queue_type_t type) +{ + nsr_per_node_worker_t *worker; + if (type == NSR_RECON_QUEUE_TO_CONTROL) { + worker = ctx->workers[id].control_worker; + nsr_driver_log(this->name, GF_LOG_INFO, + "queueing work to control index %d\n",id); + } else { + worker= ctx->workers[id].data_worker; + nsr_driver_log(this->name, GF_LOG_INFO, + "queueing work to data index %d\n",id); + } + pthread_mutex_lock(&worker->mutex); + list_add_tail(&work->list, &worker->head.list); + pthread_cond_signal(&worker->cv); + pthread_mutex_unlock(&worker->mutex); + return; +} + +typedef void * (*F_t) (void *); + +// In case mode is set to NSR_USE_THREADS, create worker threads. +static gf_boolean_t +create_worker_threads(nsr_recon_private_t *priv, + nsr_recon_driver_ctx_t *ctx, + nsr_per_node_worker_t *w, + gf_boolean_t control_or_data, + F_t f, + uint32_t num) +{ + uint32_t i; + nsr_per_node_worker_t *worker = w; + xlator_t *this = ctx->this; + + for (i=0; i < num; i++) { + worker->id = RD_CALLOC(1, 10, gf_mt_recon_id_t); + if (!worker->id) { + nsr_driver_log (priv->this->name, GF_LOG_ERROR, "memory allocation error \n"); + return _gf_false; + } + sprintf(worker->id,"recon_%d", i); + worker->driver_ctx = ctx ; + + if (ctx->mode == NSR_USE_THREADS) { + if (pthread_create(&worker->thread_id, NULL, f, worker)) { + nsr_driver_log (ctx->this->name, GF_LOG_ERROR, "control work thread creation error \n"); + return _gf_false; + } + } + worker->index = i; + worker++; + } + return _gf_true; +} + +/* + * In case of thread, send the work item; else call the function directly. + * + * Input arguments: + * bm - bitmap containing indices of nodes we want to send work + * num - number of such indices + * ctx - driver context from where we derive per worker context + * id - request ID + * q - control or data + * misc - used to overload such as index. + */ +static void +send_and_wait(int32_t *result, + int32_t *op_errno, + int32_t bm, + uint32_t num, + nsr_recon_driver_ctx_t *ctx, + nsr_recon_work_req_id_t id, + nsr_recon_queue_type_t q, + int32_t misc) +{ + uint32_t i = 0; + nsr_recon_work_t *work; + +#define CONTROL_WORKER(i) ctx->workers[i].control_worker +#define DATA_WORKER(i) ctx->workers[i].data_worker +#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? (CONTROL_WORKER(i)) : (DATA_WORKER(i))) + + *result = *op_errno = 0; + + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + WORKER(i)->result = 0; + WORKER(i)->op_errno = 0; + } + } + if (ctx->mode == NSR_SEQ) { + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + recon_make_work(ctx,&work, id, misc); + if (q == NSR_RECON_QUEUE_TO_CONTROL) { + if (i == 0) + control_worker_func_0(ctx->workers[0].control_worker, work); + else + control_worker_func(ctx->workers[i].control_worker, work); + } else { + data_worker_func(ctx->workers[i].data_worker, work); + } + GF_FREE(work); + } + } + goto out; + } + + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + recon_make_work(ctx,&work, id, misc); + atomic_inc(&(ctx->outstanding)); + recon_queue_to_worker(ctx, work, i, q); + } + } + + nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: waiting\n"); + while (ctx->outstanding) { + pthread_yield(); + } +out: + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + if (WORKER(i)->result == -1) { + *result = -1; + } + } + } + if (*result == -1) { + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + if (WORKER(i)->op_errno == EAGAIN) { + *op_errno = EAGAIN; + break; + } else { + *op_errno = EIO; + } + } + } + } + + nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno); + return; +} + +// send INI or FINI +static int32_t +nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, + uint32_t i, + gf_boolean_t in_use) +{ + uint32_t bm = 1 << i; + gf_boolean_t send = _gf_false; + int32_t status =0, op_errno = 0; + + send = (ctx->workers[i].in_use != in_use); + ctx->workers[i].in_use = in_use; + + if (!send) { + return 0; + } + nsr_driver_log (this->name, GF_LOG_INFO, + "sending %s to index %d\n",in_use?"INI":"FINI",i); + + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, + (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto err; + + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, + (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + NSR_RECON_QUEUE_TO_DATA, -1); + if (status == -1) + goto err; + + /* + * We really need better error recovery. To activate a worker, we + * allocate memory and send two messages. If any of those actions + * fail, we should undo the others. It would probably be good to + * collapse the two messages into one, because it's pretty trivial to + * allocate a temporary structure and either link it in or free it + * depending on the result here. + */ + + if (in_use == _gf_false) { + GF_FREE(ctx->workers[i].recon_info); + } + + return 0; + +err: + GF_FREE(ctx->workers[i].recon_info); + ctx->workers[i].recon_info = NULL; + return -1; +} + +gf_boolean_t +nsr_recon_driver_reconciliator (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + gf_boolean_t do_recon = _gf_false; + uint32_t start_index = ctx->workers[0].recon_info->first_index; + uint32_t end_index = ctx->workers[0].recon_info->last_index; + uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1); + + nsr_driver_log (this->name, GF_LOG_INFO, + "starting reconciliation work as reconciliator \n"); + + // nothing to be done? signal back to the recon translator that this + // phase done. + bm = 1; + for (i=1; i < replica_group_size; i++) { + if (ctx->workers[i].in_use && + (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) { + ctx->workers[i].recon_info->last_index = end_index; + ctx->workers[i].recon_info->first_index = start_index; + bm |= (1 << i); + do_recon = _gf_true; + } + } + + if (!do_recon || !num) { + nsr_driver_log (this->name, GF_LOG_INFO, + "nothing needs to be done as resolutor \n"); + return _gf_true; + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting reconciliation window for term %d from %dto %d \n", + ctx->workers[0].recon_info->last_term, + start_index, end_index); + // We have set the bm in the above for loop where we go thru all nodes + // including this node that have seen the last term. + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_RECONCILATION_WINDOW, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting reconciliation window for term %d from %dto %d \n", + ctx->workers[0].recon_info->last_term, + start_index, end_index); + + + // from the changelogs, calculate the entries that need action and the + // source for each of these entries + compute_reconciliation_work(ctx); + + // for each of the entries that need fixup, issue IO + for (i=start_index; i < (start_index + num); i++) { + nsr_reconciliator_info_t *my_recon_info = + ctx->workers[0].recon_info; + nsr_reconciliation_record_t *record = + &(my_recon_info->records[i - start_index]); + + record->work.term = ctx->workers[0].recon_info->last_term; + record->work.index = i; + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing index %d\n",i); + if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) || + (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) { + // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): If there + // are only pseudo_holes in others, it is best effort. + // Just pick from the first node that has it and + // proceed. + // 2nd case (RECON_WORK_HOLE_TO_FILL): this node has + // either a HOLE or PSUEDO_HOLE and some one else has a + // FILL(source). analyse the changelog to check if data + // needs to be read or if the log has all the data + // required + + if (recon_check_changelog(&record->rec)) { + bm = (1 << record->work.source); + nsr_driver_log (this->name, GF_LOG_INFO, + "reading data from source %d\n",record->work.source); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "got data from source %d\n",record->work.source); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing local data as part of reconciliation\n"); + + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing local data as part of reconciliation\n"); + + } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) { + // this node has a pseudo_hole and some others have just + // that too. Just convert this to FILL. let others + // blindly pick it from here. + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing this record as a fill\n"); + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing this record as a fill\n"); + } + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work as reconciliator \n"); + + // tbd - mark this term golden in the reconciliator + return _gf_true; +} + +gf_boolean_t +nsr_recon_driver_resolutor (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + // This node's last term is filled when it gets a message from the + // leader to act as a reconciliator. + uint32_t recon_index = ctx->reconciliator_index; + nsr_reconciliator_info_t *my_info = + ctx->workers[0].recon_info; + nsr_reconciliator_info_t *his_info = + ctx->workers[recon_index].recon_info; + uint32_t my_last_term = my_info->last_term; + uint32_t to_do_term = his_info->last_term; + uint32_t my_start_index = 1, my_end_index = 1; + uint32_t his_start_index = 1, his_end_index = 1; + uint32_t num = 0; + gf_boolean_t fl = _gf_true; + + nsr_driver_log (this->name, GF_LOG_INFO, + "starting resolutor work with reconciliator as %d from term %d to term %d \n", + recon_index, my_last_term, to_do_term); + + do { + + if (!fl) { + (his_info->last_term)++; + (my_info->last_term)++; + } else { + his_info->last_term = my_last_term; + } + + nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term); + + // Get reconciliator's term information for that term + nsr_driver_log (this->name, GF_LOG_INFO, + "getting info from reconciliator for term %d \n", my_info->last_term); + bm = (1 << recon_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_GIVEN_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting info from reconciliator for term %d \n", my_info->last_term); + + + // empty term + if (!his_info->commited_ops) { + nsr_driver_log (this->name, GF_LOG_INFO, + "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term); + // TBD - mark the term golden + fl = _gf_false; + continue; + } + + // calculate the resolution window boundary. for the last term + // this node saw, we compare the resolution window of this and + // reconciliator. for the rest of the nodes, we just accept the + // reconciliator info. + if (fl) { + my_start_index = my_info->first_index; + my_end_index = my_info->last_index; + his_start_index = his_info->first_index; + his_end_index = his_info->last_index; + my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index; + my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index; + } else { + my_info->first_index = his_info->first_index; + my_info->last_index = his_info->last_index; + my_info->commited_ops = his_info->commited_ops; + } + if (my_info->first_index == 0) + my_info->first_index = 1; + num = (my_info->last_index - my_info->first_index) + 1; + + + // Get the logs from the reconciliator (and this node for this + // term) + if (fl) + bm = ((1 << recon_index) | 1); + else + bm = (1 << recon_index); + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting reconciliation window for term %d from %d to %d \n", + my_info->last_term, + my_info->first_index, my_info->last_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_RECONCILATION_WINDOW, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting reconciliation window for term %d from %d to %d \n", + my_info->last_term, + my_info->first_index, my_info->last_index); + + // from the changelogs, calculate the entries that need action + compute_resolution_work(ctx, my_info, his_info, !fl); + + + // for each of the entries that need fixup, issue IO + for (i=my_info->first_index; i < (my_info->first_index + num); i++) { + nsr_reconciliation_record_t *record = + &(my_info->records[i - my_info->first_index]); + + record->work.term = my_info->last_term; + record->work.index = i; + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing index %d\n",i); + if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) || + (record->work.type == NSR_RECON_WORK_UNDO_FILL)) { + if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) && + recon_check_changelog(&record->rec)) || + ((record->work.type == NSR_RECON_WORK_UNDO_FILL) && + recon_compute_undo(&record->rec))) { + nsr_driver_log (this->name, GF_LOG_INFO, + "reading data from source %d\n",recon_index); + bm = (1 << recon_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reading data from source %d\n",recon_index); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing local data as part of resolutor\n"); + + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing local data as part of resolutor\n"); + } + } + fl = _gf_false; + + // tbd - mark this term golden in the reconciliator + } while (my_last_term++ != to_do_term); + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished resolutor work \n"); + return _gf_true; +} + +gf_boolean_t +nsr_recon_driver_leader (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + int32_t chosen = -1; + int32_t last_term = -1, last_ops = -1; + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting last term info from all members of this group\n"); + // Get last term info from all members for this group + send_and_wait(&status, &op_errno, -1, + replica_group_size, + ctx, + NSR_WORK_ID_GET_LAST_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + return _gf_false; + + + // compare all the info received and choose the reconciliator First + // choose all with latest term + for (i=0; i < replica_group_size; i++) { + if (ctx->workers[i].in_use) { + if (ctx->workers[i].recon_info->last_term > last_term) { + last_term = ctx->workers[i].recon_info->last_term; + } + } + } + // First choose all with latest term and highest ops + for (i=0; i < replica_group_size; i++) { + if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) { + if (ctx->workers[i].recon_info->commited_ops > last_ops) { + last_ops = ctx->workers[i].recon_info->commited_ops; + } + } + } + // choose the first among the lot + for (i=0; i < replica_group_size; i++) { + if ((ctx->workers[i].in_use) && + (last_term == ctx->workers[i].recon_info->last_term) && + (last_ops == ctx->workers[i].recon_info->commited_ops)) { + chosen = i; + break; + } + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "reconciliator chosen is %d\n", chosen); + ctx->reconciliator_index = chosen; + GF_ASSERT(chosen != -1); + if (chosen == -1) { + nsr_driver_log (this->name, GF_LOG_INFO, + "no reconciliatior chosen\n"); + return _gf_false; + } + + // send the message to reconciliator to do reconciliation with list of + // nodes that are part of this quorum + if (chosen != 0) { + nsr_driver_log (this->name, GF_LOG_INFO, + "sending reconciliation work to %d\n", chosen); + bm = 1 << ctx->reconciliator_index; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RECONCILIATOR_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work to %d\n", chosen); + } else { + nsr_driver_log (this->name, GF_LOG_INFO, + "local node is reconciliator. before set jmp\n"); + nsr_recon_driver_reconciliator(priv); + } + + // send message to all other nodes to sync up with the reconciliator + // including itself if required + // requires optimisation - TBD + if (chosen != 0) { + nsr_driver_log (this->name, GF_LOG_INFO, + "local node resolution needs to be done. before set jmp\n"); + nsr_recon_driver_resolutor(priv); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "sending resolution work to all nodes except this node and reconciliator\n"); + bm = ~((1 << ctx->reconciliator_index) || 1); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RESOLUTION_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work as leader \n"); + return _gf_true; +} + +// main recon driver thread +void * +nsr_reconciliation_driver(void *arg) +{ + nsr_recon_private_t *priv = (nsr_recon_private_t *) arg; + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_per_node_worker_t *control_s, *data_s; + nsr_recon_driver_ctx_t **driver_ctx, *ctx; + int32_t bm; + xlator_t *this = priv->this; + char *con_name, *data_name; + int32_t status = 0; + int32_t op_errno = 0; + + driver_ctx = &priv->driver_thread_context; + (*driver_ctx) = GF_CALLOC (1, + sizeof (nsr_recon_driver_ctx_t), + gf_mt_recon_driver_ctx_t); + if (!driver_ctx) { + gf_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + ctx = *driver_ctx; + ctx->this = priv->this; + ctx->replica_group_size = replica_group_size; + + ctx->fp = recon_create_log (priv->replica_group_members[0], "nsr-driver-log"); + if (!ctx->fp) + return NULL; + + if ((pthread_mutex_init(&(ctx->mutex), NULL)) || + (pthread_cond_init(&(ctx->cv), NULL))){ + nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n"); + return NULL; + } + INIT_LIST_HEAD(&(ctx->role_head.list)); + + ctx->workers = RD_CALLOC (replica_group_size, + sizeof(nsr_replica_worker_t), + gf_mt_recon_worker_t); + if (!ctx->workers) { + nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + for (i=0; i < replica_group_size; i++) { + strcpy(ctx->workers[i].name, priv->replica_group_members[i]); + } + + control_s = RD_CALLOC (replica_group_size, + sizeof(nsr_per_node_worker_t), + gf_mt_recon_per_node_worker_t); + if (!control_s) { + nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + + data_s = RD_CALLOC (replica_group_size, + sizeof(nsr_per_node_worker_t), + gf_mt_recon_per_node_worker_t); + if (!data_s) { + nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); + return NULL; + } + for (i=0; i < replica_group_size; i++) { + ctx->workers[i].control_worker = &control_s[i]; + if (asprintf(&con_name,"recon-con-%u",i) < 1) { + return NULL; + } + ctx->workers[i].control_worker->fp = recon_create_log + (priv->replica_group_members[0], con_name); + if (!ctx->workers[i].control_worker->fp) + return NULL; + ctx->workers[i].data_worker = &data_s[i]; + if (asprintf (&data_name,"recon-data-%u",i) <1) { + return NULL; + } + ctx->workers[i].data_worker->fp = recon_create_log + (priv->replica_group_members[0], data_name); + if (!ctx->workers[i].data_worker->fp) + return NULL; + } + + nsr_driver_log (this->name, GF_LOG_INFO, "creating threads \n"); + // Create the worker threads + // For every brick including itself there will be 2 worker threads: + // one for data and one for control + if (!create_worker_threads(priv, ctx, control_s, _gf_true, + (F_t) control_worker_main, replica_group_size) || + !create_worker_threads(priv, ctx, data_s, _gf_false, + (F_t) data_worker_main, replica_group_size)) { + return NULL; + } + + for (i=0; i < replica_group_size; i++) { + nsr_recon_get_file(priv->volname, &(ctx->workers[i])); + } + + while (1) { + + nsr_role_work_t *rr; + + nsr_driver_log (this->name, GF_LOG_INFO, "waiting for role to be queued \n"); + pthread_mutex_lock(&(ctx->mutex)); + while (list_empty(&(ctx->role_head.list))) { + pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); + } + pthread_mutex_unlock(&(ctx->mutex)); + + list_for_each_entry(rr, &(ctx->role_head.list), list) { + nsr_recon_driver_state_t state; + state = nsr_recon_driver_get_role(&status, ctx, rr); + + if (status == -1) { + op_errno = EIO; + goto out; + } + + switch (state) { + + case leader: + if (!nsr_recon_driver_leader(priv)) { + goto out; + } + break; + case reconciliator: + if (!nsr_recon_driver_reconciliator(priv)) { + goto out; + } + break; + case resolutor: + if (!nsr_recon_driver_resolutor(priv)) { + goto out; + } + break; + + case joiner: + + nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); + // Get last term info from all members for this group + // which will be the leader(this node) and the node that wants to join. + send_and_wait(&status, &op_errno, -1, + replica_group_size, + ctx, + NSR_WORK_ID_GET_LAST_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + goto out; + + + // send message to other node that just joined to sync up with this node which is also the leader + nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n"); + bm = ~(1); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RESOLUTION_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished recon work as joiner \n"); + break; + + default: + nsr_driver_log (this->name, GF_LOG_ERROR, + "bad state %d", state); + } + + + // free the asasociated recon_info contexts created as part of this role + +out: + nsr_driver_log (this->name, GF_LOG_INFO, + "sending end of reconciliation message \n"); + nsr_recon_return_back(priv, ctx->term, status, op_errno); + nsr_driver_log (this->name, GF_LOG_INFO, + "finished sending end of reconciliation message \n"); + } + list_del_init (&rr->list); + } + + return NULL; +} |