/* Copyright (c) 2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include #include #include #include #include #include "call-stub.h" #include "defaults.h" #include "xlator.h" #include "recon_driver.h" #include "recon_xlator.h" #include "api/src/glfs-internal.h" #include "api/src/glfs-handles.h" /* TBD: move declarations here and nsr.c into a common place */ #define NSR_TERM_XATTR "trusted.nsr.term" #define RECON_TERM_XATTR "trusted.nsr.recon-term" #define RECON_INDEX_XATTR "trusted.nsr.recon-index" /* * Execution architecture for the NSR reconciliation driver. The driver runs * as a seperate process in each node where the brick is. The main function of * the driver is nsr_reconciliation_driver() (last function below) The driver * just sits in a tight loop waiting for state changes. When a brick becomes a * replica leader, it fences IO, contacts this process and waits for * reconciliation to finish. * * The replica leader talks to other bricks in replica group which are alive * and gets the last term info using which it decides which has the latest * data. That brick is referred to as the "reconciliator"; leader sends a * message to reconciliator to freeze its data (by reading any incomplete data * from other nodes from that term if required) * * Once that is done leader sends a message to all nodes except the * reconciliator to sync themselves with the reconciliator. This process is * referred to as "resolution". * * Hence the reconciliation processes need to talk to each other to get a given * term info. 
This is implemented using the recon translator IOs, which
 * implement a bare-bones RPC by exposing a file interface to which
 * reads/writes are done to pass control messages. This is referred to as the
 * "control plane". This implementation allows the control plane to be
 * implemented as a bunch of threads, one for each of the nodes.
 *
 * The reconciliation process also needs to talk to the brick process on that
 * node to actually write the data as part of reconciliation/resolution. This
 * is referred to as the "data plane". Again there are a bunch of threads that
 * do this work.
 *
 * The way the worker threads are organised is that the main driver context has
 * a pointer to the context of each of these threads. The thread context at
 * index 0 always refers to talking with the local recon process/brick. So the
 * control worker at index 0 will get the local changelog info and the data
 * worker at index 0 will talk to the local brick.
 *
 * All the ops from the control/data planes are implemented using the glfs
 * APIs.
 */

