From 87d68c5bbd085b081bb73c4bfd83115db8cdc308 Mon Sep 17 00:00:00 2001 From: Jeff Darcy Date: Mon, 3 Mar 2014 20:10:01 +0000 Subject: Fix leaks introduced by error-checking patch. Specifically, I050003a819d2314c8fdfd111df465041c30ee6e3 As usual, the best way to make sure resources get reclaimed is to make sure all return paths go through common cleanup code. This meant a lot of refactoring. Besides general readability benefits, this also got rid of the setjmp/longjmp nonsense flagged in a previous review. Change-Id: Ic232cf342a5168bfc33f6e0a0c8f0530d88f7c5e Signed-off-by: Jeff Darcy --- xlators/cluster/nsr-recon/src/recon_driver.c | 1840 ++++++++++++++------------ xlators/cluster/nsr-recon/src/recon_driver.h | 2 - 2 files changed, 967 insertions(+), 875 deletions(-) (limited to 'xlators/cluster/nsr-recon') diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c index 1e68414ee..b02abf2bb 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.c +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -640,7 +640,6 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, { // For local resolution, the main driver thread does it. // SO there is no way we can have this message for this node. - GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_INFO, "this message should not be sent \n"); @@ -648,7 +647,6 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, } case NSR_WORK_ID_RESOLUTION_DO_WORK: { - GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_INFO, "this message should not be sent \n"); @@ -764,319 +762,334 @@ control_worker_main_0(nsr_per_node_worker_t *ctx) return NULL; } -/* - * Control worker funct for getting changelog info on some other node. - * calls glfs functions to seek/read/write on aux_fd. - * - * Input arguments: - * ctx - The per worker based context - * control - set to true if this worker is for the control plane - */ static void -control_worker_func(nsr_per_node_worker_t *ctx, - nsr_recon_work_t *work) +control_worker_do_reconciliation (nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) { unsigned int index = ctx->index; - nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_role_t rr; + uint32_t i=0; + uint32_t num=0; + uint32_t idx = dr->reconciliator_index; + uint32_t term = dr->workers[idx].recon_info->last_term; - ctx->is_control = _gf_true; + GF_ASSERT(idx == index); - switch (work->req_id){ - case NSR_WORK_ID_INI: - { - nsr_worker_log(this->name, GF_LOG_INFO, - "calling nsr_recon_start_work\n"); + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to make this index %d as reconciliator for term %d\n", index, term); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, + nsr_recon_xlator_sector_1, + SEEK_SET) == -1) { + ctx->result = -1; + return; + } - // TBD - handle error in case nsr_recon_start_work gives error - if (nsr_recon_start_work(ctx, _gf_true) != 0) { - ctx->result = -1; - return; - } + // We have all the info for all other nodes. + // Fill all that info when sending data to that process. + for (i=0; i < dr->replica_group_size; i++) { + if ( dr->workers[i].in_use && + (dr->workers[i].recon_info->last_term == term)) { + rr.info[num].last_term = + dr->workers[i].recon_info->last_term; + rr.info[num].commited_ops = + dr->workers[i].recon_info->commited_ops; + rr.info[num].last_index = + dr->workers[i].recon_info->last_index; + rr.info[num].first_index = + dr->workers[i].recon_info->first_index; + strcpy(rr.info[num].name, + dr->workers[i].name); + } + num++; + } + rr.num = num; + rr.role = reconciliator; + ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we are bothered about + // retrying only for this case. For rest of the cases we will + // just return EIO in errno. + ctx->op_errno = errno; + return; + } - nsr_worker_log(this->name, GF_LOG_INFO, - "finished nsr_recon_start_work\n"); - break; - } - case NSR_WORK_ID_FINI: - { - nsr_worker_log(this->name, GF_LOG_INFO, - "calling nsr_recon_end_work\n"); + nsr_worker_log(this->name, GF_LOG_INFO, + "sent reconciliator info for term %d with node count as %d\n", term, num); +} - // TBD - handle error in case nsr_recon_end_work gives error - if (nsr_recon_end_work(ctx, _gf_true) != 0) { - ctx->result = -1; - return; - } +static void +control_worker_do_resolution (nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_role_t rr; + unsigned int i=0, j=0; + unsigned int rec = dr->reconciliator_index; - nsr_worker_log(this->name, GF_LOG_INFO, - "finished nsr_recon_end_work\n"); - break; - } - case NSR_WORK_ID_GET_LAST_TERM_INFO: - { - nsr_recon_last_term_info_t lt; - nsr_reconciliator_info_t *recon_info = rw->recon_info; - int32_t term = htonl(work->index); // overloading it + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); + + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, + nsr_recon_xlator_sector_1, + SEEK_SET) == -1) { + ctx->result = -1; + return; + } - nsr_worker_log(this->name, GF_LOG_INFO, - "trying to get last term info for node %d with current term %d\n",index, work->index); + rr.num = 2; + + // Fill in info[0] as info for the node for which we are seeking + // resolution. Fill in info[1] as info of the reconciliator node. The + // function nsr_recon_driver_get_role() that will be called when this + // message reaches the node will look at index 1 for term information + // related to the reconciliator. + for (i=0; i < 2; i++) { + (i == 0) ? (j = index) : (j = rec); + rr.info[i].last_term = + dr->workers[j].recon_info->last_term; + rr.info[i].commited_ops = + dr->workers[j].recon_info->commited_ops; + rr.info[i].last_index = + dr->workers[j].recon_info->last_index; + rr.info[i].first_index = + dr->workers[j].recon_info->first_index; + // The name is used as the key to convert indices since the + // reconciliator index could be different across the nodes. + strcpy(rr.info[i].name, + dr->workers[j].name); + if (i == 0) { + nsr_worker_log(this->name, GF_LOG_INFO, + "this node info term=%d, ops=%d, first=%d, last=%d\n", + rr.info[i].last_term, rr.info[i].commited_ops, + rr.info[i].first_index,rr.info[i].last_index); + } else { + nsr_worker_log(this->name, GF_LOG_INFO, + "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n", + rr.info[i].last_term, rr.info[i].commited_ops, + rr.info[i].first_index,rr.info[i].last_index); + } + } + rr.role = resolutor; + ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we are bothered about + // retrying only for this case. For rest of the cases we will + // just return EIO in errno. + ctx->op_errno = errno; + return; + } - // first write the current term term number - // TBD - error handling for all the glfs APIs - if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) { - ctx->result = -1; - return; - } - if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { - ctx->result = -1; - return; - } - if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { - ctx->result = -1; - return; - } - ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl - recon_info->last_term = lt.last_term; - recon_info->commited_ops = lt.commited_ops; - recon_info->last_index = lt.last_index; - recon_info->first_index = lt.first_index; + nsr_worker_log(this->name, GF_LOG_INFO, + "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); +} - nsr_worker_log(this->name, GF_LOG_INFO, - "out of get last term info with current term %d. got ops %d with first %d and last %d \n", - recon_info->last_term, recon_info->commited_ops, - recon_info->first_index, recon_info->last_index); +static void +control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_recon_log_info_t li; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + uint32_t i = 0; + uint32_t num = (dr->workers[index].recon_info->last_index - + dr->workers[index].recon_info->first_index +1); + nsr_recon_record_details_t *rd; - break; - } - case NSR_WORK_ID_GET_GIVEN_TERM_INFO: - { - nsr_recon_last_term_info_t lt; - nsr_reconciliator_info_t *recon_info = rw->recon_info; - int32_t term = htonl(work->index); // overloading it + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", + index, recon_info->last_term, recon_info->first_index, recon_info->last_index); - nsr_worker_log(this->name, GF_LOG_INFO, - "trying to get term info for node %d for term %d\n",index, work->index); + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) { + ctx->result = -1; + return; + } - // first write the term number - // TBD - error handling for all the glfs APIs - if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) { - ctx->result = -1; - return; - } - if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { - ctx->result = -1; - return; - } - if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { - ctx->result = -1; - return; - } - ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl - recon_info->last_term = lt.last_term; - recon_info->commited_ops = lt.commited_ops; - recon_info->last_index = lt.last_index; - recon_info->first_index = lt.first_index; + // write to node what term & indices we are interested + li.term = recon_info->last_term; + li.first_index = recon_info->first_index; + li.last_index = recon_info->last_index; + ENDIAN_CONVERSION_LI(li, _gf_false); //htonl + if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) { + ctx->result = -1; + return; + } - nsr_worker_log(this->name, GF_LOG_INFO, - "out of get term info for term %d. got ops %d with first %d and last %d \n", - recon_info->last_term, recon_info->commited_ops, - recon_info->first_index, recon_info->last_index); + // then read + rd = GF_CALLOC(num, + sizeof(nsr_recon_record_details_t), + gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + recon_info->records = GF_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_private_t); + if (recon_info->records == NULL) { + ctx->result = -1; + goto err; + } - break; - } - case NSR_WORK_ID_RECONCILIATOR_DO_WORK: - { - nsr_recon_role_t rr; - uint32_t i=0; - uint32_t num=0; - uint32_t idx = dr->reconciliator_index; - uint32_t term = dr->workers[idx].recon_info->last_term; - GF_ASSERT(idx == index); + if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) { + ctx->result = -1; + goto err; + } - nsr_worker_log(this->name, GF_LOG_INFO, - "trying to make this index %d as reconciliator for term %d\n", index, term); + for (i=0; i < num; i++) { + ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl + memcpy (&(recon_info->records[i].rec), &(rd[i]), + sizeof(nsr_recon_record_details_t)); + nsr_worker_log(this->name, GF_LOG_INFO, + "get_reconcilaition_window:Got %d at index %d\n", + recon_info->records[i].rec.type, + i + recon_info->first_index); + } - // TBD - error handling for all the glfs APIs - if (glfs_lseek(ctx->aux_fd, - nsr_recon_xlator_sector_1, - SEEK_SET) == -1) { - ctx->result = -1; - return; - } + nsr_worker_log(this->name, GF_LOG_INFO, + "got reconciliation window records for node %d for term %d \n", + index, recon_info->last_term); - // We have all the info for all other nodes. - // Fill all that info when sending data to that process. - for (i=0; i < dr->replica_group_size; i++) { - if ( dr->workers[i].in_use && - (dr->workers[i].recon_info->last_term == term)) { - rr.info[num].last_term = - dr->workers[i].recon_info->last_term; - rr.info[num].commited_ops = - dr->workers[i].recon_info->commited_ops; - rr.info[num].last_index = - dr->workers[i].recon_info->last_index; - rr.info[num].first_index = - dr->workers[i].recon_info->first_index; - strcpy(rr.info[num].name, - dr->workers[i].name); - } - num++; - } - rr.num = num; - rr.role = reconciliator; - ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl - if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { - ctx->result = -1; - // Put the errno only for this case since we - // are bothered about retrying only for this case. - // For rest of the cases we will just return EIO - // in errno. - ctx->op_errno = errno; - return; - } +err: + GF_FREE(rd); +} - nsr_worker_log(this->name, GF_LOG_INFO, - "sent reconciliator info for term %d with node count as %d\n", term, num); +/* + * Control worker funct for getting changelog info on some other node. + * calls glfs functions to seek/read/write on aux_fd. + * + * Input arguments: + * ctx - The per worker based context + * control - set to true if this worker is for the control plane + */ +static void +control_worker_func(nsr_per_node_worker_t *ctx, + nsr_recon_work_t *work) +{ + unsigned int index = ctx->index; + nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]); + nsr_recon_last_term_info_t lt; + nsr_reconciliator_info_t *recon_info = rw->recon_info; + int32_t term = htonl(work->index); // overloading it - break; - } - case NSR_WORK_ID_RESOLUTION_DO_WORK: - { - nsr_recon_role_t rr; - unsigned int i=0, j=0; - unsigned int rec = dr->reconciliator_index; + ctx->is_control = _gf_true; - nsr_worker_log(this->name, GF_LOG_INFO, - "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); + switch (work->req_id){ - // TBD - error handling for all the glfs APIs - if (glfs_lseek(ctx->aux_fd, - nsr_recon_xlator_sector_1, - SEEK_SET) == -1) { - ctx->result = -1; - return; - } + case NSR_WORK_ID_INI: + nsr_worker_log(this->name, GF_LOG_INFO, + "calling nsr_recon_start_work\n"); - rr.num = 2; - - // Fill in info[0] as info for the node for which we are seeking resolution. - // Fill in info[1] as info of the reconciliator node. - // The function nsr_recon_driver_get_role() that will be called when - // this message reaches the node will look at index 1 for term information - // related to the reconciliator. - for (i=0; i < 2; i++) { - (i == 0) ? (j = index) : (j = rec); - rr.info[i].last_term = - dr->workers[j].recon_info->last_term; - rr.info[i].commited_ops = - dr->workers[j].recon_info->commited_ops; - rr.info[i].last_index = - dr->workers[j].recon_info->last_index; - rr.info[i].first_index = - dr->workers[j].recon_info->first_index; - // The name is used as the key to convert indices since - // the reconciliator index could be different across the nodes. - strcpy(rr.info[i].name, - dr->workers[j].name); - if (i == 0) { - nsr_worker_log(this->name, GF_LOG_INFO, - "this node info term=%d, ops=%d, first=%d, last=%d\n", - rr.info[i].last_term, rr.info[i].commited_ops, - rr.info[i].first_index,rr.info[i].last_index); - } else { - nsr_worker_log(this->name, GF_LOG_INFO, - "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n", - rr.info[i].last_term, rr.info[i].commited_ops, - rr.info[i].first_index,rr.info[i].last_index); - } - } - rr.role = resolutor; - ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl - if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { - ctx->result = -1; - // Put the errno only for this case since we - // are bothered about retrying only for this case. - // For rest of the cases we will just return EIO - // in errno. - ctx->op_errno = errno; - return; - } + // TBD - handle error in case nsr_recon_start_work gives error + if (nsr_recon_start_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } - nsr_worker_log(this->name, GF_LOG_INFO, - "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); + nsr_worker_log(this->name, GF_LOG_INFO, + "finished nsr_recon_start_work\n"); + break; - break; - } - case NSR_WORK_ID_GET_RECONCILATION_WINDOW: - { - nsr_recon_log_info_t li; - nsr_reconciliator_info_t *recon_info = rw->recon_info; - uint32_t i = 0; - uint32_t num = (dr->workers[index].recon_info->last_index - - dr->workers[index].recon_info->first_index +1); - nsr_recon_record_details_t *rd; + case NSR_WORK_ID_FINI: + nsr_worker_log(this->name, GF_LOG_INFO, + "calling nsr_recon_end_work\n"); - nsr_worker_log(this->name, GF_LOG_INFO, - "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", - index, recon_info->last_term, recon_info->first_index, recon_info->last_index); + // TBD - handle error in case nsr_recon_end_work gives error + if (nsr_recon_end_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } + nsr_worker_log(this->name, GF_LOG_INFO, + "finished nsr_recon_end_work\n"); + break; - // TBD - error handling for all the glfs APIs - if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) { - ctx->result = -1; - return; - } + case NSR_WORK_ID_GET_LAST_TERM_INFO: + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get last term info for node %d with current term %d\n",index, work->index); - // write to node what term & indices we are interested - li.term = recon_info->last_term; - li.first_index = recon_info->first_index; - li.last_index = recon_info->last_index; - ENDIAN_CONVERSION_LI(li, _gf_false); //htonl - if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) { - ctx->result = -1; - return; - } + // first write the current term term number + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } + ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; - // then read - rd = GF_CALLOC(num, - sizeof(nsr_recon_record_details_t), - gf_mt_recon_private_t); - if (rd == NULL) { - ctx->result = -1; - return; - } - recon_info->records = GF_CALLOC(num, - sizeof(nsr_reconciliation_record_t), - gf_mt_recon_private_t); - if (recon_info->records == NULL) { - ctx->result = -1; - return; - } + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get last term info with current term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); - if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) { - ctx->result = -1; - return; - } + break; - for (i=0; i < num; i++) { - ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl - memcpy(&(recon_info->records[i].rec), - &(rd[i]), - sizeof(nsr_recon_record_details_t)); - nsr_worker_log(this->name, GF_LOG_INFO, - "get_reconcilaition_window:Got %d at index %d\n", - recon_info->records[i].rec.type, - i + recon_info->first_index); - } - GF_FREE(rd); + case NSR_WORK_ID_GET_GIVEN_TERM_INFO: + nsr_worker_log(this->name, GF_LOG_INFO, + "trying to get term info for node %d for term %d\n",index, work->index); - nsr_worker_log(this->name, GF_LOG_INFO, - "got reconciliation window records for node %d for term %d \n", - index, recon_info->last_term); - break; - } - } + // first write the term number + // TBD - error handling for all the glfs APIs + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, <, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } + ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl + recon_info->last_term = lt.last_term; + recon_info->commited_ops = lt.commited_ops; + recon_info->last_index = lt.last_index; + recon_info->first_index = lt.first_index; + + nsr_worker_log(this->name, GF_LOG_INFO, + "out of get term info for term %d. got ops %d with first %d and last %d \n", + recon_info->last_term, recon_info->commited_ops, + recon_info->first_index, recon_info->last_index); + + break; + + case NSR_WORK_ID_RECONCILIATOR_DO_WORK: + control_worker_do_reconciliation(ctx,work); + break; + + case NSR_WORK_ID_RESOLUTION_DO_WORK: + control_worker_do_resolution(ctx,work); + break; + + case NSR_WORK_ID_GET_RECONCILATION_WINDOW: + control_worker_get_window(ctx,work); + break; + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "bad work type %d", work->req_id); + } return; } @@ -1241,6 +1254,7 @@ nsr_recon_driver_get_role(int32_t *status, { uint8_t i=0, j=0; nsr_recon_role_t *rr = &(rw->role); + nsr_reconciliator_info_t *tmp; // First make all the threads uninitialise for (i = 0; i < ctx->replica_group_size; i++) { @@ -1249,49 +1263,60 @@ nsr_recon_driver_get_role(int32_t *status, return 0; } } - if ((rr->role == leader) || (rr->role == joiner)) { + + switch (rr->role) { + case leader: + case joiner: // First set info this node + tmp = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), + gf_mt_recon_private_t); + if (!tmp) { + *status = -1; + return 0; + } + ctx->workers[0].recon_info = tmp; if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { *status = -1; return 0; } - ctx->workers[0].recon_info = GF_CALLOC (1, - sizeof (nsr_reconciliator_info_t), - gf_mt_recon_private_t); - if (!ctx->workers[0].recon_info) { - *status = -1; - return 0; - } ctx->current_term = rr->current_term; // Find rest of the nodes for (i=1; i < ctx->replica_group_size; i++) { - for (j=0 ; j < rr->num; j++) { - // TBD - make this strcmp later when etcd servers set properly - if (!strcmp(ctx->workers[i].name, rr->info[j].name)) { - //if (strstr(ctx->workers[i].name, rr->info[j].name)) { - nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_get_role: this as %s. found other server %s\n", - (rr->role == leader) ? "leader" : "joiner", - ctx->workers[i].name); - - if (nsr_recon_in_use(ctx, i, _gf_true) == -1) { - *status = -1; - return 0; - } - // Allocate this here. This will get later filled when - // the leader tries to get last term information from all - // the nodes - ctx->workers[i].recon_info = GF_CALLOC (1, - sizeof (nsr_reconciliator_info_t), - gf_mt_recon_private_t); - if (!ctx->workers[i].recon_info) { - *status = -1; - return 0; - } + for (j=0 ; /* nothing */; j++) { + if (j >= rr->num) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + ctx->workers[i].name); break; } + if (strcmp(ctx->workers[i].name, + rr->info[j].name)) { + continue; + } + nsr_driver_log (this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as %s. found other server %s\n", + (rr->role == leader) ? "leader" + : "joiner", + ctx->workers[i].name); + + // Allocate this here. This will get later + // filled when the leader tries to get last term + // information from all the nodes + tmp = GF_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_private_t); + if (!tmp) { + *status = -1; + return 0; + } + ctx->workers[i].recon_info = tmp; + if (nsr_recon_in_use(ctx, i, _gf_true) == -1) { + *status = -1; + return 0; + } + break; } } // If leader, reconciliator has to be chosen. @@ -1300,83 +1325,98 @@ nsr_recon_driver_get_role(int32_t *status, ctx->reconciliator_index = -1; else ctx->reconciliator_index = 0; - } else if (rr->role == reconciliator) { + break; + + case reconciliator: ctx->reconciliator_index = 0; - // Copy information about all the other members which had the same term + // Copy information about all the other members which had the + // same term for (i=0; i < rr->num; i++) { - for (j=0; j < ctx->replica_group_size; j++) { - if (!strcmp(rr->info[i].name, ctx->workers[j].name)) { - //if (strstr(ctx->workers[j].name, rr->info[i].name)) { - nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n", - ctx->workers[j].name); - ctx->workers[j].recon_info = GF_CALLOC (1, - sizeof (nsr_reconciliator_info_t), - gf_mt_recon_private_t); - if (!ctx->workers[j].recon_info) { - *status = -1; - return 0; - } - ctx->workers[j].recon_info->last_term = - rr->info[i].last_term; - ctx->workers[j].recon_info->commited_ops = - rr->info[i].commited_ops; - ctx->workers[j].recon_info->last_index = - rr->info[i].last_index; - ctx->workers[j].recon_info->first_index = - rr->info[i].first_index; - if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { - *status = -1; - return 0; - } + for (j=0; /* nothing */; j++) { + if (j >= ctx->replica_group_size) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + rr->info[i].name); break; } - } - } - } else if (rr->role == resolutor) { - for (j=0; j < ctx->replica_group_size; j++) { - // info[1] has the information regarding the reconciliator - //if (strstr(ctx->workers[j].name, rr->info[1].name)) { - if (!strcmp(rr->info[1].name, ctx->workers[j].name)) { + if (strcmp(rr->info[i].name, + ctx->workers[j].name)) { + continue; + } nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n", + "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n", ctx->workers[j].name); - ctx->workers[j].recon_info = GF_CALLOC (1, - sizeof (nsr_reconciliator_info_t), - gf_mt_recon_private_t); - if (!ctx->workers[j].recon_info) { - *status = -1; - return 0; + tmp = GF_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_private_t); + if (!tmp) { + *status = -1; + return 0; } - ctx->workers[j].recon_info->last_term = - rr->info[1].last_term; - ctx->workers[j].recon_info->commited_ops = - rr->info[1].commited_ops; - ctx->workers[j].recon_info->last_index = - rr->info[1].last_index; - ctx->workers[j].recon_info->first_index = - rr->info[1].first_index; - ctx->reconciliator_index = j; + tmp->last_term = rr->info[i].last_term; + tmp->commited_ops = rr->info[i].commited_ops; + tmp->last_index = rr->info[i].last_index; + tmp->first_index = rr->info[i].first_index; + ctx->workers[j].recon_info = tmp; if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { - *status = -1; - return 0; - } - GF_ASSERT(ctx->reconciliator_index != 0); + *status = -1; + return 0; + } + break; + } + } + break; + + case resolutor: + for (j=0; /* nothing */; j++) { + // info[1] has the information regarding the + // reconciliator + if (j >= ctx->replica_group_size) { + nsr_driver_log (this->name, GF_LOG_ERROR, + "failed to find %s", + rr->info[1].name); break; } + if (strcmp(rr->info[1].name, + ctx->workers[j].name)) { + continue; + } + nsr_driver_log(this->name, GF_LOG_INFO, + "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n", + ctx->workers[j].name); + tmp = GF_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_private_t); + if (!tmp) { + *status = -1; + return 0; + } + tmp->last_term = rr->info[1].last_term; + tmp->commited_ops = rr->info[1].commited_ops; + tmp->last_index = rr->info[1].last_index; + tmp->first_index = rr->info[1].first_index; + ctx->reconciliator_index = j; + ctx->workers[j].recon_info = tmp; + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } + GF_ASSERT(ctx->reconciliator_index != 0); + break; } - ctx->workers[0].recon_info = GF_CALLOC (1, - sizeof (nsr_reconciliator_info_t), - gf_mt_recon_private_t); - if (!ctx->workers[0].recon_info) { + tmp = GF_CALLOC (1, + sizeof (nsr_reconciliator_info_t), + gf_mt_recon_private_t); + if (!tmp) { *status = -1; return 0; } // info[0] has all info for this node - ctx->workers[0].recon_info->last_term = rr->info[0].last_term; - ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops; - ctx->workers[0].recon_info->last_index = rr->info[0].last_index; - ctx->workers[0].recon_info->first_index = rr->info[0].first_index; + tmp->last_term = rr->info[0].last_term; + tmp->commited_ops = rr->info[0].commited_ops; + tmp->last_index = rr->info[0].last_index; + tmp->first_index = rr->info[0].first_index; + ctx->workers[0].recon_info = tmp; if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { *status = -1; return 0; @@ -1484,7 +1524,7 @@ apply_record(nsr_per_node_worker_t *ctx, { struct glfs_fd *fd = NULL; struct glfs_object *obj = NULL; - + gf_boolean_t retval = _gf_false; if (ri->rec.op == GF_FOP_WRITE) { @@ -1492,40 +1532,36 @@ apply_record(nsr_per_node_worker_t *ctx, "DOing write for file %s @offset %d for len %d\n", ri->rec.gfid, ri->rec.offset, ri->rec.len); - // The file has got deleted on the source. Hence just ignore this. - // TBD - get a way to just stuff the log entry without writing the data so that - // changelogs remain identical. + // The file has got deleted on the source. Hence just ignore + // this. + // TBD - get a way to just stuff the log entry without writing + // the data so that changelogs remain identical. if (ri->work.data == NULL) { return _gf_true; } if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) - return _gf_false;; + goto err; fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return _gf_false; + goto err; } if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) { nsr_worker_log(this->name, GF_LOG_ERROR, "lseek for file %s failed at offset %d\n", ri->rec.gfid, ri->rec.offset); - return _gf_false; + goto err; } if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) { nsr_worker_log(this->name, GF_LOG_ERROR, "write for file %s failed for bytes %d\n", ri->rec.gfid, ri->rec.len); - return _gf_false; + goto err; } - if (glfs_close_with_xdata(fd, dict) == -1) { - nsr_worker_log(this->name, GF_LOG_ERROR, - "close failed\n"); - return _gf_false; - } nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing write for gfid %s @offset %d for len %d\n", @@ -1537,26 +1573,23 @@ apply_record(nsr_per_node_worker_t *ctx, "DOing truncate for file %s @offset %d \n", ri->rec.gfid, ri->rec.offset); - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return _gf_false; + goto err; } if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) { nsr_worker_log(this->name, GF_LOG_ERROR, "trunctae for file %s failed @offset %d\n", ri->rec.gfid,ri->rec.offset ); - return _gf_false; + goto err; } - if (glfs_close_with_xdata(fd, dict) == -1) { - nsr_worker_log(this->name, GF_LOG_ERROR, - "close failed\n"); - return _gf_false; - } nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing truncate for gfid %s @offset %d \n", @@ -1575,14 +1608,16 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing set extended attr for file %s \n", ri->rec.gfid); - // The file has got deleted on the source. Hence just ignore this. - // TBD - get a way to just stuff the log entry without writing the data so that - // changelogs remain identical. + // The file has got deleted on the source. Hence just ignore + // this. TBD - get a way to just stuff the log entry without + // writing the data so that changelogs remain identical. if (ri->work.data == NULL) { return _gf_true; } - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); @@ -1592,20 +1627,20 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return _gf_false; + goto err; } if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { if (t_b) free(t_b); nsr_worker_log(this->name, GF_LOG_ERROR, "list of xattr of %s failed\n", ri->rec.gfid); - return _gf_false; + goto err; } if (delete_xattr(fd, dict, t_b, num) == -1) { nsr_worker_log(this->name, GF_LOG_ERROR, "deleting xattrs failed\n"); - return _gf_false; + goto err; } // Set one special dict flag to indicate the opcode so that @@ -1613,19 +1648,13 @@ apply_record(nsr_per_node_worker_t *ctx, if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) { nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); - return _gf_false; + goto err; } if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) { nsr_worker_log(this->name, GF_LOG_ERROR, "filling xattrs failed\n"); - return _gf_false; - } - - if (glfs_close_with_xdata(fd, dict) == -1) { - nsr_worker_log(this->name, GF_LOG_ERROR, - "close failed\n"); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1642,7 +1671,9 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } nsr_worker_log (this->name, GF_LOG_INFO, "creating with mode 0%o", ri->rec.mode); @@ -1650,7 +1681,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing create for file %s\n", ri->rec.entry); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1667,13 +1698,15 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mknod for file %s\n", ri->rec.entry); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1690,13 +1723,15 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) { nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mkdir for file %s\n", ri->rec.entry); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1709,12 +1744,14 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing rmdir/ublink for dir %s \n", ri->rec.entry); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) { nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing rmdir/unlink for file %s\n", ri->rec.entry); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1729,14 +1766,16 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } uuid_parse(ri->rec.gfid, gfid); if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1751,14 +1790,18 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; - if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) { nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1773,14 +1816,18 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; - if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) { + goto err; + } + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) { nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); - return _gf_false; + goto err; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1801,7 +1848,9 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing attr for file %s \n", ri->rec.gfid); - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) { + goto err; + } if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); @@ -1811,7 +1860,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return _gf_false; + goto err; } iatt.ia_prot = ia_prot_from_st_mode(777); @@ -1823,7 +1872,7 @@ apply_record(nsr_per_node_worker_t *ctx, if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) { nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); - return _gf_false; + goto err; } ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict); @@ -1831,21 +1880,37 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "failed Doing attr for file %s \n", ri->rec.gfid); - return _gf_false; + goto err; } - if (glfs_close_with_xdata(fd, dict) == -1) { - nsr_worker_log(this->name, GF_LOG_ERROR, - "close failed\n"); - return _gf_false; - } nsr_worker_log(this->name, GF_LOG_INFO, "Doing attr for file %s \n", ri->rec.gfid); } - return _gf_true; + retval = _gf_true; + +err: + if (fd) { + /* + * It's not clear that we should be passing the same dict to + * glfs_close that was passed to us for glfs_open, but that's + * the prior behavior so let's preserve it for now. + */ + if (glfs_close_with_xdata(fd, dict) == -1) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + } + } + if (obj) { + /* + * AFAICT fd operations do not borrow this reference, so we + * still need to drop it ourselves. + */ + glfs_h_close(obj); + } + return retval; } //return back opcodes that requires reading from source @@ -2379,38 +2444,463 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, gf_boolean_t send = _gf_false; int32_t status =0, op_errno = 0; + send = (ctx->workers[i].in_use != in_use); + ctx->workers[i].in_use = in_use; + + if (!send) { + return 0; + } + nsr_driver_log (this->name, GF_LOG_INFO, + "sending %s to index %d\n",in_use?"INI":"FINI",i); + + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, + (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto err; + + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, + (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + NSR_RECON_QUEUE_TO_DATA, -1); + if (status == -1) + goto err; + + /* + * We really need better error recovery. To activate a worker, we + * allocate memory and send two messages. If any of those actions + * fail, we should undo the others. It would probably be good to + * collapse the two messages into one, because it's pretty trivial to + * allocate a temporary structure and either link it in or free it + * depending on the result here. + */ + if (in_use == _gf_false) { - if (ctx->workers[i].in_use == _gf_true) - send = _gf_true; - ctx->workers[i].in_use = _gf_false; - } else { - if (ctx->workers[i].in_use != _gf_true) { - ctx->workers[i].in_use = _gf_true; - send = _gf_true; - } + GF_FREE(ctx->workers[i].recon_info); } - if (send == _gf_true) { - if (in_use == _gf_true) { - nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i); - } else { - nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i); + + return 0; + +err: + GF_FREE(ctx->workers[i].recon_info); + ctx->workers[i].recon_info = NULL; + return -1; +} + +gf_boolean_t +nsr_recon_driver_reconciliator (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + gf_boolean_t do_recon = _gf_false; + uint32_t start_index = ctx->workers[0].recon_info->first_index; + uint32_t end_index = ctx->workers[0].recon_info->last_index; + uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1); + + nsr_driver_log (this->name, GF_LOG_INFO, + "starting reconciliation work as reconciliator \n"); + + // nothing to be done? signal back to the recon translator that this + // phase done. + bm = 1; + for (i=1; i < replica_group_size; i++) { + if (ctx->workers[i].in_use && + (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) { + ctx->workers[i].recon_info->last_index = end_index; + ctx->workers[i].recon_info->first_index = start_index; + bm |= (1 << i); + do_recon = _gf_true; } - send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, - (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, + } + + if (!do_recon || !num) { + nsr_driver_log (this->name, GF_LOG_INFO, + "nothing needs to be done as resolutor \n"); + return _gf_true; + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting reconciliation window for term %d from %dto %d \n", + ctx->workers[0].recon_info->last_term, + start_index, end_index); + // We have set the bm in the above for loop where we go thru all nodes + // including this node that have seen the last term. + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); - if (status == -1) - return -1; - send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, - (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, - NSR_RECON_QUEUE_TO_DATA, -1); - if (status == -1) - return -1; - if (in_use == _gf_false) { - //GF_FREE(ctx->workers[i].recon_info->records); - GF_FREE(ctx->workers[i].recon_info); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting reconciliation window for term %d from %dto %d \n", + ctx->workers[0].recon_info->last_term, + start_index, end_index); + + + // from the changelogs, calculate the entries that need action and the + // source for each of these entries + compute_reconciliation_work(ctx); + + // for each of the entries that need fixup, issue IO + for (i=start_index; i < (start_index + num); i++) { + nsr_reconciliator_info_t *my_recon_info = + ctx->workers[0].recon_info; + nsr_reconciliation_record_t *record = + &(my_recon_info->records[i - start_index]); + + record->work.term = ctx->workers[0].recon_info->last_term; + record->work.index = i; + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing index %d\n",i); + if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) || + (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) { + // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): If there + // are only pseudo_holes in others, it is best effort. + // Just pick from the first node that has it and + // proceed. + // 2nd case (RECON_WORK_HOLE_TO_FILL): this node has + // either a HOLE or PSUEDO_HOLE and some one else has a + // FILL(source). analyse the changelog to check if data + // needs to be read or if the log has all the data + // required + + if (recon_check_changelog(&record->rec)) { + bm = (1 << record->work.source); + nsr_driver_log (this->name, GF_LOG_INFO, + "reading data from source %d\n",record->work.source); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "got data from source %d\n",record->work.source); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing local data as part of reconciliation\n"); + + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing local data as part of reconciliation\n"); + + } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) { + // this node has a pseudo_hole and some others have just + // that too. Just convert this to FILL. let others + // blindly pick it from here. + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing this record as a fill\n"); + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing this record as a fill\n"); } - } - return 0; + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work as reconciliator \n"); + + // tbd - mark this term golden in the reconciliator + return _gf_true; +} + +gf_boolean_t +nsr_recon_driver_resolutor (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + // This node's last term is filled when it gets a message from the + // leader to act as a reconciliator. + uint32_t recon_index = ctx->reconciliator_index; + nsr_reconciliator_info_t *my_info = + ctx->workers[0].recon_info; + nsr_reconciliator_info_t *his_info = + ctx->workers[recon_index].recon_info; + uint32_t my_last_term = my_info->last_term; + uint32_t to_do_term = his_info->last_term; + uint32_t my_start_index = 1, my_end_index = 1; + uint32_t his_start_index = 1, his_end_index = 1; + uint32_t num = 0; + gf_boolean_t fl = _gf_true; + + nsr_driver_log (this->name, GF_LOG_INFO, + "starting resolutor work with reconciliator as %d from term %d to term %d \n", + recon_index, my_last_term, to_do_term); + + do { + + if (!fl) { + (his_info->last_term)++; + (my_info->last_term)++; + } else { + his_info->last_term = my_last_term; + } + + nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term); + + // Get reconciliator's term information for that term + nsr_driver_log (this->name, GF_LOG_INFO, + "getting info from reconciliator for term %d \n", my_info->last_term); + bm = (1 << recon_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_GIVEN_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting info from reconciliator for term %d \n", my_info->last_term); + + + // empty term + if (!his_info->commited_ops) { + nsr_driver_log (this->name, GF_LOG_INFO, + "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term); + // TBD - mark the term golden + fl = _gf_false; + continue; + } + + // calculate the resolution window boundary. for the last term + // this node saw, we compare the resolution window of this and + // reconciliator. for the rest of the nodes, we just accept the + // reconciliator info. + if (fl) { + my_start_index = my_info->first_index; + my_end_index = my_info->last_index; + his_start_index = his_info->first_index; + his_end_index = his_info->last_index; + my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index; + my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index; + } else { + my_info->first_index = his_info->first_index; + my_info->last_index = his_info->last_index; + my_info->commited_ops = his_info->commited_ops; + } + if (my_info->first_index == 0) + my_info->first_index = 1; + num = (my_info->last_index - my_info->first_index) + 1; + + + // Get the logs from the reconciliator (and this node for this + // term) + if (fl) + bm = ((1 << recon_index) | 1); + else + bm = (1 << recon_index); + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting reconciliation window for term %d from %d to %d \n", + my_info->last_term, + my_info->first_index, my_info->last_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_GET_RECONCILATION_WINDOW, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished getting reconciliation window for term %d from %d to %d \n", + my_info->last_term, + my_info->first_index, my_info->last_index); + + // from the changelogs, calculate the entries that need action + compute_resolution_work(ctx, my_info, his_info, !fl); + + + // for each of the entries that need fixup, issue IO + for (i=my_info->first_index; i < (my_info->first_index + num); i++) { + nsr_reconciliation_record_t *record = + &(my_info->records[i - my_info->first_index]); + + record->work.term = my_info->last_term; + record->work.index = i; + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing index %d\n",i); + if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) || + (record->work.type == NSR_RECON_WORK_UNDO_FILL)) { + if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) && + recon_check_changelog(&record->rec)) || + ((record->work.type == NSR_RECON_WORK_UNDO_FILL) && + recon_compute_undo(&record->rec))) { + nsr_driver_log (this->name, GF_LOG_INFO, + "reading data from source %d\n",recon_index); + bm = (1 << recon_index); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_READ, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reading data from source %d\n",recon_index); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "fixing local data as part of resolutor\n"); + + bm = 1; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, + NSR_RECON_QUEUE_TO_DATA, + i); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished fixing local data as part of resolutor\n"); + } + } + fl = _gf_false; + + // tbd - mark this term golden in the reconciliator + } while (my_last_term++ != to_do_term); + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished resolutor work \n"); + return _gf_true; +} + +gf_boolean_t +nsr_recon_driver_leader (nsr_recon_private_t *priv) +{ + uint32_t replica_group_size = priv->replica_group_size; + uint32_t i; + nsr_recon_driver_ctx_t *ctx = priv->driver_thread_context; + int32_t bm; + int32_t status = 0; + int32_t op_errno = 0; + int32_t chosen = -1; + int32_t last_term = -1, last_ops = -1; + + nsr_driver_log (this->name, GF_LOG_INFO, + "getting last term info from all members of this group\n"); + // Get last term info from all members for this group + send_and_wait(&status, &op_errno, -1, + replica_group_size, + ctx, + NSR_WORK_ID_GET_LAST_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + return _gf_false; + + + // compare all the info received and choose the reconciliator First + // choose all with latest term + for (i=0; i < replica_group_size; i++) { + if (ctx->workers[i].in_use) { + if (ctx->workers[i].recon_info->last_term > last_term) { + last_term = ctx->workers[i].recon_info->last_term; + } + } + } + // First choose all with latest term and highest ops + for (i=0; i < replica_group_size; i++) { + if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) { + if (ctx->workers[i].recon_info->commited_ops > last_ops) { + last_ops = ctx->workers[i].recon_info->commited_ops; + } + } + } + // choose the first among the lot + for (i=0; i < replica_group_size; i++) { + if ((ctx->workers[i].in_use) && + (last_term == ctx->workers[i].recon_info->last_term) && + (last_ops == ctx->workers[i].recon_info->commited_ops)) { + chosen = i; + break; + } + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "reconciliator chosen is %d\n", chosen); + ctx->reconciliator_index = chosen; + GF_ASSERT(chosen != -1); + if (chosen == -1) { + nsr_driver_log (this->name, GF_LOG_INFO, + "no reconciliatior chosen\n"); + return _gf_false; + } + + // send the message to reconciliator to do reconciliation with list of + // nodes that are part of this quorum + if (chosen != 0) { + nsr_driver_log (this->name, GF_LOG_INFO, + "sending reconciliation work to %d\n", chosen); + bm = 1 << ctx->reconciliator_index; + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RECONCILIATOR_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work to %d\n", chosen); + } else { + nsr_driver_log (this->name, GF_LOG_INFO, + "local node is reconciliator. before set jmp\n"); + nsr_recon_driver_reconciliator(priv); + } + + // send message to all other nodes to sync up with the reconciliator + // including itself if required + // requires optimisation - TBD + if (chosen != 0) { + nsr_driver_log (this->name, GF_LOG_INFO, + "local node resolution needs to be done. before set jmp\n"); + nsr_recon_driver_resolutor(priv); + } + + nsr_driver_log (this->name, GF_LOG_INFO, + "sending resolution work to all nodes except this node and reconciliator\n"); + bm = ~((1 << ctx->reconciliator_index) || 1); + send_and_wait(&status, &op_errno, bm, + replica_group_size, + ctx, + NSR_WORK_ID_RESOLUTION_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + return _gf_false; + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished reconciliation work as leader \n"); + return _gf_true; } // main recon driver thread @@ -2531,424 +3021,25 @@ nsr_reconciliation_driver(void *arg) goto out; } - if (state == leader) { - - int32_t chosen = -1; - int32_t last_term = -1, last_ops = -1; - - nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); - // Get last term info from all members for this group - send_and_wait(&status, &op_errno, -1, - replica_group_size, - ctx, - NSR_WORK_ID_GET_LAST_TERM_INFO, - NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); - if (status == -1) - goto out; - - - // compare all the info received and choose the reconciliator - // First choose all with latest term - for (i=0; i < replica_group_size; i++) { - if (ctx->workers[i].in_use) { - if (ctx->workers[i].recon_info->last_term > last_term) { - last_term = ctx->workers[i].recon_info->last_term; - } - } - } - // First choose all with latest term and highest ops - for (i=0; i < replica_group_size; i++) { - if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) { - if (ctx->workers[i].recon_info->commited_ops > last_ops) { - last_ops = ctx->workers[i].recon_info->commited_ops; - } - } - } - // choose the first among the lot - for (i=0; i < replica_group_size; i++) { - if ((ctx->workers[i].in_use) && - (last_term == ctx->workers[i].recon_info->last_term) && - (last_ops == ctx->workers[i].recon_info->commited_ops)) { - chosen = i; - break; - } - } + switch (state) { - nsr_driver_log (this->name, GF_LOG_INFO, "reconciliator chosen is %d\n", chosen); - ctx->reconciliator_index = chosen; - GF_ASSERT(chosen != -1); - if (chosen == -1) { - nsr_driver_log (this->name, GF_LOG_INFO, "no reconciliatior chosen\n"); + case leader: + if (!nsr_recon_driver_leader(priv)) { goto out; } - - // send the message to reconciliator to do reconciliation with list of nodes that are part of this quorum - if (chosen != 0) { - nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen); - bm = 1 << ctx->reconciliator_index; - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_RECONCILIATOR_DO_WORK, - NSR_RECON_QUEUE_TO_CONTROL, -1); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen); - } else { - nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n"); - ctx->env = calloc(1,sizeof(jmp_buf)); - /* - * REVIEW - * Use of setjmp/longjmp in an environment - * where we already use ucontext is dangerous - * and therefore forbidden. Refactoring will - * also help with some of the rampant 80-column - * violations and indented code crawling across - * the screen, which together make this entire - * file almost unreadable. - */ - if (!setjmp(*(ctx->env))) { - state = reconciliator; - goto i_am_reconciliator; - } else { - nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n"); - free(ctx->env); - ctx->env = NULL; - state = leader; - } - } - - // send message to all other nodes to sync up with the reconciliator including itself if required - // requires optimisation - TBD - if (chosen != 0) { - nsr_driver_log (this->name, GF_LOG_INFO, "local node resolution needs to be done. before set jmp\n"); - ctx->env = calloc(1,sizeof(jmp_buf)); - if (!setjmp(*(ctx->env))) { - state = resolutor; - goto i_am_resolutor; - } else { - nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n"); - free(ctx->env); - ctx->env = NULL; - state = leader; - } - } - - nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n"); - bm = ~((1 << ctx->reconciliator_index) || 1); - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_RESOLUTION_DO_WORK, - NSR_RECON_QUEUE_TO_CONTROL, -1); - if (status == -1) - goto out; - - nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n"); - - } -i_am_reconciliator: - if (state == reconciliator) { - gf_boolean_t do_recon = _gf_false; - uint32_t start_index = ctx->workers[0].recon_info->first_index; - uint32_t end_index = ctx->workers[0].recon_info->last_index; - uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1); - - nsr_driver_log (this->name, GF_LOG_INFO, "starting reconciliation work as reconciliator \n"); - - // nothing to be done? signal back to the recon translator that this phase done. - bm = 1; - for (i=1; i < replica_group_size; i++) { - if (ctx->workers[i].in_use && - (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) { - ctx->workers[i].recon_info->last_index = end_index; - ctx->workers[i].recon_info->first_index = start_index; - bm |= (1 << i); - do_recon = _gf_true; - } - } - - if (!do_recon || !num) { - nsr_driver_log (this->name, GF_LOG_INFO, "nothing needs to be done as resolutor \n"); - if (ctx->env) { - nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n"); - longjmp(*(ctx->env), 1); - } else { - goto out; - } - } - - nsr_driver_log (this->name, GF_LOG_INFO, - "getting reconciliation window for term %d from %dto %d \n", - ctx->workers[0].recon_info->last_term, - start_index, end_index); - // We have set the bm in the above for loop where - // we go thru all nodes including this node that - // have seen the last term. - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_GET_RECONCILATION_WINDOW, - NSR_RECON_QUEUE_TO_CONTROL, -1); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, - "finished getting reconciliation window for term %d from %dto %d \n", - ctx->workers[0].recon_info->last_term, - start_index, end_index); - - - // from the changelogs, calculate the entries - // that need action and the source for each of these entries - compute_reconciliation_work(ctx); - - // for each of the entries that need fixup, issue IO - for (i=start_index; i < (start_index + num); i++) { - nsr_reconciliator_info_t *my_recon_info = - ctx->workers[0].recon_info; - nsr_reconciliation_record_t *record = - &(my_recon_info->records[i - start_index]); - - record->work.term = ctx->workers[0].recon_info->last_term; - record->work.index = i; - - nsr_driver_log (this->name, GF_LOG_INFO, - "fixing index %d\n",i); - if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) || - (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) { - // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE): - // If there are only pseudo_holes in others, it is best effort. - // Just pick from the first node that has it and proceed. - // 2nd case (RECON_WORK_HOLE_TO_FILL): - // this node has either a HOLE or PSUEDO_HOLE and some one else has a FILL(source). - // analyse the changelog to check if data needs to be read or if the log has all the data required - - if (recon_check_changelog(&record->rec)) { - bm = (1 << record->work.source); - nsr_driver_log (this->name, GF_LOG_INFO, - "reading data from source %d\n",record->work.source); - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_SINGLE_RECONCILIATION_READ, - NSR_RECON_QUEUE_TO_DATA, - i); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, - "got data from source %d\n",record->work.source); - } - - nsr_driver_log (this->name, GF_LOG_INFO, - "fixing local data as part of reconciliation\n"); - - bm = 1; - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, - NSR_RECON_QUEUE_TO_DATA, - i); - if (status == -1) - goto out; - - nsr_driver_log (this->name, GF_LOG_INFO, - "finished fixing local data as part of reconciliation\n"); - - } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) { - // this node has a pseudo_hole and some others have just that too. Just convert this to FILL. - // let others blindly pick it from here. - nsr_driver_log (this->name, GF_LOG_INFO, - "fixing this record as a fill\n"); - bm = 1; - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, - NSR_RECON_QUEUE_TO_DATA, - i); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, - "finished fixing this record as a fill\n"); - } - } - - nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as reconciliator \n"); - - if (ctx->env) { - nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n"); - longjmp(*(ctx->env), 1); + break; + case reconciliator: + if (!nsr_recon_driver_reconciliator(priv)) { + goto out; } - - // tbd - mark this term golden in the reconciliator - - } -i_am_resolutor: - if (state == resolutor) { - - // This node's last term is filled when it gets a message - // from the leader to act as a reconciliator. - uint32_t recon_index = ctx->reconciliator_index; - nsr_reconciliator_info_t *my_info = - ctx->workers[0].recon_info; - nsr_reconciliator_info_t *his_info = - ctx->workers[recon_index].recon_info; - uint32_t my_last_term = my_info->last_term; - uint32_t to_do_term = his_info->last_term; - uint32_t my_start_index = 1, my_end_index = 1; - uint32_t his_start_index = 1, his_end_index = 1; - uint32_t num = 0; - gf_boolean_t fl = _gf_true; - - nsr_driver_log (this->name, GF_LOG_INFO, - "starting resolutor work with reconciliator as %d from term %d to term %d \n", - recon_index, my_last_term, to_do_term); - - do { - - if (!fl) { - (his_info->last_term)++; - (my_info->last_term)++; - } else { - his_info->last_term = my_last_term; - } - - nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term); - - // Get reconciliator's term information for that term - nsr_driver_log (this->name, GF_LOG_INFO, - "getting info from reconciliator for term %d \n", my_info->last_term); - bm = (1 << recon_index); - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_GET_GIVEN_TERM_INFO, - NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, - "finished getting info from reconciliator for term %d \n", my_info->last_term); - - - // empty term - if (!his_info->commited_ops) { - nsr_driver_log (this->name, GF_LOG_INFO, - "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term); - // TBD - mark the term golden - fl = _gf_false; - continue; - } - - // calculate the resolution window boundary. - // for the last term this node saw, we compare the resolution window of this and reconciliator. - // for the rest of the nodes, we just accept the reconciliator info. - if (fl) { - my_start_index = my_info->first_index; - my_end_index = my_info->last_index; - his_start_index = his_info->first_index; - his_end_index = his_info->last_index; - my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index; - my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index; - } else { - my_info->first_index = his_info->first_index; - my_info->last_index = his_info->last_index; - my_info->commited_ops = his_info->commited_ops; - } - if (my_info->first_index == 0) - my_info->first_index = 1; - num = (my_info->last_index - my_info->first_index) + 1; - - - // Get the logs from the reconciliator (and this node for this term) - if (fl) - bm = ((1 << recon_index) | 1); - else - bm = (1 << recon_index); - - nsr_driver_log (this->name, GF_LOG_INFO, - "getting reconciliation window for term %d from %d to %d \n", - my_info->last_term, - my_info->first_index, my_info->last_index); - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_GET_RECONCILATION_WINDOW, - NSR_RECON_QUEUE_TO_CONTROL, -1); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, - "finished getting reconciliation window for term %d from %d to %d \n", - my_info->last_term, - my_info->first_index, my_info->last_index); - - // from the changelogs, calculate the entries that need action - compute_resolution_work(ctx, my_info, his_info, !fl); - - - // for each of the entries that need fixup, issue IO - for (i=my_info->first_index; i < (my_info->first_index + num); i++) { - nsr_reconciliation_record_t *record = - &(my_info->records[i - my_info->first_index]); - - record->work.term = my_info->last_term; - record->work.index = i; - - nsr_driver_log (this->name, GF_LOG_INFO, "fixing index %d\n",i); - if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) || - (record->work.type == NSR_RECON_WORK_UNDO_FILL)) { - if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) && - recon_check_changelog(&record->rec)) || - ((record->work.type == NSR_RECON_WORK_UNDO_FILL) && - recon_compute_undo(&record->rec))) { - nsr_driver_log (this->name, GF_LOG_INFO, - "reading data from source %d\n",recon_index); - bm = (1 << recon_index); - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_SINGLE_RECONCILIATION_READ, - NSR_RECON_QUEUE_TO_DATA, - i); - if (status == -1) - goto out; - nsr_driver_log (this->name, GF_LOG_INFO, - "finished reading data from source %d\n",recon_index); - } - - nsr_driver_log (this->name, GF_LOG_INFO, - "fixing local data as part of resolutor\n"); - - bm = 1; - send_and_wait(&status, &op_errno, bm, - replica_group_size, - ctx, - NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, - NSR_RECON_QUEUE_TO_DATA, - i); - if (status == -1) - goto out; - - nsr_driver_log (this->name, GF_LOG_INFO, - "finished fixing local data as part of resolutor\n"); - } - } - fl = _gf_false; - - // tbd - mark this term golden in the reconciliator - } while (my_last_term++ != to_do_term); - - nsr_driver_log (this->name, GF_LOG_INFO, - "finished resolutor work \n"); - - if (ctx->env) { - nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n"); - longjmp(*(ctx->env), 1); + break; + case resolutor: + if (!nsr_recon_driver_resolutor(priv)) { + goto out; } + break; - } - - if (state == joiner) { + case joiner: nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); // Get last term info from all members for this group @@ -2976,6 +3067,9 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "finished recon work as joiner \n"); + default: + nsr_driver_log (this->name, GF_LOG_ERROR, + "bad state %d", state); } diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h index 8d87e29af..3efb26269 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.h +++ b/xlators/cluster/nsr-recon/src/recon_driver.h @@ -13,7 +13,6 @@ #include "api/src/glfs.h" -#include #define MAX_HOSTNAME_LEN 32 #define MAXIMUM_REPLICA_STRENGTH 8 @@ -260,7 +259,6 @@ typedef struct _nsr_recon_driver_ctxt { uint32_t reconciliator_index; uint32_t term; uint32_t current_term; - jmp_buf *env; nsr_mode_t mode; // default set to seq #if defined(NSR_DEBUG) FILE *fp; -- cgit