summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xtests/basic/recon.t5
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c1840
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.h2
3 files changed, 970 insertions, 877 deletions
diff --git a/tests/basic/recon.t b/tests/basic/recon.t
index 405fcb5d2..ab2241c7d 100755
--- a/tests/basic/recon.t
+++ b/tests/basic/recon.t
@@ -78,11 +78,12 @@ TEST ping_file $M0/probe2
TEST [ $(count_matches probe2) = 1 ]
# Restart the brick and give reconciliation a chance to run.
+# TBD: figure out why reconciliation takes so $#@! long to run
TEST $CLI volume start $V0 force
-sleep 10
+sleep 20
# Make sure *both* copies are valid after reconciliation.
TEST [ $(count_matches probe2) = 2 ]
-#cleanup
+cleanup
#killall -9 etcd
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 1e68414ee..b02abf2bb 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -640,7 +640,6 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
{
// For local resolution, the main driver thread does it.
// SO there is no way we can have this message for this node.
- GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_INFO,
"this message should not be sent \n");
@@ -648,7 +647,6 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
}
case NSR_WORK_ID_RESOLUTION_DO_WORK:
{
- GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_INFO,
"this message should not be sent \n");
@@ -764,319 +762,334 @@ control_worker_main_0(nsr_per_node_worker_t *ctx)
return NULL;
}
-/*
- * Control worker funct for getting changelog info on some other node.
- * calls glfs functions to seek/read/write on aux_fd.
- *
- * Input arguments:
- * ctx - The per worker based context
- * control - set to true if this worker is for the control plane
- */
static void
-control_worker_func(nsr_per_node_worker_t *ctx,
- nsr_recon_work_t *work)
+control_worker_do_reconciliation (nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
{
unsigned int index = ctx->index;
- nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_recon_role_t rr;
+ uint32_t i=0;
+ uint32_t num=0;
+ uint32_t idx = dr->reconciliator_index;
+ uint32_t term = dr->workers[idx].recon_info->last_term;
- ctx->is_control = _gf_true;
+ GF_ASSERT(idx == index);
- switch (work->req_id){
- case NSR_WORK_ID_INI:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "calling nsr_recon_start_work\n");
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to make this index %d as reconciliator for term %d\n", index, term);
+
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd,
+ nsr_recon_xlator_sector_1,
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
- // TBD - handle error in case nsr_recon_start_work gives error
- if (nsr_recon_start_work(ctx, _gf_true) != 0) {
- ctx->result = -1;
- return;
- }
+ // We have all the info for all other nodes.
+ // Fill all that info when sending data to that process.
+ for (i=0; i < dr->replica_group_size; i++) {
+ if ( dr->workers[i].in_use &&
+ (dr->workers[i].recon_info->last_term == term)) {
+ rr.info[num].last_term =
+ dr->workers[i].recon_info->last_term;
+ rr.info[num].commited_ops =
+ dr->workers[i].recon_info->commited_ops;
+ rr.info[num].last_index =
+ dr->workers[i].recon_info->last_index;
+ rr.info[num].first_index =
+ dr->workers[i].recon_info->first_index;
+ strcpy(rr.info[num].name,
+ dr->workers[i].name);
+ }
+ num++;
+ }
+ rr.num = num;
+ rr.role = reconciliator;
+ ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we are bothered about
+ // retrying only for this case. For rest of the cases we will
+ // just return EIO in errno.
+ ctx->op_errno = errno;
+ return;
+ }
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished nsr_recon_start_work\n");
- break;
- }
- case NSR_WORK_ID_FINI:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "calling nsr_recon_end_work\n");
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "sent reconciliator info for term %d with node count as %d\n", term, num);
+}
- // TBD - handle error in case nsr_recon_end_work gives error
- if (nsr_recon_end_work(ctx, _gf_true) != 0) {
- ctx->result = -1;
- return;
- }
+static void
+control_worker_do_resolution (nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_recon_role_t rr;
+ unsigned int i=0, j=0;
+ unsigned int rec = dr->reconciliator_index;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished nsr_recon_end_work\n");
- break;
- }
- case NSR_WORK_ID_GET_LAST_TERM_INFO:
- {
- nsr_recon_last_term_info_t lt;
- nsr_reconciliator_info_t *recon_info = rw->recon_info;
- int32_t term = htonl(work->index); // overloading it
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
+
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd,
+ nsr_recon_xlator_sector_1,
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to get last term info for node %d with current term %d\n",index, work->index);
+ rr.num = 2;
+
+ // Fill in info[0] as info for the node for which we are seeking
+ // resolution. Fill in info[1] as info of the reconciliator node. The
+ // function nsr_recon_driver_get_role() that will be called when this
+ // message reaches the node will look at index 1 for term information
+ // related to the reconciliator.
+ for (i=0; i < 2; i++) {
+ (i == 0) ? (j = index) : (j = rec);
+ rr.info[i].last_term =
+ dr->workers[j].recon_info->last_term;
+ rr.info[i].commited_ops =
+ dr->workers[j].recon_info->commited_ops;
+ rr.info[i].last_index =
+ dr->workers[j].recon_info->last_index;
+ rr.info[i].first_index =
+ dr->workers[j].recon_info->first_index;
+ // The name is used as the key to convert indices since the
+ // reconciliator index could be different across the nodes.
+ strcpy(rr.info[i].name,
+ dr->workers[j].name);
+ if (i == 0) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "this node info term=%d, ops=%d, first=%d, last=%d\n",
+ rr.info[i].last_term, rr.info[i].commited_ops,
+ rr.info[i].first_index,rr.info[i].last_index);
+ } else {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n",
+ rr.info[i].last_term, rr.info[i].commited_ops,
+ rr.info[i].first_index,rr.info[i].last_index);
+ }
+ }
+ rr.role = resolutor;
+ ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we are bothered about
+ // retrying only for this case. For rest of the cases we will
+ // just return EIO in errno.
+ ctx->op_errno = errno;
+ return;
+ }
- // first write the current term term number
- // TBD - error handling for all the glfs APIs
- if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) {
- ctx->result = -1;
- return;
- }
- if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
- ctx->result = -1;
- return;
- }
- if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
- ctx->result = -1;
- return;
- }
- ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
- recon_info->last_term = lt.last_term;
- recon_info->commited_ops = lt.commited_ops;
- recon_info->last_index = lt.last_index;
- recon_info->first_index = lt.first_index;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
+}
- nsr_worker_log(this->name, GF_LOG_INFO,
- "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
- recon_info->last_term, recon_info->commited_ops,
- recon_info->first_index, recon_info->last_index);
+static void
+control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_recon_log_info_t li;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ uint32_t i = 0;
+ uint32_t num = (dr->workers[index].recon_info->last_index -
+ dr->workers[index].recon_info->first_index +1);
+ nsr_recon_record_details_t *rd;
- break;
- }
- case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
- {
- nsr_recon_last_term_info_t lt;
- nsr_reconciliator_info_t *recon_info = rw->recon_info;
- int32_t term = htonl(work->index); // overloading it
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
+ index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to get term info for node %d for term %d\n",index, work->index);
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
- // first write the term number
- // TBD - error handling for all the glfs APIs
- if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) {
- ctx->result = -1;
- return;
- }
- if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
- ctx->result = -1;
- return;
- }
- if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
- ctx->result = -1;
- return;
- }
- ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
- recon_info->last_term = lt.last_term;
- recon_info->commited_ops = lt.commited_ops;
- recon_info->last_index = lt.last_index;
- recon_info->first_index = lt.first_index;
+ // write to node what term & indices we are interested
+ li.term = recon_info->last_term;
+ li.first_index = recon_info->first_index;
+ li.last_index = recon_info->last_index;
+ ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
+ if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
- nsr_worker_log(this->name, GF_LOG_INFO,
- "out of get term info for term %d. got ops %d with first %d and last %d \n",
- recon_info->last_term, recon_info->commited_ops,
- recon_info->first_index, recon_info->last_index);
+ // then read
+ rd = GF_CALLOC(num,
+ sizeof(nsr_recon_record_details_t),
+ gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+ recon_info->records = GF_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ goto err;
+ }
- break;
- }
- case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
- {
- nsr_recon_role_t rr;
- uint32_t i=0;
- uint32_t num=0;
- uint32_t idx = dr->reconciliator_index;
- uint32_t term = dr->workers[idx].recon_info->last_term;
- GF_ASSERT(idx == index);
+ if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) {
+ ctx->result = -1;
+ goto err;
+ }
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to make this index %d as reconciliator for term %d\n", index, term);
+ for (i=0; i < num; i++) {
+ ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
+ memcpy (&(recon_info->records[i].rec), &(rd[i]),
+ sizeof(nsr_recon_record_details_t));
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "get_reconcilaition_window:Got %d at index %d\n",
+ recon_info->records[i].rec.type,
+ i + recon_info->first_index);
+ }
- // TBD - error handling for all the glfs APIs
- if (glfs_lseek(ctx->aux_fd,
- nsr_recon_xlator_sector_1,
- SEEK_SET) == -1) {
- ctx->result = -1;
- return;
- }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got reconciliation window records for node %d for term %d \n",
+ index, recon_info->last_term);
- // We have all the info for all other nodes.
- // Fill all that info when sending data to that process.
- for (i=0; i < dr->replica_group_size; i++) {
- if ( dr->workers[i].in_use &&
- (dr->workers[i].recon_info->last_term == term)) {
- rr.info[num].last_term =
- dr->workers[i].recon_info->last_term;
- rr.info[num].commited_ops =
- dr->workers[i].recon_info->commited_ops;
- rr.info[num].last_index =
- dr->workers[i].recon_info->last_index;
- rr.info[num].first_index =
- dr->workers[i].recon_info->first_index;
- strcpy(rr.info[num].name,
- dr->workers[i].name);
- }
- num++;
- }
- rr.num = num;
- rr.role = reconciliator;
- ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
- ctx->result = -1;
- // Put the errno only for this case since we
- // are bothered about retrying only for this case.
- // For rest of the cases we will just return EIO
- // in errno.
- ctx->op_errno = errno;
- return;
- }
+err:
+ GF_FREE(rd);
+}
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sent reconciliator info for term %d with node count as %d\n", term, num);
+/*
+ * Control worker funct for getting changelog info on some other node.
+ * calls glfs functions to seek/read/write on aux_fd.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * control - set to true if this worker is for the control plane
+ */
+static void
+control_worker_func(nsr_per_node_worker_t *ctx,
+ nsr_recon_work_t *work)
+{
+ unsigned int index = ctx->index;
+ nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+ nsr_recon_last_term_info_t lt;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ int32_t term = htonl(work->index); // overloading it
- break;
- }
- case NSR_WORK_ID_RESOLUTION_DO_WORK:
- {
- nsr_recon_role_t rr;
- unsigned int i=0, j=0;
- unsigned int rec = dr->reconciliator_index;
+ ctx->is_control = _gf_true;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
+ switch (work->req_id){
- // TBD - error handling for all the glfs APIs
- if (glfs_lseek(ctx->aux_fd,
- nsr_recon_xlator_sector_1,
- SEEK_SET) == -1) {
- ctx->result = -1;
- return;
- }
+ case NSR_WORK_ID_INI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "calling nsr_recon_start_work\n");
- rr.num = 2;
-
- // Fill in info[0] as info for the node for which we are seeking resolution.
- // Fill in info[1] as info of the reconciliator node.
- // The function nsr_recon_driver_get_role() that will be called when
- // this message reaches the node will look at index 1 for term information
- // related to the reconciliator.
- for (i=0; i < 2; i++) {
- (i == 0) ? (j = index) : (j = rec);
- rr.info[i].last_term =
- dr->workers[j].recon_info->last_term;
- rr.info[i].commited_ops =
- dr->workers[j].recon_info->commited_ops;
- rr.info[i].last_index =
- dr->workers[j].recon_info->last_index;
- rr.info[i].first_index =
- dr->workers[j].recon_info->first_index;
- // The name is used as the key to convert indices since
- // the reconciliator index could be different across the nodes.
- strcpy(rr.info[i].name,
- dr->workers[j].name);
- if (i == 0) {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "this node info term=%d, ops=%d, first=%d, last=%d\n",
- rr.info[i].last_term, rr.info[i].commited_ops,
- rr.info[i].first_index,rr.info[i].last_index);
- } else {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n",
- rr.info[i].last_term, rr.info[i].commited_ops,
- rr.info[i].first_index,rr.info[i].last_index);
- }
- }
- rr.role = resolutor;
- ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
- ctx->result = -1;
- // Put the errno only for this case since we
- // are bothered about retrying only for this case.
- // For rest of the cases we will just return EIO
- // in errno.
- ctx->op_errno = errno;
- return;
- }
+ // TBD - handle error in case nsr_recon_start_work gives error
+ if (nsr_recon_start_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished nsr_recon_start_work\n");
+ break;
- break;
- }
- case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
- {
- nsr_recon_log_info_t li;
- nsr_reconciliator_info_t *recon_info = rw->recon_info;
- uint32_t i = 0;
- uint32_t num = (dr->workers[index].recon_info->last_index -
- dr->workers[index].recon_info->first_index +1);
- nsr_recon_record_details_t *rd;
+ case NSR_WORK_ID_FINI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "calling nsr_recon_end_work\n");
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
- index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
+ // TBD - handle error in case nsr_recon_end_work gives error
+ if (nsr_recon_end_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished nsr_recon_end_work\n");
+ break;
- // TBD - error handling for all the glfs APIs
- if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) {
- ctx->result = -1;
- return;
- }
+ case NSR_WORK_ID_GET_LAST_TERM_INFO:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get last term info for node %d with current term %d\n",index, work->index);
- // write to node what term & indices we are interested
- li.term = recon_info->last_term;
- li.first_index = recon_info->first_index;
- li.last_index = recon_info->last_index;
- ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
- if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) {
- ctx->result = -1;
- return;
- }
+ // first write the current term term number
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
- // then read
- rd = GF_CALLOC(num,
- sizeof(nsr_recon_record_details_t),
- gf_mt_recon_private_t);
- if (rd == NULL) {
- ctx->result = -1;
- return;
- }
- recon_info->records = GF_CALLOC(num,
- sizeof(nsr_reconciliation_record_t),
- gf_mt_recon_private_t);
- if (recon_info->records == NULL) {
- ctx->result = -1;
- return;
- }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
- if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) {
- ctx->result = -1;
- return;
- }
+ break;
- for (i=0; i < num; i++) {
- ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
- memcpy(&(recon_info->records[i].rec),
- &(rd[i]),
- sizeof(nsr_recon_record_details_t));
- nsr_worker_log(this->name, GF_LOG_INFO,
- "get_reconcilaition_window:Got %d at index %d\n",
- recon_info->records[i].rec.type,
- i + recon_info->first_index);
- }
- GF_FREE(rd);
+ case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get term info for node %d for term %d\n",index, work->index);
- nsr_worker_log(this->name, GF_LOG_INFO,
- "got reconciliation window records for node %d for term %d \n",
- index, recon_info->last_term);
- break;
- }
- }
+ // first write the term number
+ // TBD - error handling for all the glfs APIs
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get term info for term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+
+ break;
+
+ case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
+ control_worker_do_reconciliation(ctx,work);
+ break;
+
+ case NSR_WORK_ID_RESOLUTION_DO_WORK:
+ control_worker_do_resolution(ctx,work);
+ break;
+
+ case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
+ control_worker_get_window(ctx,work);
+ break;
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "bad work type %d", work->req_id);
+ }
return;
}
@@ -1241,6 +1254,7 @@ nsr_recon_driver_get_role(int32_t *status,
{
uint8_t i=0, j=0;
nsr_recon_role_t *rr = &(rw->role);
+ nsr_reconciliator_info_t *tmp;
// First make all the threads uninitialise
for (i = 0; i < ctx->replica_group_size; i++) {
@@ -1249,49 +1263,60 @@ nsr_recon_driver_get_role(int32_t *status,
return 0;
}
}
- if ((rr->role == leader) || (rr->role == joiner)) {
+
+ switch (rr->role) {
+ case leader:
+ case joiner:
// First set info this node
+ tmp = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_private_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ ctx->workers[0].recon_info = tmp;
if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
*status = -1;
return 0;
}
- ctx->workers[0].recon_info = GF_CALLOC (1,
- sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
- if (!ctx->workers[0].recon_info) {
- *status = -1;
- return 0;
- }
ctx->current_term = rr->current_term;
// Find rest of the nodes
for (i=1; i < ctx->replica_group_size; i++) {
- for (j=0 ; j < rr->num; j++) {
- // TBD - make this strcmp later when etcd servers set properly
- if (!strcmp(ctx->workers[i].name, rr->info[j].name)) {
- //if (strstr(ctx->workers[i].name, rr->info[j].name)) {
- nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_get_role: this as %s. found other server %s\n",
- (rr->role == leader) ? "leader" : "joiner",
- ctx->workers[i].name);
-
- if (nsr_recon_in_use(ctx, i, _gf_true) == -1) {
- *status = -1;
- return 0;
- }
- // Allocate this here. This will get later filled when
- // the leader tries to get last term information from all
- // the nodes
- ctx->workers[i].recon_info = GF_CALLOC (1,
- sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
- if (!ctx->workers[i].recon_info) {
- *status = -1;
- return 0;
- }
+ for (j=0 ; /* nothing */; j++) {
+ if (j >= rr->num) {
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "failed to find %s",
+ ctx->workers[i].name);
break;
}
+ if (strcmp(ctx->workers[i].name,
+ rr->info[j].name)) {
+ continue;
+ }
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "nsr_recon_driver_get_role: this as %s. found other server %s\n",
+ (rr->role == leader) ? "leader"
+ : "joiner",
+ ctx->workers[i].name);
+
+ // Allocate this here. This will get later
+ // filled when the leader tries to get last term
+ // information from all the nodes
+ tmp = GF_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_private_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ ctx->workers[i].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, i, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ break;
}
}
// If leader, reconciliator has to be chosen.
@@ -1300,83 +1325,98 @@ nsr_recon_driver_get_role(int32_t *status,
ctx->reconciliator_index = -1;
else
ctx->reconciliator_index = 0;
- } else if (rr->role == reconciliator) {
+ break;
+
+ case reconciliator:
ctx->reconciliator_index = 0;
- // Copy information about all the other members which had the same term
+ // Copy information about all the other members which had the
+ // same term
for (i=0; i < rr->num; i++) {
- for (j=0; j < ctx->replica_group_size; j++) {
- if (!strcmp(rr->info[i].name, ctx->workers[j].name)) {
- //if (strstr(ctx->workers[j].name, rr->info[i].name)) {
- nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
- ctx->workers[j].name);
- ctx->workers[j].recon_info = GF_CALLOC (1,
- sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
- if (!ctx->workers[j].recon_info) {
- *status = -1;
- return 0;
- }
- ctx->workers[j].recon_info->last_term =
- rr->info[i].last_term;
- ctx->workers[j].recon_info->commited_ops =
- rr->info[i].commited_ops;
- ctx->workers[j].recon_info->last_index =
- rr->info[i].last_index;
- ctx->workers[j].recon_info->first_index =
- rr->info[i].first_index;
- if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
- *status = -1;
- return 0;
- }
+ for (j=0; /* nothing */; j++) {
+ if (j >= ctx->replica_group_size) {
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "failed to find %s",
+ rr->info[i].name);
break;
}
- }
- }
- } else if (rr->role == resolutor) {
- for (j=0; j < ctx->replica_group_size; j++) {
- // info[1] has the information regarding the reconciliator
- //if (strstr(ctx->workers[j].name, rr->info[1].name)) {
- if (!strcmp(rr->info[1].name, ctx->workers[j].name)) {
+ if (strcmp(rr->info[i].name,
+ ctx->workers[j].name)) {
+ continue;
+ }
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
+ "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
ctx->workers[j].name);
- ctx->workers[j].recon_info = GF_CALLOC (1,
- sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
- if (!ctx->workers[j].recon_info) {
- *status = -1;
- return 0;
+ tmp = GF_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_private_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
}
- ctx->workers[j].recon_info->last_term =
- rr->info[1].last_term;
- ctx->workers[j].recon_info->commited_ops =
- rr->info[1].commited_ops;
- ctx->workers[j].recon_info->last_index =
- rr->info[1].last_index;
- ctx->workers[j].recon_info->first_index =
- rr->info[1].first_index;
- ctx->reconciliator_index = j;
+ tmp->last_term = rr->info[i].last_term;
+ tmp->commited_ops = rr->info[i].commited_ops;
+ tmp->last_index = rr->info[i].last_index;
+ tmp->first_index = rr->info[i].first_index;
+ ctx->workers[j].recon_info = tmp;
if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
- *status = -1;
- return 0;
- }
- GF_ASSERT(ctx->reconciliator_index != 0);
+ *status = -1;
+ return 0;
+ }
break;
}
}
- ctx->workers[0].recon_info = GF_CALLOC (1,
- sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
- if (!ctx->workers[0].recon_info) {
+ break;
+
+ case resolutor:
+ for (j=0; /* nothing */; j++) {
+ // info[1] has the information regarding the
+ // reconciliator
+ if (j >= ctx->replica_group_size) {
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "failed to find %s",
+ rr->info[1].name);
+ break;
+ }
+ if (strcmp(rr->info[1].name,
+ ctx->workers[j].name)) {
+ continue;
+ }
+ nsr_driver_log(this->name, GF_LOG_INFO,
+ "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
+ ctx->workers[j].name);
+ tmp = GF_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_private_t);
+ if (!tmp) {
+ *status = -1;
+ return 0;
+ }
+ tmp->last_term = rr->info[1].last_term;
+ tmp->commited_ops = rr->info[1].commited_ops;
+ tmp->last_index = rr->info[1].last_index;
+ tmp->first_index = rr->info[1].first_index;
+ ctx->reconciliator_index = j;
+ ctx->workers[j].recon_info = tmp;
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
+ GF_ASSERT(ctx->reconciliator_index != 0);
+ break;
+ }
+ tmp = GF_CALLOC (1,
+ sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_private_t);
+ if (!tmp) {
*status = -1;
return 0;
}
// info[0] has all info for this node
- ctx->workers[0].recon_info->last_term = rr->info[0].last_term;
- ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops;
- ctx->workers[0].recon_info->last_index = rr->info[0].last_index;
- ctx->workers[0].recon_info->first_index = rr->info[0].first_index;
+ tmp->last_term = rr->info[0].last_term;
+ tmp->commited_ops = rr->info[0].commited_ops;
+ tmp->last_index = rr->info[0].last_index;
+ tmp->first_index = rr->info[0].first_index;
+ ctx->workers[0].recon_info = tmp;
if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
*status = -1;
return 0;
@@ -1484,7 +1524,7 @@ apply_record(nsr_per_node_worker_t *ctx,
{
struct glfs_fd *fd = NULL;
struct glfs_object *obj = NULL;
-
+ gf_boolean_t retval = _gf_false;
if (ri->rec.op == GF_FOP_WRITE) {
@@ -1492,40 +1532,36 @@ apply_record(nsr_per_node_worker_t *ctx,
"DOing write for file %s @offset %d for len %d\n",
ri->rec.gfid, ri->rec.offset, ri->rec.len);
- // The file has got deleted on the source. Hence just ignore this.
- // TBD - get a way to just stuff the log entry without writing the data so that
- // changelogs remain identical.
+ // The file has got deleted on the source. Hence just ignore
+ // this.
+ // TBD - get a way to just stuff the log entry without writing
+ // the data so that changelogs remain identical.
if (ri->work.data == NULL) {
return _gf_true;
}
if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL)
- return _gf_false;;
+ goto err;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return _gf_false;
+ goto err;
}
if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"lseek for file %s failed at offset %d\n",
ri->rec.gfid, ri->rec.offset);
- return _gf_false;
+ goto err;
}
if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"write for file %s failed for bytes %d\n",
ri->rec.gfid, ri->rec.len);
- return _gf_false;
+ goto err;
}
- if (glfs_close_with_xdata(fd, dict) == -1) {
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "close failed\n");
- return _gf_false;
- }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing write for gfid %s @offset %d for len %d\n",
@@ -1537,26 +1573,23 @@ apply_record(nsr_per_node_worker_t *ctx,
"DOing truncate for file %s @offset %d \n",
ri->rec.gfid, ri->rec.offset);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return _gf_false;
+ goto err;
}
if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"trunctae for file %s failed @offset %d\n",
ri->rec.gfid,ri->rec.offset );
- return _gf_false;
+ goto err;
}
- if (glfs_close_with_xdata(fd, dict) == -1) {
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "close failed\n");
- return _gf_false;
- }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing truncate for gfid %s @offset %d \n",
@@ -1575,14 +1608,16 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing set extended attr for file %s \n",
ri->rec.gfid);
- // The file has got deleted on the source. Hence just ignore this.
- // TBD - get a way to just stuff the log entry without writing the data so that
- // changelogs remain identical.
+ // The file has got deleted on the source. Hence just ignore
+ // this. TBD - get a way to just stuff the log entry without
+ // writing the data so that changelogs remain identical.
if (ri->work.data == NULL) {
return _gf_true;
}
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1592,20 +1627,20 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return _gf_false;
+ goto err;
}
if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
if (t_b) free(t_b);
nsr_worker_log(this->name, GF_LOG_ERROR,
"list of xattr of %s failed\n", ri->rec.gfid);
- return _gf_false;
+ goto err;
}
if (delete_xattr(fd, dict, t_b, num) == -1) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"deleting xattrs failed\n");
- return _gf_false;
+ goto err;
}
// Set one special dict flag to indicate the opcode so that
@@ -1613,19 +1648,13 @@ apply_record(nsr_per_node_worker_t *ctx,
if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return _gf_false;
+ goto err;
}
if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"filling xattrs failed\n");
- return _gf_false;
- }
-
- if (glfs_close_with_xdata(fd, dict) == -1) {
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "close failed\n");
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1642,7 +1671,9 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
nsr_worker_log (this->name, GF_LOG_INFO,
"creating with mode 0%o", ri->rec.mode);
@@ -1650,7 +1681,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing create for file %s\n",
ri->rec.entry);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1667,13 +1698,15 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mknod for file %s\n",
ri->rec.entry);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1690,13 +1723,15 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mkdir for file %s\n",
ri->rec.entry);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1709,12 +1744,14 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rmdir/ublink for dir %s \n",
ri->rec.entry);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing rmdir/unlink for file %s\n",
ri->rec.entry);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1729,14 +1766,16 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
uuid_parse(ri->rec.gfid, gfid);
if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1751,14 +1790,18 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1773,14 +1816,18 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) {
+ goto err;
+ }
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- return _gf_false;
+ goto err;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1801,7 +1848,9 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing attr for file %s \n",
ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) {
+ goto err;
+ }
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1811,7 +1860,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return _gf_false;
+ goto err;
}
iatt.ia_prot = ia_prot_from_st_mode(777);
@@ -1823,7 +1872,7 @@ apply_record(nsr_per_node_worker_t *ctx,
if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return _gf_false;
+ goto err;
}
ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict);
@@ -1831,21 +1880,37 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"failed Doing attr for file %s \n",
ri->rec.gfid);
- return _gf_false;
+ goto err;
}
- if (glfs_close_with_xdata(fd, dict) == -1) {
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "close failed\n");
- return _gf_false;
- }
nsr_worker_log(this->name, GF_LOG_INFO,
"Doing attr for file %s \n",
ri->rec.gfid);
}
- return _gf_true;
+ retval = _gf_true;
+
+err:
+ if (fd) {
+ /*
+ * It's not clear that we should be passing the same dict to
+ * glfs_close that was passed to us for glfs_open, but that's
+ * the prior behavior so let's preserve it for now.
+ */
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ }
+ }
+ if (obj) {
+ /*
+ * AFAICT fd operations do not borrow this reference, so we
+ * still need to drop it ourselves.
+ */
+ glfs_h_close(obj);
+ }
+ return retval;
}
//return back opcodes that requires reading from source
@@ -2379,38 +2444,463 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
gf_boolean_t send = _gf_false;
int32_t status =0, op_errno = 0;
+ send = (ctx->workers[i].in_use != in_use);
+ ctx->workers[i].in_use = in_use;
+
+ if (!send) {
+ return 0;
+ }
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending %s to index %d\n",in_use?"INI":"FINI",i);
+
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
+ (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto err;
+
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
+ (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
+ NSR_RECON_QUEUE_TO_DATA, -1);
+ if (status == -1)
+ goto err;
+
+ /*
+ * We really need better error recovery. To activate a worker, we
+ * allocate memory and send two messages. If any of those actions
+ * fail, we should undo the others. It would probably be good to
+ * collapse the two messages into one, because it's pretty trivial to
+ * allocate a temporary structure and either link it in or free it
+ * depending on the result here.
+ */
+
if (in_use == _gf_false) {
- if (ctx->workers[i].in_use == _gf_true)
- send = _gf_true;
- ctx->workers[i].in_use = _gf_false;
- } else {
- if (ctx->workers[i].in_use != _gf_true) {
- ctx->workers[i].in_use = _gf_true;
- send = _gf_true;
- }
+ GF_FREE(ctx->workers[i].recon_info);
}
- if (send == _gf_true) {
- if (in_use == _gf_true) {
- nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i);
- } else {
- nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i);
+
+ return 0;
+
+err:
+ GF_FREE(ctx->workers[i].recon_info);
+ ctx->workers[i].recon_info = NULL;
+ return -1;
+}
+
+gf_boolean_t
+nsr_recon_driver_reconciliator (nsr_recon_private_t *priv)
+{
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context;
+ int32_t bm;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+ gf_boolean_t do_recon = _gf_false;
+ uint32_t start_index = ctx->workers[0].recon_info->first_index;
+ uint32_t end_index = ctx->workers[0].recon_info->last_index;
+ uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "starting reconciliation work as reconciliator \n");
+
+ // nothing to be done? signal back to the recon translator that this
+ // phase done.
+ bm = 1;
+ for (i=1; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use &&
+ (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) {
+ ctx->workers[i].recon_info->last_index = end_index;
+ ctx->workers[i].recon_info->first_index = start_index;
+ bm |= (1 << i);
+ do_recon = _gf_true;
}
- send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
- (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
+ }
+
+ if (!do_recon || !num) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "nothing needs to be done as resolutor \n");
+ return _gf_true;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting reconciliation window for term %d from %dto %d \n",
+ ctx->workers[0].recon_info->last_term,
+ start_index, end_index);
+ // We have set the bm in the above for loop where we go thru all nodes
+ // including this node that have seen the last term.
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
- if (status == -1)
- return -1;
- send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
- (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
- NSR_RECON_QUEUE_TO_DATA, -1);
- if (status == -1)
- return -1;
- if (in_use == _gf_false) {
- //GF_FREE(ctx->workers[i].recon_info->records);
- GF_FREE(ctx->workers[i].recon_info);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting reconciliation window for term %d from %dto %d \n",
+ ctx->workers[0].recon_info->last_term,
+ start_index, end_index);
+
+
+ // from the changelogs, calculate the entries that need action and the
+ // source for each of these entries
+ compute_reconciliation_work(ctx);
+
+ // for each of the entries that need fixup, issue IO
+ for (i=start_index; i < (start_index + num); i++) {
+ nsr_reconciliator_info_t *my_recon_info =
+ ctx->workers[0].recon_info;
+ nsr_reconciliation_record_t *record =
+ &(my_recon_info->records[i - start_index]);
+
+ record->work.term = ctx->workers[0].recon_info->last_term;
+ record->work.index = i;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing index %d\n",i);
+ if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) ||
+ (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) {
+ // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): If there
+ // are only pseudo_holes in others, it is best effort.
+ // Just pick from the first node that has it and
+ // proceed.
+ // 2nd case (RECON_WORK_HOLE_TO_FILL): this node has
+ // either a HOLE or PSUEDO_HOLE and some one else has a
+ // FILL(source). analyse the changelog to check if data
+ // needs to be read or if the log has all the data
+ // required
+
+ if (recon_check_changelog(&record->rec)) {
+ bm = (1 << record->work.source);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reading data from source %d\n",record->work.source);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "got data from source %d\n",record->work.source);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing local data as part of reconciliation\n");
+
+ bm = 1;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing local data as part of reconciliation\n");
+
+ } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) {
+ // this node has a pseudo_hole and some others have just
+ // that too. Just convert this to FILL. let others
+ // blindly pick it from here.
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing this record as a fill\n");
+ bm = 1;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing this record as a fill\n");
}
- }
- return 0;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reconciliation work as reconciliator \n");
+
+ // tbd - mark this term golden in the reconciliator
+ return _gf_true;
+}
+
+gf_boolean_t
+nsr_recon_driver_resolutor (nsr_recon_private_t *priv)
+{
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context;
+ int32_t bm;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+ // This node's last term is filled when it gets a message from the
+ // leader to act as a reconciliator.
+ uint32_t recon_index = ctx->reconciliator_index;
+ nsr_reconciliator_info_t *my_info =
+ ctx->workers[0].recon_info;
+ nsr_reconciliator_info_t *his_info =
+ ctx->workers[recon_index].recon_info;
+ uint32_t my_last_term = my_info->last_term;
+ uint32_t to_do_term = his_info->last_term;
+ uint32_t my_start_index = 1, my_end_index = 1;
+ uint32_t his_start_index = 1, his_end_index = 1;
+ uint32_t num = 0;
+ gf_boolean_t fl = _gf_true;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "starting resolutor work with reconciliator as %d from term %d to term %d \n",
+ recon_index, my_last_term, to_do_term);
+
+ do {
+
+ if (!fl) {
+ (his_info->last_term)++;
+ (my_info->last_term)++;
+ } else {
+ his_info->last_term = my_last_term;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term);
+
+ // Get reconciliator's term information for that term
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting info from reconciliator for term %d \n", my_info->last_term);
+ bm = (1 << recon_index);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_GIVEN_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting info from reconciliator for term %d \n", my_info->last_term);
+
+
+ // empty term
+ if (!his_info->commited_ops) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term);
+ // TBD - mark the term golden
+ fl = _gf_false;
+ continue;
+ }
+
+ // calculate the resolution window boundary. for the last term
+ // this node saw, we compare the resolution window of this and
+ // reconciliator. for the rest of the nodes, we just accept the
+ // reconciliator info.
+ if (fl) {
+ my_start_index = my_info->first_index;
+ my_end_index = my_info->last_index;
+ his_start_index = his_info->first_index;
+ his_end_index = his_info->last_index;
+ my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index;
+ my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index;
+ } else {
+ my_info->first_index = his_info->first_index;
+ my_info->last_index = his_info->last_index;
+ my_info->commited_ops = his_info->commited_ops;
+ }
+ if (my_info->first_index == 0)
+ my_info->first_index = 1;
+ num = (my_info->last_index - my_info->first_index) + 1;
+
+
+ // Get the logs from the reconciliator (and this node for this
+ // term)
+ if (fl)
+ bm = ((1 << recon_index) | 1);
+ else
+ bm = (1 << recon_index);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting reconciliation window for term %d from %d to %d \n",
+ my_info->last_term,
+ my_info->first_index, my_info->last_index);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting reconciliation window for term %d from %d to %d \n",
+ my_info->last_term,
+ my_info->first_index, my_info->last_index);
+
+ // from the changelogs, calculate the entries that need action
+ compute_resolution_work(ctx, my_info, his_info, !fl);
+
+
+ // for each of the entries that need fixup, issue IO
+ for (i=my_info->first_index; i < (my_info->first_index + num); i++) {
+ nsr_reconciliation_record_t *record =
+ &(my_info->records[i - my_info->first_index]);
+
+ record->work.term = my_info->last_term;
+ record->work.index = i;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing index %d\n",i);
+ if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) ||
+ (record->work.type == NSR_RECON_WORK_UNDO_FILL)) {
+ if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) &&
+ recon_check_changelog(&record->rec)) ||
+ ((record->work.type == NSR_RECON_WORK_UNDO_FILL) &&
+ recon_compute_undo(&record->rec))) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reading data from source %d\n",recon_index);
+ bm = (1 << recon_index);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reading data from source %d\n",recon_index);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing local data as part of resolutor\n");
+
+ bm = 1;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ if (status == -1)
+ return _gf_false;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing local data as part of resolutor\n");
+ }
+ }
+ fl = _gf_false;
+
+ // tbd - mark this term golden in the reconciliator
+ } while (my_last_term++ != to_do_term);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished resolutor work \n");
+ return _gf_true;
+}
+
+gf_boolean_t
+nsr_recon_driver_leader (nsr_recon_private_t *priv)
+{
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context;
+ int32_t bm;
+ int32_t status = 0;
+ int32_t op_errno = 0;
+ int32_t chosen = -1;
+ int32_t last_term = -1, last_ops = -1;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting last term info from all members of this group\n");
+ // Get last term info from all members for this group
+ send_and_wait(&status, &op_errno, -1,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_LAST_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ return _gf_false;
+
+
+ // compare all the info received and choose the reconciliator First
+ // choose all with latest term
+ for (i=0; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use) {
+ if (ctx->workers[i].recon_info->last_term > last_term) {
+ last_term = ctx->workers[i].recon_info->last_term;
+ }
+ }
+ }
+ // First choose all with latest term and highest ops
+ for (i=0; i < replica_group_size; i++) {
+ if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) {
+ if (ctx->workers[i].recon_info->commited_ops > last_ops) {
+ last_ops = ctx->workers[i].recon_info->commited_ops;
+ }
+ }
+ }
+ // choose the first among the lot
+ for (i=0; i < replica_group_size; i++) {
+ if ((ctx->workers[i].in_use) &&
+ (last_term == ctx->workers[i].recon_info->last_term) &&
+ (last_ops == ctx->workers[i].recon_info->commited_ops)) {
+ chosen = i;
+ break;
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reconciliator chosen is %d\n", chosen);
+ ctx->reconciliator_index = chosen;
+ GF_ASSERT(chosen != -1);
+ if (chosen == -1) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "no reconciliatior chosen\n");
+ return _gf_false;
+ }
+
+ // send the message to reconciliator to do reconciliation with list of
+ // nodes that are part of this quorum
+ if (chosen != 0) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending reconciliation work to %d\n", chosen);
+ bm = 1 << ctx->reconciliator_index;
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RECONCILIATOR_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reconciliation work to %d\n", chosen);
+ } else {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "local node is reconciliator. before set jmp\n");
+ nsr_recon_driver_reconciliator(priv);
+ }
+
+ // send message to all other nodes to sync up with the reconciliator
+ // including itself if required
+ // requires optimisation - TBD
+ if (chosen != 0) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "local node resolution needs to be done. before set jmp\n");
+ nsr_recon_driver_resolutor(priv);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending resolution work to all nodes except this node and reconciliator\n");
+ bm = ~((1 << ctx->reconciliator_index) || 1);
+ send_and_wait(&status, &op_errno, bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RESOLUTION_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ return _gf_false;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reconciliation work as leader \n");
+ return _gf_true;
}
// main recon driver thread
@@ -2531,424 +3021,25 @@ nsr_reconciliation_driver(void *arg)
goto out;
}
- if (state == leader) {
+ switch (state) {
- int32_t chosen = -1;
- int32_t last_term = -1, last_ops = -1;
-
- nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
- // Get last term info from all members for this group
- send_and_wait(&status, &op_errno, -1,
- replica_group_size,
- ctx,
- NSR_WORK_ID_GET_LAST_TERM_INFO,
- NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
- if (status == -1)
- goto out;
-
-
- // compare all the info received and choose the reconciliator
- // First choose all with latest term
- for (i=0; i < replica_group_size; i++) {
- if (ctx->workers[i].in_use) {
- if (ctx->workers[i].recon_info->last_term > last_term) {
- last_term = ctx->workers[i].recon_info->last_term;
- }
- }
- }
- // First choose all with latest term and highest ops
- for (i=0; i < replica_group_size; i++) {
- if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) {
- if (ctx->workers[i].recon_info->commited_ops > last_ops) {
- last_ops = ctx->workers[i].recon_info->commited_ops;
- }
- }
- }
- // choose the first among the lot
- for (i=0; i < replica_group_size; i++) {
- if ((ctx->workers[i].in_use) &&
- (last_term == ctx->workers[i].recon_info->last_term) &&
- (last_ops == ctx->workers[i].recon_info->commited_ops)) {
- chosen = i;
- break;
- }
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO, "reconciliator chosen is %d\n", chosen);
- ctx->reconciliator_index = chosen;
- GF_ASSERT(chosen != -1);
- if (chosen == -1) {
- nsr_driver_log (this->name, GF_LOG_INFO, "no reconciliatior chosen\n");
+ case leader:
+ if (!nsr_recon_driver_leader(priv)) {
goto out;
}
-
- // send the message to reconciliator to do reconciliation with list of nodes that are part of this quorum
- if (chosen != 0) {
- nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen);
- bm = 1 << ctx->reconciliator_index;
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_RECONCILIATOR_DO_WORK,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen);
- } else {
- nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n");
- ctx->env = calloc(1,sizeof(jmp_buf));
- /*
- * REVIEW
- * Use of setjmp/longjmp in an environment
- * where we already use ucontext is dangerous
- * and therefore forbidden. Refactoring will
- * also help with some of the rampant 80-column
- * violations and indented code crawling across
- * the screen, which together make this entire
- * file almost unreadable.
- */
- if (!setjmp(*(ctx->env))) {
- state = reconciliator;
- goto i_am_reconciliator;
- } else {
- nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n");
- free(ctx->env);
- ctx->env = NULL;
- state = leader;
- }
- }
-
- // send message to all other nodes to sync up with the reconciliator including itself if required
- // requires optimisation - TBD
- if (chosen != 0) {
- nsr_driver_log (this->name, GF_LOG_INFO, "local node resolution needs to be done. before set jmp\n");
- ctx->env = calloc(1,sizeof(jmp_buf));
- if (!setjmp(*(ctx->env))) {
- state = resolutor;
- goto i_am_resolutor;
- } else {
- nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n");
- free(ctx->env);
- ctx->env = NULL;
- state = leader;
- }
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n");
- bm = ~((1 << ctx->reconciliator_index) || 1);
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_RESOLUTION_DO_WORK,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
- if (status == -1)
- goto out;
-
- nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n");
-
- }
-i_am_reconciliator:
- if (state == reconciliator) {
- gf_boolean_t do_recon = _gf_false;
- uint32_t start_index = ctx->workers[0].recon_info->first_index;
- uint32_t end_index = ctx->workers[0].recon_info->last_index;
- uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1);
-
- nsr_driver_log (this->name, GF_LOG_INFO, "starting reconciliation work as reconciliator \n");
-
- // nothing to be done? signal back to the recon translator that this phase done.
- bm = 1;
- for (i=1; i < replica_group_size; i++) {
- if (ctx->workers[i].in_use &&
- (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) {
- ctx->workers[i].recon_info->last_index = end_index;
- ctx->workers[i].recon_info->first_index = start_index;
- bm |= (1 << i);
- do_recon = _gf_true;
- }
- }
-
- if (!do_recon || !num) {
- nsr_driver_log (this->name, GF_LOG_INFO, "nothing needs to be done as resolutor \n");
- if (ctx->env) {
- nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n");
- longjmp(*(ctx->env), 1);
- } else {
- goto out;
- }
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "getting reconciliation window for term %d from %dto %d \n",
- ctx->workers[0].recon_info->last_term,
- start_index, end_index);
- // We have set the bm in the above for loop where
- // we go thru all nodes including this node that
- // have seen the last term.
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_GET_RECONCILATION_WINDOW,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished getting reconciliation window for term %d from %dto %d \n",
- ctx->workers[0].recon_info->last_term,
- start_index, end_index);
-
-
- // from the changelogs, calculate the entries
- // that need action and the source for each of these entries
- compute_reconciliation_work(ctx);
-
- // for each of the entries that need fixup, issue IO
- for (i=start_index; i < (start_index + num); i++) {
- nsr_reconciliator_info_t *my_recon_info =
- ctx->workers[0].recon_info;
- nsr_reconciliation_record_t *record =
- &(my_recon_info->records[i - start_index]);
-
- record->work.term = ctx->workers[0].recon_info->last_term;
- record->work.index = i;
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "fixing index %d\n",i);
- if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) ||
- (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) {
- // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE):
- // If there are only pseudo_holes in others, it is best effort.
- // Just pick from the first node that has it and proceed.
- // 2nd case (RECON_WORK_HOLE_TO_FILL):
- // this node has either a HOLE or PSUEDO_HOLE and some one else has a FILL(source).
- // analyse the changelog to check if data needs to be read or if the log has all the data required
-
- if (recon_check_changelog(&record->rec)) {
- bm = (1 << record->work.source);
- nsr_driver_log (this->name, GF_LOG_INFO,
- "reading data from source %d\n",record->work.source);
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
- NSR_RECON_QUEUE_TO_DATA,
- i);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO,
- "got data from source %d\n",record->work.source);
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "fixing local data as part of reconciliation\n");
-
- bm = 1;
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
- NSR_RECON_QUEUE_TO_DATA,
- i);
- if (status == -1)
- goto out;
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished fixing local data as part of reconciliation\n");
-
- } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) {
- // this node has a pseudo_hole and some others have just that too. Just convert this to FILL.
- // let others blindly pick it from here.
- nsr_driver_log (this->name, GF_LOG_INFO,
- "fixing this record as a fill\n");
- bm = 1;
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
- NSR_RECON_QUEUE_TO_DATA,
- i);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished fixing this record as a fill\n");
- }
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as reconciliator \n");
-
- if (ctx->env) {
- nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n");
- longjmp(*(ctx->env), 1);
+ break;
+ case reconciliator:
+ if (!nsr_recon_driver_reconciliator(priv)) {
+ goto out;
}
-
- // tbd - mark this term golden in the reconciliator
-
- }
-i_am_resolutor:
- if (state == resolutor) {
-
- // This node's last term is filled when it gets a message
- // from the leader to act as a reconciliator.
- uint32_t recon_index = ctx->reconciliator_index;
- nsr_reconciliator_info_t *my_info =
- ctx->workers[0].recon_info;
- nsr_reconciliator_info_t *his_info =
- ctx->workers[recon_index].recon_info;
- uint32_t my_last_term = my_info->last_term;
- uint32_t to_do_term = his_info->last_term;
- uint32_t my_start_index = 1, my_end_index = 1;
- uint32_t his_start_index = 1, his_end_index = 1;
- uint32_t num = 0;
- gf_boolean_t fl = _gf_true;
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "starting resolutor work with reconciliator as %d from term %d to term %d \n",
- recon_index, my_last_term, to_do_term);
-
- do {
-
- if (!fl) {
- (his_info->last_term)++;
- (my_info->last_term)++;
- } else {
- his_info->last_term = my_last_term;
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term);
-
- // Get reconciliator's term information for that term
- nsr_driver_log (this->name, GF_LOG_INFO,
- "getting info from reconciliator for term %d \n", my_info->last_term);
- bm = (1 << recon_index);
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_GET_GIVEN_TERM_INFO,
- NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished getting info from reconciliator for term %d \n", my_info->last_term);
-
-
- // empty term
- if (!his_info->commited_ops) {
- nsr_driver_log (this->name, GF_LOG_INFO,
- "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term);
- // TBD - mark the term golden
- fl = _gf_false;
- continue;
- }
-
- // calculate the resolution window boundary.
- // for the last term this node saw, we compare the resolution window of this and reconciliator.
- // for the rest of the nodes, we just accept the reconciliator info.
- if (fl) {
- my_start_index = my_info->first_index;
- my_end_index = my_info->last_index;
- his_start_index = his_info->first_index;
- his_end_index = his_info->last_index;
- my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index;
- my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index;
- } else {
- my_info->first_index = his_info->first_index;
- my_info->last_index = his_info->last_index;
- my_info->commited_ops = his_info->commited_ops;
- }
- if (my_info->first_index == 0)
- my_info->first_index = 1;
- num = (my_info->last_index - my_info->first_index) + 1;
-
-
- // Get the logs from the reconciliator (and this node for this term)
- if (fl)
- bm = ((1 << recon_index) | 1);
- else
- bm = (1 << recon_index);
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "getting reconciliation window for term %d from %d to %d \n",
- my_info->last_term,
- my_info->first_index, my_info->last_index);
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_GET_RECONCILATION_WINDOW,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished getting reconciliation window for term %d from %d to %d \n",
- my_info->last_term,
- my_info->first_index, my_info->last_index);
-
- // from the changelogs, calculate the entries that need action
- compute_resolution_work(ctx, my_info, his_info, !fl);
-
-
- // for each of the entries that need fixup, issue IO
- for (i=my_info->first_index; i < (my_info->first_index + num); i++) {
- nsr_reconciliation_record_t *record =
- &(my_info->records[i - my_info->first_index]);
-
- record->work.term = my_info->last_term;
- record->work.index = i;
-
- nsr_driver_log (this->name, GF_LOG_INFO, "fixing index %d\n",i);
- if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) ||
- (record->work.type == NSR_RECON_WORK_UNDO_FILL)) {
- if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) &&
- recon_check_changelog(&record->rec)) ||
- ((record->work.type == NSR_RECON_WORK_UNDO_FILL) &&
- recon_compute_undo(&record->rec))) {
- nsr_driver_log (this->name, GF_LOG_INFO,
- "reading data from source %d\n",recon_index);
- bm = (1 << recon_index);
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
- NSR_RECON_QUEUE_TO_DATA,
- i);
- if (status == -1)
- goto out;
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished reading data from source %d\n",recon_index);
- }
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "fixing local data as part of resolutor\n");
-
- bm = 1;
- send_and_wait(&status, &op_errno, bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
- NSR_RECON_QUEUE_TO_DATA,
- i);
- if (status == -1)
- goto out;
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished fixing local data as part of resolutor\n");
- }
- }
- fl = _gf_false;
-
- // tbd - mark this term golden in the reconciliator
- } while (my_last_term++ != to_do_term);
-
- nsr_driver_log (this->name, GF_LOG_INFO,
- "finished resolutor work \n");
-
- if (ctx->env) {
- nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n");
- longjmp(*(ctx->env), 1);
+ break;
+ case resolutor:
+ if (!nsr_recon_driver_resolutor(priv)) {
+ goto out;
}
+ break;
- }
-
- if (state == joiner) {
+ case joiner:
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
@@ -2976,6 +3067,9 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"finished recon work as joiner \n");
+ default:
+ nsr_driver_log (this->name, GF_LOG_ERROR,
+ "bad state %d", state);
}
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h
index 8d87e29af..3efb26269 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.h
+++ b/xlators/cluster/nsr-recon/src/recon_driver.h
@@ -13,7 +13,6 @@
#include "api/src/glfs.h"
-#include <setjmp.h>
#define MAX_HOSTNAME_LEN 32
#define MAXIMUM_REPLICA_STRENGTH 8
@@ -260,7 +259,6 @@ typedef struct _nsr_recon_driver_ctxt {
uint32_t reconciliator_index;
uint32_t term;
uint32_t current_term;
- jmp_buf *env;
nsr_mode_t mode; // default set to seq
#if defined(NSR_DEBUG)
FILE *fp;