/*
 * This function gets the size of all the extended attributes for a file.
 * This is used so that caller knows how much to allocate for key-value storage.
 *
 * Input Arguments:
 *      fd - the file opened using glfs API.
 *      dict - passed so that NSR translator can get this from the required brick
 *
 * Output Arguments:
 *      b - pointer to the buffer where the attributes are filled up.
* key_size - the size of all keys * val_size - the size of all values * num - number of key/values */ static int32_t get_xattr_total_size( struct glfs_fd *fd, char **b, uint32_t *key_size, uint32_t *val_size, uint32_t* num, dict_t *dict) { int32_t s = -1, ret = -1; char *c = NULL; *key_size = 0; *val_size = 0; *num = 0; // First get the size of the keys s = glfs_flistxattr_with_xdata(fd, NULL,0, dict); if (s == -1) goto out; *key_size = s; // TBD - use the regular calloc (*b) = c = calloc(s+1,1); // get the keys themselves if (glfs_flistxattr_with_xdata(fd, c, s+1, dict) == -1) goto out; do { int32_t r; uint32_t len = 0; // for each key get the size of the value r = glfs_fgetxattr_with_xdata(fd, c, NULL, 0, dict); if (r == -1) goto out; (*val_size) += r; len = strlen(c) + 1; c += len; s -= len; (*num)++; } while(s); ret = 0; out: return ret; } /* * This function gets bunch of xattr values given set of keys. * * Input Arguments: * fd - the file opened using glfs API. * keys - the bunch of keys * size - size of values * num - number of keys * dict - passed so that NSR translator can get this from the required brick * * Output Arguments: * buf - where the values are written one after the other (NULL seperated) */ static void get_xattr(struct glfs_fd *fd, char *keys, char *buf, uint32_t size, uint32_t num, dict_t *dict) { while(num--) { int32_t r; uint32_t len = 0; // copy the key strcpy(buf, keys); len = strlen(keys); len++; buf += len; // get the value and copy the value after incrementing buf after the key r = glfs_fgetxattr_with_xdata(fd, keys, buf, size, dict); // TBD - handle error if (r == -1) return; // increment the key to next value keys += len; // increment buf to hold the next key buf += strlen(buf) + 1; } return; } /* * Function deletes a bunch of key values in extended attributes of a file. * Input Arguments: * fd - the file opened using glfs API. 
* dict - passed so that NSR translator can do this from the required brick * keys - bunch of NULL seperated key names * num - number of keys */ static void delete_xattr(struct glfs_fd *fd, dict_t *dict_t, char *keys, uint32_t num) { while(num--) { // get the value and copy the value // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata() glfs_fremovexattr_with_xdata(fd, keys, dict_t); keys += strlen(keys) +1; } return; } /* * Given a bunch of key value pairs, fill them as xattrs for a file * * Input Arguments: * fd - the file opened using glfs API. * dict - passed so that NSR translator can do this from the required brick * buf - buffer containing the keys-values pairs. The key value are NULL seperated. * Each of the key-value is seperated by NULL in turn. * num - Number of such key value pairs. */ static void fill_xattr(struct glfs_fd *fd, dict_t *dict, char *buf, uint32_t num) { char *k = buf, *val = NULL; while(num--) { int32_t r; val = k + strlen(k) + 1; // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata() r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict); if (r == -1) return; k = val + strlen(val) + 1; } return; } /* * This function gets a file that can be used for doing glfs_init later. * The control file is used by control thread(function) to talk to peer reconciliation process. * The data file is used by the data thread(function) to talk to the bricks. * The control file is of name such as con:gfs1:-mnt-a1 where "gfs1" is name of host * and the brick path is "/mnt/a1". * The data file is of name such as data:gfs1:-mnt-a1. * * Input Arguments: * vol - name of the volume. This is used to build the full path of the control and data file * such as /var/lib/glusterd/vols/test/bricks/gfs2:-mnt-test1-nsr-recon.vol. * In above example the volume name is test and brick on gfs2 is on path /mnt/test1 * * worker - The worker for a given node. 
This worker has 2 threads - one on the data plane * and one on the control plane. The worker->name is already filled with hostname:brickname * in the function nsr_reconciliation_driver(). Use that to build the volume file. * So if worker->name has gfs1:/mnt/a1, control file is con:gfs1:-mnt-a1 * and data file is data:gfs1:-mnt-a1. * All these files are under the bricks directory. TBD - move this to a NSR recon directory later. */ static void nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker) { char *ptr; char tr[256]; // Replace the "/" to - strcpy(tr, worker->name); ptr = strchr (tr, '/'); while (ptr) { *ptr = '-'; ptr = strchr (tr, '/'); } // Build the base directory such as "/var/lib/glusterd/vols/test/bricks/" sprintf(worker->control_worker->vol_file, "/%s/%s/%s/%s/", GLUSTERD_DEFAULT_WORKDIR, GLUSTERD_VOLUME_DIR_PREFIX, vol, GLUSTERD_BRICK_INFO_DIR); strcat(worker->control_worker->vol_file, "con:"); strcat(worker->control_worker->vol_file, tr); sprintf(worker->data_worker->vol_file, "/%s/%s/%s/%s/", GLUSTERD_DEFAULT_WORKDIR, GLUSTERD_VOLUME_DIR_PREFIX, vol, GLUSTERD_BRICK_INFO_DIR); strcat(worker->data_worker->vol_file, "data:"); strcat(worker->data_worker->vol_file, tr); } /* * This function does all the glfs initialisation * so that reconciliation process can talk to other recon processes/bricks * for the control/data messages. * This will be done everytime a worker needs to be kicked off to talk * across any plane. 
* * Input arguments: * ctx - The per worker based context * control - set to true if this worker is for the control plane */ static int nsr_recon_start_work(nsr_per_node_worker_t *ctx, gf_boolean_t control) { glfs_t *fs = NULL; xlator_t *this = ctx->driver_ctx->this; int32_t ret = 0; glfs_fd_t *aux_fd = NULL; // fd of auxilary log char lf[256]; nsr_worker_log(this->name, GF_LOG_INFO, "starting work with volfile %s\n", ctx->vol_file); fs = glfs_new(ctx->id); if (!fs) { glusterfs_this_set(this); nsr_worker_log(this->name, GF_LOG_ERROR, "cannot create gfls context for thread %s\n",ctx->id); return -1; } // For some vague reason, glfs init APIs seem to be clobbering "this". hence resetting it. glusterfs_this_set(this); nsr_worker_log(this->name, GF_LOG_INFO, "init done. setting volfile %s\n", ctx->vol_file); ret = glfs_set_volfile(fs, ctx->vol_file); if (ret != 0) { glusterfs_this_set(this); nsr_worker_log(this->name, GF_LOG_ERROR, "cannot set volfile %s for thread %s\n",ctx->vol_file, ctx->id); return -1; } // TBD - convert this to right /usr/local/var/log based log files. sprintf(lf,"/tmp/logs/%s-%s",(control == _gf_true)?"con":"data",ctx->id); glfs_set_logging (fs, lf, 7); glusterfs_this_set(this); ret = glfs_init (fs); if (ret != 0) { glusterfs_this_set(this); nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do init for thread %s with volfile %s\n",ctx->id, ctx->vol_file); return -1; } glusterfs_this_set(this); nsr_worker_log(this->name, GF_LOG_INFO, "setting volfile %s done\n", ctx->vol_file); // If it is control thread, open the "/" as the aux_fd. // All IOs happening via the fd will do the RPCs across the reconciliation // processes. For some vague reason, the root seems to be open'able like a file. // TBD - try to clean this up. (implement a virtual file???) if (control == _gf_true) { nsr_worker_log(this->name, GF_LOG_INFO, "doing open for / \n"); aux_fd = glfs_open (fs, "/", O_RDWR); // TBD - proper error handling. Stall reconciliation if such a thing happens? 
if (aux_fd == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "cannot open aux log file for thread %s\n",ctx->id); } else { nsr_worker_log(this->name, GF_LOG_ERROR, "---opened aux log file for thread %s\n",ctx->id); } ctx->aux_fd = aux_fd; } glusterfs_this_set(this); ctx->fs = fs; return 0; } /* * * This function does the cleanup after reconciliation is done * or before we start a new reconciliation. * * Input arguments: * ctx - The per worker based context * control - set to true if this worker is for the control plane */ static int nsr_recon_end_work(nsr_per_node_worker_t *ctx, gf_boolean_t control) { int32_t ret = 0; xlator_t *this = ctx->driver_ctx->this; nsr_worker_log(this->name, GF_LOG_INFO, "doing fini for recon worker\n"); ret = glfs_fini(ctx->fs); if (ret != 0) { glusterfs_this_set(this); nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do fini for thread %s with volfile %s\n",ctx->id, ctx->vol_file); return -1; } glusterfs_this_set(this); ctx->fs = NULL; if (control == _gf_true) { glfs_close (ctx->aux_fd); ctx->aux_fd = NULL; } return 0; } //called in case all worker functions run as sepeerate threads static void init_worker(nsr_per_node_worker_t *ctx, gf_boolean_t control) { pthread_mutex_init(&(ctx->mutex), NULL); pthread_cond_init(&(ctx->cv), NULL); INIT_LIST_HEAD(&(ctx->head.list)); } /* * Control worker funct for getting changelog info on this node. * calls directly functions to parse the changelog. 
* * Input arguments: * ctx - The per worker based context * control - set to true if this worker is for the control plane */ static void control_worker_func_0(nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) { unsigned int index = ctx->index; nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); xlator_t *this = ctx->driver_ctx->this; nsr_recon_private_t *priv = this->private; nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; ctx->is_control = _gf_true; switch (work->req_id){ case NSR_WORK_ID_INI: { break; } case NSR_WORK_ID_FINI: { break; } case NSR_WORK_ID_GET_LAST_TERM_INFO: { nsr_recon_last_term_info_t lt; nsr_reconciliator_info_t *recon_info = rw->recon_info; // term is stuffed inside work->index. overloading. int32_t term = work->index; nsr_worker_log(this->name, GF_LOG_INFO, "trying to get last term info for node %d with current term %d\n",index, term); // TBD - handle errors // This is called by the leader after it gets the current term. // Makes searching easier. nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, <); recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; recon_info->last_index = lt.last_index; recon_info->first_index = lt.first_index; nsr_worker_log(this->name, GF_LOG_INFO, "out of get last term info with current term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, recon_info->first_index, recon_info->last_index); break; } case NSR_WORK_ID_GET_GIVEN_TERM_INFO: { nsr_recon_last_term_info_t lt; nsr_reconciliator_info_t *recon_info = rw->recon_info; // term is stuffed inside work->index. overloading. 
int32_t term = work->index; nsr_worker_log(this->name, GF_LOG_INFO, "trying to get term info for node %d for term %d\n",index, term); // TBD - handle errors nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, <); recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; recon_info->last_index = lt.last_index; recon_info->first_index = lt.first_index; nsr_worker_log(this->name, GF_LOG_INFO, "out of get term info for term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, recon_info->last_index, recon_info->first_index); break; } case NSR_WORK_ID_RECONCILIATOR_DO_WORK: { // For local resolution, the main driver thread does it. // SO there is no way we can have this message for this node. GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_INFO, "this message should not be sent \n"); break; } case NSR_WORK_ID_RESOLUTION_DO_WORK: { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_INFO, "this message should not be sent \n"); break; } case NSR_WORK_ID_END_RECONCILIATION: { nsr_worker_log(this->name, GF_LOG_INFO, "sending reconciliation end message to node %d\n", index); nsr_recon_return_back(priv, dr->txn_id); break; } case NSR_WORK_ID_GET_RECONCILATION_WINDOW: { nsr_reconciliator_info_t *recon_info = rw->recon_info; // first_index and last_index at 0 indicates empty log. // For non empty log, the first_index always starts at 1. 
uint32_t num = (dr->workers[index].recon_info->last_index - dr->workers[index].recon_info->first_index + 1); nsr_recon_record_details_t *rd; uint32_t i=0; nsr_worker_log(this->name, GF_LOG_INFO, "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", index, recon_info->last_term, recon_info->first_index, recon_info->last_index); GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE); // TBD - handle buffer allocation errors rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); // TBD - handle errors nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, recon_info->last_term, recon_info->first_index, recon_info->last_index, rd); // The above function writes into rd from 0 to (num -1) // We need to take care of this whenever we deal with records for (i=0; i < num; i++) { ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl memcpy(&(recon_info->records[i].rec), &(rd[i]), sizeof(nsr_recon_record_details_t)); } GF_FREE(rd); nsr_worker_log(this->name, GF_LOG_INFO, "got reconciliation window records for node %d for term %d \n", index, recon_info->last_term); break; } } return; } // Control worker thread static void* control_worker_main_0(nsr_per_node_worker_t *ctx) { ctx->is_control = _gf_true; nsr_worker_log(this->name, GF_LOG_INFO, "starting control worker func 0\n"); init_worker(ctx, 1); while(1) { nsr_recon_work_t *work = NULL; nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; nsr_worker_log(this->name, GF_LOG_INFO, "waiting for work\n"); pthread_mutex_lock(&ctx->mutex); while (list_empty(&(ctx->head.list))) { pthread_cond_wait(&ctx->cv, &ctx->mutex); } pthread_mutex_unlock(&ctx->mutex); list_for_each_entry(work, &(ctx->head.list), list) { nsr_worker_log(this->name, GF_LOG_INFO, "got work with id %d\n", work->req_id); work->in_use = _gf_false; // Call the main function. 
control_worker_func_0(ctx, work); atomic_dec(&(dr->outstanding)); break; } nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); list_del_init (&work->list); nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); } return NULL; } /* * Control worker funct for getting changelog info on some other node. * calls glfs functions to seek/read/write on aux_fd. * * Input arguments: * ctx - The per worker based context * control - set to true if this worker is for the control plane */ static void control_worker_func(nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) { unsigned int index = ctx->index; nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; ctx->is_control = _gf_true; switch (work->req_id){ case NSR_WORK_ID_INI: { nsr_worker_log(this->name, GF_LOG_INFO, "calling nsr_recon_start_work\n"); // TBD - handle error in case nsr_recon_start_work gives error nsr_recon_start_work(ctx, _gf_true); nsr_worker_log(this->name, GF_LOG_INFO, "finished nsr_recon_start_work\n"); break; } case NSR_WORK_ID_FINI: { nsr_worker_log(this->name, GF_LOG_INFO, "calling nsr_recon_end_work\n"); // TBD - handle error in case nsr_recon_end_work gives error nsr_recon_end_work(ctx, _gf_true); nsr_worker_log(this->name, GF_LOG_INFO, "finished nsr_recon_end_work\n"); break; } case NSR_WORK_ID_GET_LAST_TERM_INFO: { nsr_recon_last_term_info_t lt; nsr_reconciliator_info_t *recon_info = rw->recon_info; int32_t term = htonl(work->index); // overloading it nsr_worker_log(this->name, GF_LOG_INFO, "trying to get last term info for node %d with current term %d\n",index, work->index); // first write the current term term number // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET); glfs_write(ctx->aux_fd, &term, sizeof(term), 0); glfs_read(ctx->aux_fd, <, sizeof(lt), 0); ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl recon_info->last_term = lt.last_term; 
recon_info->commited_ops = lt.commited_ops; recon_info->last_index = lt.last_index; recon_info->first_index = lt.first_index; nsr_worker_log(this->name, GF_LOG_INFO, "out of get last term info with current term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, recon_info->last_index, recon_info->first_index); break; } case NSR_WORK_ID_GET_GIVEN_TERM_INFO: { nsr_recon_last_term_info_t lt; nsr_reconciliator_info_t *recon_info = rw->recon_info; int32_t term = htonl(work->index); // overloading it nsr_worker_log(this->name, GF_LOG_INFO, "trying to get term info for node %d for term %d\n",index, work->index); // first write the term number // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET); glfs_write(ctx->aux_fd, &term, sizeof(term), 0); glfs_read(ctx->aux_fd, <, sizeof(lt), 0); ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; recon_info->last_index = lt.last_index; recon_info->first_index = lt.first_index; nsr_worker_log(this->name, GF_LOG_INFO, "out of get term info for term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, recon_info->last_index, recon_info->first_index); break; } case NSR_WORK_ID_RECONCILIATOR_DO_WORK: { nsr_recon_role_t rr; uint32_t i=0; uint32_t num=0; uint32_t idx = dr->reconciliator_index; uint32_t term = dr->workers[idx].recon_info->last_term; GF_ASSERT(idx == index); nsr_worker_log(this->name, GF_LOG_INFO, "trying to make this index %d as reconciliator for term %d\n", index, term); // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_1, SEEK_SET); // We have all the info for all other nodes. // Fill all that info when sending data to that process. 
for (i=0; i < dr->replica_group_size; i++) { if ( dr->workers[i].in_use && (dr->workers[i].recon_info->last_term == term)) { rr.info[num].last_term = dr->workers[i].recon_info->last_term; rr.info[num].commited_ops = dr->workers[i].recon_info->commited_ops; rr.info[num].last_index = dr->workers[i].recon_info->last_index; rr.info[num].first_index = dr->workers[i].recon_info->first_index; strcpy(rr.info[num].name, dr->workers[i].name); } num++; } rr.num = num; rr.role = reconciliator; ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0); nsr_worker_log(this->name, GF_LOG_INFO, "sent reconciliator info for term %d with node count as %d\n", term, num); break; } case NSR_WORK_ID_RESOLUTION_DO_WORK: { nsr_recon_role_t rr; unsigned int i=0, j=0; unsigned int rec = dr->reconciliator_index; nsr_worker_log(this->name, GF_LOG_INFO, "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_1, SEEK_SET); rr.num = 2; // Fill in info[0] as info for the node for which we are seeking resolution. // Fill in info[1] as info of the reconciliator node. // The function nsr_recon_driver_set_role() that will be called when // this message reaches the node will look at index 1 for term information // related to the reconciliator. for (i=0; i < 2; i++) { (i == 0) ? (j = index) : (j = rec); rr.info[i].last_term = dr->workers[j].recon_info->last_term; rr.info[i].commited_ops = dr->workers[j].recon_info->commited_ops; rr.info[i].last_index = dr->workers[j].recon_info->last_index; rr.info[i].first_index = dr->workers[j].recon_info->first_index; // The name is used as the key to convert indices since // the reconciliator index could be different across the nodes. 
strcpy(rr.info[i].name, dr->workers[j].name); if (i == 0) { nsr_worker_log(this->name, GF_LOG_INFO, "this node info term=%d, ops=%d, first=%d, last=%d\n", rr.info[i].last_term, rr.info[i].commited_ops, rr.info[i].first_index,rr.info[i].last_index); } else { nsr_worker_log(this->name, GF_LOG_INFO, "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n", rr.info[i].last_term, rr.info[i].commited_ops, rr.info[i].first_index,rr.info[i].last_index); } } rr.role = resolutor; ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0); nsr_worker_log(this->name, GF_LOG_INFO, "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); break; } case NSR_WORK_ID_END_RECONCILIATION: { char c[4]; uint32_t old = htonl(dr->txn_id); nsr_worker_log(this->name, GF_LOG_INFO, "sending reconciliation end message to node %d\n", index); memcpy(c, &old, sizeof(uint32_t)); // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_0, SEEK_SET); glfs_write(ctx->aux_fd, c, sizeof(c), 0); nsr_worker_log(this->name, GF_LOG_INFO, "finished sending reconciliation end message to node %d\n", index); break; } case NSR_WORK_ID_GET_RECONCILATION_WINDOW: { nsr_recon_log_info_t li; nsr_reconciliator_info_t *recon_info = rw->recon_info; uint32_t i = 0; uint32_t num = (dr->workers[index].recon_info->last_index - dr->workers[index].recon_info->first_index +1); nsr_recon_record_details_t *rd; nsr_worker_log(this->name, GF_LOG_INFO, "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", index, recon_info->last_term, recon_info->first_index, recon_info->last_index); GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE); // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET); // write to node what term & indices we are interested li.term = recon_info->last_term; li.first_index = recon_info->first_index; li.last_index = 
recon_info->last_index; ENDIAN_CONVERSION_LI(li, _gf_false); //htonl glfs_write(ctx->aux_fd, &li, sizeof(li), 0); // then read rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0); for (i=0; i < num; i++) { ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl memcpy(&(recon_info->records[i].rec), &(rd[i]), sizeof(nsr_recon_record_details_t)); nsr_worker_log(this->name, GF_LOG_INFO, "get_reconcilaition_window:Got %d at index %d\n", recon_info->records[i].rec.type, i + recon_info->first_index); } free(rd); nsr_worker_log(this->name, GF_LOG_INFO, "got reconciliation window records for node %d for term %d \n", index, recon_info->last_term); break; } } return; } // Control worker thread static void* control_worker_main(nsr_per_node_worker_t *ctx) { unsigned int index = ctx->index; ctx->is_control = _gf_true; nsr_worker_log(this->name, GF_LOG_INFO, "starting control worker func\n"); // if this is for local processing, call the changelog parsing calls directly if (index == 0) { control_worker_main_0(ctx); return NULL; } init_worker(ctx, 1); while(1) { nsr_recon_work_t *work = NULL; nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; nsr_worker_log(this->name, GF_LOG_INFO, "waiting for work\n"); pthread_mutex_lock(&ctx->mutex); while (list_empty(&(ctx->head.list))) { pthread_cond_wait(&ctx->cv, &ctx->mutex); } pthread_mutex_unlock(&ctx->mutex); list_for_each_entry(work, &(ctx->head.list), list) { nsr_worker_log(this->name, GF_LOG_INFO, "got work with id %d\n", work->req_id); work->in_use = _gf_false; control_worker_func(ctx,work); atomic_dec(&(dr->outstanding)); break; } nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); list_del_init (&work->list); nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); } return NULL; } /* * This function gets called if this process is chosen as the reconciliator * for this replica group. 
It would have already got the records for the last term * for the indices that are required (from the first HOLE to last index) from * all other nodes that also witnessed that term. COmpare all the records and * compute the work required. * * Input arguments * ctx - driver context. All recon work is stored in workers[0].recon_info */ static void compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx) { uint32_t i=0, j=0; nsr_reconciliator_info_t *my_recon = ctx->workers[0].recon_info; uint32_t num = (my_recon->last_index - my_recon->first_index + 1); for (i=0; i < num; i++) { nsr_log_type_t orig, new; unsigned int src = 0; orig = new = my_recon->records[i].rec.type; nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; // index 0 means this node. Look at all other nodes. for (j=1; j < ctx->replica_group_size; j++) { if (ctx->workers[j].in_use) { nsr_log_type_t pr = ctx->workers[j].recon_info->records[i].work.type; if ((new != pr) && (pr > new)) { src = j; new = (new | pr); } } } // TBD - compare data if new and orig are all FILLs. (can detect changelog corruption) // Right now we compare if both orig and new are psuedo holes since // only that is of interest to us. 
if (orig != new) { if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_PSEUDO_HOLE)) tw = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE; else if ((orig == NSR_LOG_HOLE) && (new == NSR_LOG_FILL)) tw = NSR_RECON_WORK_HOLE_TO_FILL; else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_PSEUDO_HOLE)) tw = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE; else if ((orig == NSR_LOG_PSEUDO_HOLE) && (new == NSR_LOG_FILL)) tw = NSR_RECON_WORK_HOLE_TO_FILL; } if (tw != NSR_RECON_WORK_NONE) { my_recon->records[i].work.type = tw; my_recon->records[i].work.source = src; // Overwrite the record memcpy(&(my_recon->records[i].rec), &(ctx->workers[src].recon_info->records[i].rec), sizeof(nsr_recon_record_details_t)); } } return; } static void nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, uint32_t i, gf_boolean_t in_use); /* * Write the role and associated information to the node. * This gets called from recon xlator indicating node is either * leader, reconciliator or should do resolution. * First we undo the last role to make sure we clean up. * * Input arguments * ctx - driver context. * rr - Role information. * If leader, the thread now sends the list of all nodes that are part of * the current replica group. Use that to find out the activate the * required worker threads. * If reconciliator, the leader node would have sent information about * all nodes which saw last term as the reconciliator. * If resolution to be done, then rr.info[0] will have this node's info * which the leader would have got earlier. rr[1].info will have the * info regarding the reconciliator. * txn_id - All role changes(except when leader becomes reconciliator or resolutor) * would be initiated as write to the recon xlator which would have got a frame from * either the brick process(leader change) or other reconciliation process. * The write function would return immediately after storing the frame which * needs to be returned back after the actual reconciliation is done. 
* For that we store the frame against this id which acts as a key. */ gf_boolean_t nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t txn_id) { uint8_t i=0, j=0; pthread_mutex_lock(&(ctx->mutex)); ctx->state = rr->role; // First make all the threads uninitialise for (i = 0; i < ctx->replica_group_size; i++) { nsr_recon_in_use(ctx, i, _gf_false); } if (rr->role == leader) { // First set info this node nsr_recon_in_use(ctx, 0, _gf_true); ctx->workers[0].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[0].recon_info) { return _gf_false; } ctx->current_term = rr->current_term; // Find rest of the nodes for (i=1; i < ctx->replica_group_size; i++) { for (j=0 ; j < rr->num; j++) { // TBD - make this strcmp later when etcd servers set properly //if (!strcmp(ctx->workers[i].name, rr->info[j].name)) { if (strstr(ctx->workers[i].name, rr->info[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, "nsr_recon_driver_set_role: this as leader. found other server %s\n", ctx->workers[i].name); nsr_recon_in_use(ctx, i, _gf_true); // Allocate this here. This will get later filled when // the leader tries to get last term information from all // the nodes ctx->workers[i].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[i].recon_info) { return _gf_false; } break; } } } ctx->reconciliator_index = -1; } else if (rr->role == reconciliator) { ctx->reconciliator_index = 0; // Copy information about all the other members which had the same term for (i=0; i < rr->num; i++) { for (j=0; j < ctx->replica_group_size; j++) { //if (!strcmp(rr->info[i].name, ctx->workers[j].name)) { if (strstr(ctx->workers[j].name, rr->info[i].name)) { nsr_driver_log(this->name, GF_LOG_INFO, "nsr_recon_driver_set_role: this as reconciliator. 
found other server %s\n", ctx->workers[j].name); ctx->workers[j].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[j].recon_info) { return _gf_false; } ctx->workers[j].recon_info->last_term = rr->info[i].last_term; ctx->workers[j].recon_info->commited_ops = rr->info[i].commited_ops; ctx->workers[j].recon_info->last_index = rr->info[i].last_index; ctx->workers[j].recon_info->first_index = rr->info[i].first_index; nsr_recon_in_use(ctx, j, _gf_true); break; } } } } else if (rr->role == resolutor) { for (j=0; j < ctx->replica_group_size; j++) { // info[1] has the information regarding the reconciliator if (strstr(ctx->workers[j].name, rr->info[1].name)) { //if (!strcmp(rr->info[1].name, ctx->workers[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n", ctx->workers[1].name); ctx->workers[j].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[j].recon_info) { return _gf_false; } ctx->workers[j].recon_info->last_term = rr->info[1].last_term; ctx->workers[j].recon_info->commited_ops = rr->info[1].commited_ops; ctx->workers[j].recon_info->last_index = rr->info[1].last_index; ctx->workers[j].recon_info->first_index = rr->info[1].first_index; ctx->reconciliator_index = j; nsr_recon_in_use(ctx, j, _gf_true); GF_ASSERT(ctx->reconciliator_index != 0); break; } } ctx->workers[0].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[0].recon_info) { return _gf_false; } // info[0] has all info for this node ctx->workers[0].recon_info->last_term = rr->info[0].last_term; ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops; ctx->workers[0].recon_info->last_index = rr->info[0].last_index; ctx->workers[0].recon_info->first_index = rr->info[0].first_index; nsr_recon_in_use(ctx, 0, _gf_true); } ctx->txn_id = txn_id; // Signal the 
main driver thread pthread_cond_signal(&(ctx->cv)); pthread_mutex_unlock(&(ctx->mutex)); return _gf_true; } /* * This function gets called if this process is chosen to sync itself with * the reconciliator. * * Input arguments * ctx - driver context. * my_info - local changelog info that has all the local records for indices that require work * his_info - reconciliator's info that has all the golden copies * invalidate - if set to true, then do not consult local records */ static void compute_resolution_work(nsr_recon_driver_ctx_t *ctx, nsr_reconciliator_info_t *my_info, nsr_reconciliator_info_t *his_info, gf_boolean_t invalidate) { uint32_t i=0; uint32_t num = (my_info->last_index - my_info->first_index + 1); for (i=0; i < num; i++) { nsr_log_type_t orig, new; nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; orig = my_info->records[i].rec.type; if (invalidate) orig = NSR_LOG_HOLE; new = his_info->records[i].rec.type; // TBD - we can never have PSUEDO_HOLE in reconciliator's info // We should have taken care of that during reconciliation. // Put an assert to validate that. if (new != orig) { if ((orig != NSR_LOG_FILL) && (new == NSR_LOG_FILL)) tw = NSR_RECON_WORK_HOLE_TO_FILL; else if ((orig != NSR_LOG_HOLE) && (new == NSR_LOG_HOLE)) tw = NSR_RECON_WORK_UNDO_FILL; } // copy the records anyway my_info->records[i].work.type = tw; my_info->records[i].work.source = ctx->reconciliator_index; memcpy(&(my_info->records[i].rec), &(his_info->records[i].rec), sizeof(nsr_recon_record_details_t)); } return; } // Create an glfs object static struct glfs_object * create_obj(nsr_per_node_worker_t *ctx, char *gfid_str) { struct glfs_object *obj = NULL; uuid_t gfid; uuid_parse(gfid_str, gfid); obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); if (obj == NULL) { GF_ASSERT(obj != NULL); nsr_worker_log(this->name, GF_LOG_ERROR, "creating of handle failed\n"); return NULL; } return obj; } /* * Function to apply the actual record onto the local brick. 
* prior to this we should have read all the data from the * brick that has the data. * * Input parameters: * ctx - per node worker context that has the fs for communicating to brick * ri - Reconciliation record that needs fixup * dict - So that NSR server translator on brick applis fixup only on this brick * and the changelog translator consumes term and index. */ static void apply_record(nsr_per_node_worker_t *ctx, nsr_reconciliation_record_t *ri, dict_t * dict) { struct glfs_fd *fd = NULL; struct glfs_object *obj = NULL; if (ri->rec.op == GF_FOP_WRITE) { nsr_worker_log(this->name, GF_LOG_INFO, "DOing write for file %s @offset %d for len %d\n", ri->rec.gfid, ri->rec.offset, ri->rec.len); // The file has got deleted on the source. Hence just ignore this. // TBD - get a way to just stuff the log entry without writing the data so that // changelogs remain identical. if (ri->work.data == NULL) { return; } if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) return; fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); return; } if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "lseek for file %s failed at offset %d\n", ri->rec.gfid, ri->rec.offset); return; } if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "write for file %s failed for bytes %d\n", ri->rec.gfid, ri->rec.len); return; } glfs_close_with_xdata(fd, dict); nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing write for gfid %s @offset %d for len %d\n", ri->rec.gfid, ri->rec.offset, ri->rec.len); } else if (ri->rec.op == GF_FOP_FTRUNCATE) { nsr_worker_log(this->name, GF_LOG_INFO, "DOing truncate for file %s @offset %d \n", ri->rec.gfid, ri->rec.offset); if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; fd 
= glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); return; } if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR "trunctae for file %s failed @offset %d\n", ri->rec.gfid,ri->rec.offset ); return; } glfs_close_with_xdata(fd, dict); nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing truncate for gfid %s @offset %d \n", ri->rec.gfid, ri->rec.offset); } else if ((ri->rec.op == GF_FOP_FREMOVEXATTR) || (ri->rec.op == GF_FOP_REMOVEXATTR) || (ri->rec.op == GF_FOP_SETXATTR) || (ri->rec.op == GF_FOP_FSETXATTR)) { uint32_t k_s = 0, v_s = 0; char *t_b= NULL; uint32_t num = 0; nsr_worker_log(this->name, GF_LOG_INFO, "Doing set extended attr for file %s \n", ri->rec.gfid); // The file has got deleted on the source. Hence just ignore this. // TBD - get a way to just stuff the log entry without writing the data so that // changelogs remain identical. 
if (ri->work.data == NULL) { return; } if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); else fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); return; } if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { if (t_b) free(t_b); GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "list of xattr of %s failed\n", ri->rec.gfid); return; } delete_xattr(fd, dict, t_b, num); // Set one special dict flag to indicate the opcode so that // the opcode gets set to this if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); return; } fill_xattr(fd, dict, ri->work.data, ri->work.num); glfs_close_with_xdata(fd, dict); nsr_worker_log(this->name, GF_LOG_INFO, "Finsihed Doing set extended attr for %s \n", ri->rec.gfid); } else if (ri->rec.op == GF_FOP_CREATE) { uuid_t gfid; nsr_worker_log(this->name, GF_LOG_INFO, "Doing create for file %s \n", ri->rec.gfid); // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; if (glfs_h_creat_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing create for file %s\n", ri->rec.entry); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finished Doing create for file %s \n", ri->rec.entry); } else if (ri->rec.op == GF_FOP_MKNOD) { uuid_t gfid; nsr_worker_log(this->name, GF_LOG_INFO, "Doing mknod for file %s \n", ri->rec.entry); // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) 
== NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mknod for file %s\n", ri->rec.entry); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finished Doing mknod for file %s \n", ri->rec.entry); } else if (ri->rec.op == GF_FOP_MKDIR) { uuid_t gfid; nsr_worker_log(this->name, GF_LOG_INFO, "Doing mkdir for dir %s \n", ri->rec.gfid); // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mkdir for file %s\n", ri->rec.entry); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finished Doing mkdir for file %s \n", ri->rec.entry); } else if ((ri->rec.op == GF_FOP_RMDIR) || (ri->rec.op == GF_FOP_UNLINK)) { nsr_worker_log(this->name, GF_LOG_INFO, "Doing rmdir/ublink for dir %s \n", ri->rec.entry); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing rmdir/unlink for file %s\n", ri->rec.entry); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finished Doing rmdir/unlink for file %s \n", ri->rec.entry); } else if (ri->rec.op == GF_FOP_SYMLINK) { uuid_t gfid; nsr_worker_log(this->name, GF_LOG_INFO, "Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; uuid_parse(ri->rec.gfid, gfid); if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finished Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); } else if 
(ri->rec.op == GF_FOP_LINK) { struct glfs_object *to_obj = NULL; nsr_worker_log(this->name, GF_LOG_INFO, "Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finsihed doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); } else if (ri->rec.op == GF_FOP_RENAME) { struct glfs_object *to_obj = NULL; nsr_worker_log(this->name, GF_LOG_INFO, "Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); return; } nsr_worker_log(this->name, GF_LOG_INFO, "Finsihed doing renam for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); } else if ((ri->rec.op == GF_FOP_SETATTR) || (ri->rec.op == GF_FOP_FSETATTR)) { struct iatt iatt = {0, }; int valid = 0; int ret = -1; // TBD - do the actual settings once we do that // right now we just set the mode so that changelog gets filled nsr_worker_log(this->name, GF_LOG_INFO, "Doing attr for file %s \n", ri->rec.gfid); if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); else fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); 
return; } iatt.ia_prot = ia_prot_from_st_mode(777); valid = GF_SET_ATTR_MODE; // Set one special dict flag to indicate the opcode so that // the opcode gets set to this if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); return; } ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict); if (ret == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_INFO, "failed Doing attr for file %s \n", ri->rec.gfid); return; } glfs_close_with_xdata(fd, dict); nsr_worker_log(this->name, GF_LOG_INFO, "Doing attr for file %s \n", ri->rec.gfid); } return; } //return back opcodes that requires reading from source static gf_boolean_t recon_check_changelog(nsr_recon_record_details_t *rd) { return((rd->op == GF_FOP_WRITE) || (rd->op == GF_FOP_FSETATTR) || (rd-> op == GF_FOP_SETATTR) || (rd->op == GF_FOP_FREMOVEXATTR) || (rd->op == GF_FOP_SETXATTR) || (rd->op == GF_FOP_FSETXATTR) || (rd->op == GF_FOP_SYMLINK)); } // TBD static gf_boolean_t recon_compute_undo(nsr_recon_record_details_t *rd) { return(_gf_false); } /* * Function that talks to the brick for data tranfer. 
* * Input arguments: * ctx - worker context * work - pointer to work object */ static void data_worker_func(nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) { nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; nsr_reconciliation_record_t *ri = NULL; nsr_recon_record_details_t *rd = NULL; glfs_fd_t *fd = NULL; int wip = 0; switch (work->req_id){ case NSR_WORK_ID_INI: { nsr_worker_log(this->name, GF_LOG_INFO, "started data ini \n"); nsr_recon_start_work(ctx, _gf_false); nsr_worker_log(this->name, GF_LOG_INFO, "finished data ini \n"); break; } case NSR_WORK_ID_FINI: { nsr_worker_log(this->name, GF_LOG_INFO, "started data fini \n"); nsr_recon_end_work(ctx, _gf_false); nsr_worker_log(this->name, GF_LOG_INFO, "finished data fini \n"); break; } case NSR_WORK_ID_SINGLE_RECONCILIATION_READ: { dict_t * dict = NULL; // first_index always starts with 1 but records starts at 0. wip = work->index - (dr->workers[0].recon_info->first_index); ri = &(dr->workers[0].recon_info->records[wip]); rd = &(ri->rec); dict = dict_new (); if (!dict) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); goto commit_out; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); goto commit_out; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); goto commit_out; } if (rd->op == GF_FOP_WRITE) { // record already copied. // copy data to this node's info. 
struct glfs_fd *fd = NULL; struct glfs_object *obj = NULL; uuid_t gfid; uuid_parse(ri->rec.gfid, gfid); nsr_worker_log(this->name, GF_LOG_INFO, "started recon read for file %s at offset %d at len %d\n", ri->rec.gfid, rd->offset, rd->len); obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); if (obj == NULL) { GF_ASSERT(obj != NULL); nsr_worker_log(this->name, GF_LOG_ERROR, "creating of handle failed\n"); goto read_out; } // The file has probably got deleted. fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict); if (fd == NULL) { GF_ASSERT(fd != NULL); nsr_worker_log(this->name, GF_LOG_ERROR, "opening of file failed\n"); goto read_out; } if (glfs_lseek_with_xdata(fd, rd->offset, SEEK_SET, dict) != rd->offset) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "lseek of file failed to offset %d\n", rd->offset); goto read_out; } ri->work.data = GF_CALLOC(rd->len , sizeof(char), gf_mt_recon_private_t); if (glfs_read_with_xdata(fd, ri->work.data, rd->len, 0, dict) != rd->len) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "read of file failed to offset %d for bytes %d\n", rd->offset, rd->len); goto read_out; } glfs_close_with_xdata(fd, dict); glfs_h_close(obj); } else if (rd->op == GF_FOP_FTRUNCATE) { } else if (rd->op == GF_FOP_SYMLINK) { } else if ((rd->op == GF_FOP_RMDIR) || (rd->op == GF_FOP_UNLINK) || (rd->op == GF_FOP_MKNOD) || (rd->op == GF_FOP_CREATE) || (rd->op == GF_FOP_LINK) || (rd->op == GF_FOP_MKDIR)) { } else if (rd->op == GF_FOP_RENAME) { } else if ((rd->op == GF_FOP_FREMOVEXATTR) || (rd->op == GF_FOP_REMOVEXATTR) || (rd->op == GF_FOP_SETXATTR) || (rd->op == GF_FOP_FSETXATTR)) { struct glfs_fd *fd = NULL; struct glfs_object *obj = NULL; uuid_t gfid; uuid_parse(ri->rec.gfid, gfid); // This is for all the set attribute/extended attributes commands. // Get all the attributes from the source and fill it in the buffer // as a NULL seperated key and value which are in turn seperated by // NULL. 
uint32_t k_s = 0, v_s = 0; char *t_b= NULL; uint32_t num=0; nsr_worker_log(this->name, GF_LOG_INFO, "doing getattr for gfid %s \n", ri->rec.gfid); obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); if (obj == NULL) { GF_ASSERT(fd != NULL); nsr_worker_log(this->name, GF_LOG_ERROR, "creating of handle failed\n"); goto read_out; } if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); else fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict); if (fd == NULL) { GF_ASSERT(fd != NULL); nsr_worker_log(this->name, GF_LOG_ERROR, "opening of file failed\n"); goto read_out; } if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { if (t_b) free(t_b); GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "list of xattr of gfid %s failed\n", rd->gfid); goto read_out; } ri->work.data = GF_CALLOC((k_s + v_s) , sizeof(char), gf_mt_recon_private_t); get_xattr(fd, t_b, ri->work.data, v_s, num, dict); ri->work.num = num; nsr_worker_log(this->name, GF_LOG_INFO, "finished getattr for gfid %s \n", ri->rec.gfid); free(t_b); goto read_out; } else if ((rd->op == GF_FOP_FSETATTR) || (rd->op == GF_FOP_SETATTR)) { //TBD - to get the actual attrbutes and fill // mode, uid, gid, size, atime, mtime } read_out: nsr_worker_log(this->name, GF_LOG_INFO, "finished recon read for gfid %s at offset %d for %d bytes \n", rd->gfid, rd->offset, rd->len); break; } case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT: { dict_t * dict = NULL; // first_index always starts with 1 but records starts at 0. 
wip = work->index - (dr->workers[0].recon_info->first_index); ri = &(dr->workers[0].recon_info->records[wip]); rd = &(ri->rec); nsr_worker_log(this->name, GF_LOG_INFO, "got recon commit for index %d that has gfid %s \n", wip, rd->gfid); dict = dict_new (); if (!dict) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); goto commit_out; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); goto commit_out; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); goto commit_out; } apply_record(ctx, ri, dict); commit_out: dict_unref (dict); nsr_worker_log(this->name, GF_LOG_INFO, "finished recon commit for gfid %s \n", rd->gfid); break; } case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: { dict_t * dict = NULL; dict = dict_new (); if (!dict) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); goto commit_out; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); goto commit_out; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); goto commit_out; } // Increment work index with the start index wip = work->index - (dr->workers[0].recon_info->first_index); ri = &(dr->workers[0].recon_info->records[wip]); rd = &(ri->rec); //fd = glfs_open(ctx->fs, rd->gfid, O_RDONLY); //TBD - using gfid glfs_fsync_with_xdata(fd, dict); break; } } return; } // thread for doing data work static void * data_worker_main(nsr_per_node_worker_t *ctx) { nsr_worker_log(this->name, GF_LOG_INFO, "starting data worker func\n"); init_worker(ctx, 0); while(1) { nsr_recon_work_t *work = NULL; nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; 
nsr_worker_log(this->name, GF_LOG_INFO, "waiting for work\n"); pthread_mutex_lock(&(ctx->mutex)); while (list_empty(&(ctx->head.list))) { pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); } pthread_mutex_unlock(&(ctx->mutex)); list_for_each_entry(work, &(ctx->head.list), list) { nsr_worker_log(this->name, GF_LOG_INFO, "got work with id %d\n",work->req_id); work->in_use = _gf_false; data_worker_func(ctx, work); atomic_dec(&(dr->outstanding)); break; } nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n"); list_del_init (&work->list); nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n"); } return NULL; } //make recon work static void recon_make_work(nsr_recon_work_t **work, nsr_recon_work_req_id_t req_id, int32_t i) { // TBD - change this to get from a static pool // This cannot fail (*work) = GF_CALLOC (1, sizeof (nsr_recon_work_t), gf_mt_recon_private_t); (*work)->req_id = req_id; (*work)->index = i; (*work)->in_use = _gf_true; INIT_LIST_HEAD(&((*work)->list)); return; } // Schedule a work object to a worker thread. static void recon_queue_to_worker(nsr_recon_driver_ctx_t *ctx, nsr_recon_work_t *work, unsigned int id, nsr_recon_queue_type_t type) { nsr_per_node_worker_t *worker; if (type == NSR_RECON_QUEUE_TO_CONTROL) { worker = ctx->workers[id].control_worker; nsr_driver_log(this->name, GF_LOG_INFO, "queueing work to control index %d\n",id); } else { worker= ctx->workers[id].data_worker; nsr_driver_log(this->name, GF_LOG_INFO, "queueing work to data index %d\n",id); } pthread_mutex_lock(&worker->mutex); list_add_tail(&work->list, &worker->head.list); pthread_cond_signal(&worker->cv); pthread_mutex_unlock(&worker->mutex); return; } typedef void * (*F_t) (void *); // In case mode is set to NSR_USE_THREADS, create worker threads. 
/*
 * Initialise `num` consecutive per-node workers starting at `w` and, when
 * ctx->mode is NSR_USE_THREADS, start one thread per worker running `f`.
 * control_or_data is _gf_true for control workers; it only selects the log
 * text.  Returns _gf_false on the first allocation or thread creation
 * failure.
 */
static gf_boolean_t
create_worker_threads(nsr_recon_private_t *priv,
                      nsr_recon_driver_ctx_t *ctx,
                      nsr_per_node_worker_t *w,
                      gf_boolean_t control_or_data,
                      F_t f,
                      uint32_t num)
{
        enum { WORKER_ID_LEN = 10 };    /* size of the "recon_<n>" buffer */
        nsr_per_node_worker_t *worker = w;
        uint32_t i;

        for (i = 0; i < num; i++, worker++) {
                worker->id = GF_CALLOC(1, WORKER_ID_LEN, gf_mt_recon_private_t);
                if (!worker->id) {
                        nsr_driver_log (priv->this->name, GF_LOG_ERROR,
                                        "memory allocation error \n");
                        return _gf_false;
                }
                // Bounded: an index too large for the buffer is truncated
                // instead of overflowing it.
                snprintf(worker->id, WORKER_ID_LEN, "recon_%u", (unsigned int) i);
                worker->driver_ctx = ctx ;
                // Fill in the index before the thread exists so the new
                // thread never sees it unset.
                worker->index = i;

                if (ctx->mode != NSR_USE_THREADS)
                        continue;

                if (pthread_create(&worker->thread_id, NULL, f, worker)) {
                        if (control_or_data) {
                                nsr_driver_log (ctx->this->name, GF_LOG_ERROR,
                                                "control work thread creation error \n");
                        } else {
                                nsr_driver_log (ctx->this->name, GF_LOG_ERROR,
                                                "data work thread creation error \n");
                        }
                        return _gf_false;
                }
        }
        return _gf_true;
}

/*
 * In case of thread, send the work item; else call the function directly.
 *
 * Input arguments:
 * bm - bitmap containing indices of nodes we want to send work
 * num - number of such indices
 * ctx - driver context from where we derive per worker context
 * id - request ID
 * q - control or data
 * misc - used to overload such as index.
*/ static void send_and_wait(int32_t bm, uint32_t num, nsr_recon_driver_ctx_t *ctx, nsr_recon_work_req_id_t id, nsr_recon_queue_type_t q, int32_t misc) { uint32_t i = 0; nsr_recon_work_t *work; if (ctx->mode == NSR_SEQ) { for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { recon_make_work(&work, id, misc); if (q == NSR_RECON_QUEUE_TO_CONTROL) { if (i == 0) control_worker_func_0(ctx->workers[0].control_worker, work); else control_worker_func(ctx->workers[i].control_worker, work); } else { data_worker_func(ctx->workers[i].data_worker, work); } } } nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n"); return; } for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { recon_make_work(&work, id, misc); atomic_inc(&(ctx->outstanding)); recon_queue_to_worker(ctx, work, i, q); } } nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: waiting\n"); while (ctx->outstanding) { pthread_yield(); } nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n"); return; } #if 0 static void send_and_do_not_wait(int32_t bm, uint32_t num, nsr_recon_driver_ctx_t *ctx, nsr_recon_work_req_id_t id, nsr_recon_queue_type_t q, int32_t misc) { uint32_t i = 0; for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { nsr_recon_work_t *work; recon_make_work(&work, id, misc); recon_queue_to_worker(ctx, work, i, q); } } return; } #endif // send INI or FINI static void nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, uint32_t i, gf_boolean_t in_use) { uint32_t bm = 1 << i; gf_boolean_t send = _gf_false; if (in_use == _gf_false) { if (ctx->workers[i].in_use == _gf_true) send = _gf_true; ctx->workers[i].in_use = _gf_false; } else { if (ctx->workers[i].in_use != _gf_true) { ctx->workers[i].in_use = _gf_true; send = _gf_true; } } #if 1 if (send == _gf_true) { if (in_use == _gf_true) { nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i); } else { 
nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i); } send_and_wait(bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_CONTROL, -1); send_and_wait(bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_DATA, -1); } #endif } // main recon driver thread void * nsr_reconciliation_driver(void *arg) { nsr_recon_private_t *priv = (nsr_recon_private_t *) arg; uint32_t replica_group_size = priv->replica_group_size; uint32_t i; nsr_per_node_worker_t *control_s, *data_s; nsr_recon_driver_ctx_t **driver_ctx, *ctx; int32_t bm; xlator_t *this = priv->this; driver_ctx = &priv->driver_thread_context; (*driver_ctx) = GF_CALLOC (1, sizeof (nsr_recon_driver_ctx_t), gf_mt_recon_private_t); if (!driver_ctx) { gf_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); return NULL; } ctx = *driver_ctx; ctx->this = priv->this; ctx->replica_group_size = replica_group_size; if ((pthread_mutex_init(&(ctx->mutex), NULL)) || (pthread_cond_init(&(ctx->cv), NULL))){ nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n"); return NULL; } ctx->workers = GF_CALLOC (replica_group_size, sizeof(nsr_replica_worker_t), gf_mt_recon_private_t); if (!ctx->workers) { nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); return NULL; } for (i=0; i < replica_group_size; i++) { strcpy(ctx->workers[i].name, priv->replica_group_members[i]); } control_s = GF_CALLOC (replica_group_size, sizeof(nsr_per_node_worker_t), gf_mt_recon_private_t); if (!control_s) { nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); return NULL; } data_s = GF_CALLOC (replica_group_size, sizeof(nsr_per_node_worker_t), gf_mt_recon_private_t); if (!data_s) { nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n"); return NULL; } for (i=0; i < replica_group_size; i++) { ctx->workers[i].control_worker = &control_s[i]; 
ctx->workers[i].data_worker = &data_s[i]; } nsr_driver_log (this->name, GF_LOG_INFO, "creating threads \n"); // Create the worker threads // For every brick including itself there will be 2 worker threads: // one for data and one for control if (!create_worker_threads(priv, ctx, control_s, _gf_true, (F_t) control_worker_main, replica_group_size) || !create_worker_threads(priv, ctx, data_s, _gf_false, (F_t) data_worker_main, replica_group_size)) { return NULL; } for (i=0; i < replica_group_size; i++) { nsr_recon_get_file(priv->volname, &(ctx->workers[i])); } while (1) { nsr_driver_log (this->name, GF_LOG_INFO, "waiting for state change \n"); pthread_mutex_lock(&(ctx->mutex)); while ((*driver_ctx)->state == 0) { pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); } pthread_mutex_unlock(&(ctx->mutex)); nsr_driver_log (this->name, GF_LOG_INFO, " state changed to %d \n", ctx->state); #if 0 for (i=0; i < replica_group_size; i++) { if (ctx->workers[i].in_use) { nsr_recon_start_work(ctx->workers[i].control_worker, _gf_true); nsr_recon_start_work(ctx->workers[i].data_worker, _gf_false); } } #endif if (ctx->state == leader) { int32_t chosen = -1; int32_t last_term = -1, last_ops = -1; nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); // Get last term info from all members for this group send_and_wait(-1, replica_group_size, ctx, NSR_WORK_ID_GET_LAST_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); // compare all the info received and choose the reconciliator // First choose all with latest term for (i=0; i < replica_group_size; i++) { if (ctx->workers[i].in_use) { if (ctx->workers[i].recon_info->last_term > last_term) { last_term = ctx->workers[i].recon_info->last_term; } } } // First choose all with latest term and highest ops for (i=0; i < replica_group_size; i++) { if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) { if (ctx->workers[i].recon_info->commited_ops > last_ops) { last_ops 
= ctx->workers[i].recon_info->commited_ops; } } } // choose the first among the lot for (i=0; i < replica_group_size; i++) { if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term) && (last_ops == ctx->workers[i].recon_info->commited_ops)) { chosen = i; break; } } nsr_driver_log (this->name, GF_LOG_INFO, "reconciliator chosen is %d\n", chosen); ctx->reconciliator_index = chosen; GF_ASSERT(chosen != -1); if (chosen == -1) { nsr_driver_log (this->name, GF_LOG_INFO, "no reconciliatior chosen\n"); goto out; } // send the message to reconciliator to do reconciliation with list of nodes that are part of this quorum if (chosen != 0) { nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen); bm = 1 << ctx->reconciliator_index; send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_RECONCILIATOR_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen); } else { nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n"); ctx->env = calloc(1,sizeof(jmp_buf)); /* * REVIEW * Use of setjmp/longjmp in an environment * where we already use ucontext is dangerous * and therefore forbidden. Refactoring will * also help with some of the rampant 80-column * violations and indented code crawling across * the screen, which together make this entire * file almost unreadable. */ if (!setjmp(*(ctx->env))) { ctx->state = reconciliator; goto i_am_reconciliator; } else { nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n"); free(ctx->env); ctx->env = NULL; ctx->state = leader; } } // send message to all other nodes to sync up with the reconciliator including itself if required // requires optimisation - TBD if (chosen != 0) { nsr_driver_log (this->name, GF_LOG_INFO, "local node resolution needs to be done. 
before set jmp\n"); ctx->env = calloc(1,sizeof(jmp_buf)); if (!setjmp(*(ctx->env))) { ctx->state = resolutor; goto i_am_resolutor; } else { nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n"); free(ctx->env); ctx->env = NULL; ctx->state = leader; } } nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n"); bm = ~((1 << ctx->reconciliator_index) || 1); send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_RESOLUTION_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n"); } i_am_reconciliator: if (ctx->state == reconciliator) { gf_boolean_t do_recon = _gf_false; uint32_t start_index = ctx->workers[0].recon_info->first_index; uint32_t end_index = ctx->workers[0].recon_info->last_index; uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1); nsr_driver_log (this->name, GF_LOG_INFO, "starting reconciliation work as reconciliator \n"); // nothing to be done? signal back to the recon translator that this phase done. bm = 1; for (i=1; i < replica_group_size; i++) { if (ctx->workers[i].in_use && (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) { ctx->workers[i].recon_info->last_index = end_index; ctx->workers[i].recon_info->first_index = start_index; bm = (1 << i); do_recon = _gf_true; } } if (!do_recon || !num) { nsr_driver_log (this->name, GF_LOG_INFO, "nothing needs to be done as resolutor \n"); if (ctx->env) { nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n"); longjmp(*(ctx->env), 1); } else { goto out; } } nsr_driver_log (this->name, GF_LOG_INFO, "getting reconciliation window for term %d from %dto %d \n", ctx->workers[0].recon_info->last_term, start_index, end_index); // We have set the bm in the above for loop where // we go thru all nodes including this node that // have seen the last term. 
send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); nsr_driver_log (this->name, GF_LOG_INFO, "finished getting reconciliation window for term %d from %dto %d \n", ctx->workers[0].recon_info->last_term, start_index, end_index); // from the changelogs, calculate the entries // that need action and the source for each of these entries compute_reconciliation_work(ctx); // for each of the entries that need fixup, issue IO for (i=start_index; i < (start_index + num); i++) { nsr_reconciliator_info_t *my_recon_info = ctx->workers[0].recon_info; nsr_reconciliation_record_t *record = &(my_recon_info->records[i - start_index]); record->work.term = ctx->workers[0].recon_info->last_term; record->work.index = i; nsr_driver_log (this->name, GF_LOG_INFO, "fixing index %d\n",i); if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) || (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) { // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): // If there are only pseudo_holes in others, it is best effort. // Just pick from the first node that has it and proceed. // 2nd case (RECON_WORK_HOLE_TO_FILL): // this node has either a HOLE or PSUEDO_HOLE and some one else has a FILL(source). 
// analyse the changelog to check if data needs to be read or if the log has all the data required if (recon_check_changelog(&record->rec)) { bm = (1 << record->work.source); nsr_driver_log (this->name, GF_LOG_INFO, "reading data from source %d\n",record->work.source); send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_READ, NSR_RECON_QUEUE_TO_DATA, i); nsr_driver_log (this->name, GF_LOG_INFO, "got data from source %d\n",record->work.source); } nsr_driver_log (this->name, GF_LOG_INFO, "fixing local data as part of reconciliation\n"); bm = 1; send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, NSR_RECON_QUEUE_TO_DATA, i); nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing local data as part of reconciliation\n"); } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) { // this node has a pseudo_hole and some others have just that too. Just convert this to FILL. // let others blindly pick it from here. nsr_driver_log (this->name, GF_LOG_INFO, "fixing this record as a fill\n"); bm = 1; send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, NSR_RECON_QUEUE_TO_DATA, i); nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing this record as a fill\n"); } } nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as reconciliator \n"); if (ctx->env) { nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n"); longjmp(*(ctx->env), 1); } // tbd - mark this term golden in the reconciliator } i_am_resolutor: if (ctx->state == resolutor) { // This node's last term is filled when it gets a message // from the leader to act as a reconciliator. 
uint32_t recon_index = ctx->reconciliator_index; nsr_reconciliator_info_t *my_info = ctx->workers[0].recon_info; nsr_reconciliator_info_t *his_info = ctx->workers[recon_index].recon_info; uint32_t my_last_term = my_info->last_term; uint32_t to_do_term = his_info->last_term; uint32_t my_start_index = 1, my_end_index = 1; uint32_t his_start_index = 1, his_end_index = 1; uint32_t num = 0; gf_boolean_t fl = _gf_true; nsr_driver_log (this->name, GF_LOG_INFO, "starting resolutor work with reconciliator as %d from term %d to term %d \n", recon_index, my_last_term, to_do_term); do { if (!fl) { (his_info->last_term)++; (my_info->last_term)++; } else { his_info->last_term = my_last_term; } nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term); // Get reconciliator's term information for that term nsr_driver_log (this->name, GF_LOG_INFO, "getting info from reconciliator for term %d \n", my_info->last_term); bm = (1 << recon_index); send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_GET_GIVEN_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); nsr_driver_log (this->name, GF_LOG_INFO, "finished getting info from reconciliator for term %d \n", my_info->last_term); // empty term if (!his_info->commited_ops) { nsr_driver_log (this->name, GF_LOG_INFO, "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term); // TBD - mark the term golden fl = _gf_false; continue; } // calculate the resolution window boundary. // for the last term this node saw, we compare the resolution window of this and reconciliator. // for the rest of the nodes, we just accept the reconciliator info. if (fl) { my_start_index = my_info->first_index; my_end_index = my_info->last_index; his_start_index = his_info->first_index; his_end_index = his_info->last_index; my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index; my_info->last_index = (my_end_index > his_end_index) ? 
my_end_index : his_end_index; } else { my_info->first_index = his_info->first_index; my_info->last_index = his_info->last_index; my_info->commited_ops = his_info->commited_ops; } if (my_info->first_index == 0) my_info->first_index = 1; num = (my_info->last_index - my_info->first_index) + 1; // Get the logs from the reconciliator (and this node for this term) if (fl) bm = ((1 << recon_index) | 1); else bm = (1 << recon_index); nsr_driver_log (this->name, GF_LOG_INFO, "getting reconciliation window for term %d from %d to %d \n", my_info->last_term, my_info->first_index, my_info->last_index); send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); nsr_driver_log (this->name, GF_LOG_INFO, "finished getting reconciliation window for term %d from %d to %d \n", my_info->last_term, my_info->first_index, my_info->last_index); // from the changelogs, calculate the entries that need action compute_resolution_work(ctx, my_info, his_info, !fl); // for each of the entries that need fixup, issue IO for (i=my_info->first_index; i < (my_info->first_index + num); i++) { nsr_reconciliation_record_t *record = &(my_info->records[i - my_info->first_index]); record->work.term = my_info->last_term; record->work.index = i; nsr_driver_log (this->name, GF_LOG_INFO, "fixing index %d\n",i); if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) || (record->work.type == NSR_RECON_WORK_UNDO_FILL)) { if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) && recon_check_changelog(&record->rec)) || ((record->work.type == NSR_RECON_WORK_UNDO_FILL) && recon_compute_undo(&record->rec))) { nsr_driver_log (this->name, GF_LOG_INFO, "reading data from source %d\n",recon_index); bm = (1 << recon_index); send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_READ, NSR_RECON_QUEUE_TO_DATA, i); nsr_driver_log (this->name, GF_LOG_INFO, "finished reading data from source %d\n",recon_index); } nsr_driver_log (this->name, 
GF_LOG_INFO, "fixing local data as part of resolutor\n"); bm = 1; send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, NSR_RECON_QUEUE_TO_DATA, i); nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing local data as part of resolutor\n"); } } fl = _gf_false; // tbd - mark this term golden in the reconciliator } while (my_last_term++ != to_do_term); nsr_driver_log (this->name, GF_LOG_INFO, "finished resolutor work \n"); if (ctx->env) { nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n"); longjmp(*(ctx->env), 1); } } // free the asasociated recon_info contexts created as part of this role out: nsr_driver_log (this->name, GF_LOG_INFO, "sending end of reconciliation message \n"); nsr_recon_return_back(priv, ctx->txn_id); #if 0 // send message that job is done by writing to local recon translator bm = 1; send_and_wait(bm, replica_group_size, ctx, NSR_WORK_ID_END_RECONCILIATION, NSR_RECON_QUEUE_TO_CONTROL, -1); #endif nsr_driver_log (this->name, GF_LOG_INFO, "finished sending end of reconciliation message \n"); ctx->state = 0; } return NULL; }