summary | refs | log | tree | commit | diff | stats
path: root/xlators/cluster
diff options
context:
space:
mode:
author: Raghavan P <rpichai@redhat.com> 2014-02-19 07:03:26 +0530
committer: Jeff Darcy <jdarcy@redhat.com> 2014-03-03 19:41:32 +0000
commit: c28972ea53cc7cdb91c7aac01754dd7f0b66e1a7 (patch)
tree: fc316e94c6494b282a1179bb97939909e5cbcba0 /xlators/cluster
parent: 3bbfebc8dc21c469d47b576069ae137aec4567c9 (diff)
changes to NSR reconciliation code to add error handling.
Description of changes added:
1) In the recon driver, check the return values of all glfs calls.
2) Make the driver send error values back to the other drivers or to the main translator.
3) Let the leader retry on errors.

Change-Id: I050003a819d2314c8fdfd111df465041c30ee6e3
Signed-off-by: Raghavan P <rpichai@redhat.com>
Diffstat (limited to 'xlators/cluster')
-rw-r--r--  xlators/cluster/nsr-recon/src/recon_driver.c   533
-rw-r--r--  xlators/cluster/nsr-recon/src/recon_driver.h   8
-rw-r--r--  xlators/cluster/nsr-recon/src/recon_xlator.c   138
-rw-r--r--  xlators/cluster/nsr-recon/src/recon_xlator.h   4
-rw-r--r--  xlators/cluster/nsr-server/src/recon_notify.c  43
5 files changed, 468 insertions, 258 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 7f92b6578..49ee465c5 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -254,7 +254,7 @@ out:
* Output Arguments:
* buf - where the values are written one after the other (NULL seperated)
*/
-static void
+static int32_t
get_xattr(struct glfs_fd *fd,
char *keys,
char *buf,
@@ -277,7 +277,7 @@ get_xattr(struct glfs_fd *fd,
// TBD - handle error
if (r == -1)
- return;
+ return -1;
// increment the key to next value
keys += len;
@@ -285,7 +285,7 @@ get_xattr(struct glfs_fd *fd,
// increment buf to hold the next key
buf += strlen(buf) + 1;
}
- return;
+ return 0;
}
/*
@@ -296,7 +296,7 @@ get_xattr(struct glfs_fd *fd,
* keys - bunch of NULL seperated key names
* num - number of keys
*/
-static void delete_xattr(struct glfs_fd *fd,
+static int32_t delete_xattr(struct glfs_fd *fd,
dict_t *dict_t,
char *keys,
uint32_t num)
@@ -304,10 +304,11 @@ static void delete_xattr(struct glfs_fd *fd,
while(num--) {
// get the value and copy the value
// TBD - handle failure cases when calling glfs_fremovexattr_with_xdata()
- glfs_fremovexattr_with_xdata(fd, keys, dict_t);
+ if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1)
+ return -1;
keys += strlen(keys) +1;
}
- return;
+ return 0;
}
/*
@@ -320,7 +321,7 @@ static void delete_xattr(struct glfs_fd *fd,
* Each of the key-value is seperated by NULL in turn.
* num - Number of such key value pairs.
*/
-static void
+static int32_t
fill_xattr(struct glfs_fd *fd,
dict_t *dict,
char *buf,
@@ -336,10 +337,10 @@ fill_xattr(struct glfs_fd *fd,
// TBD - handle failure cases when calling glfs_fsetxattr_with_xdata()
r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict);
if (r == -1)
- return;
+ return -1;
k = val + strlen(val) + 1;
}
- return;
+ return 0;
}
/*
@@ -408,7 +409,7 @@ nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker)
* ctx - The per worker based context
* control - set to true if this worker is for the control plane
*/
-static int
+static int32_t
nsr_recon_start_work(nsr_per_node_worker_t *ctx,
gf_boolean_t control)
{
@@ -497,6 +498,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
if (aux_fd == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"cannot open aux log file for thread %s\n",ctx->id);
+ return -1;
} else {
nsr_worker_log(this->name, GF_LOG_ERROR,
"---opened aux log file for thread %s\n",ctx->id);
@@ -517,7 +519,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
* ctx - The per worker based context
* control - set to true if this worker is for the control plane
*/
-static int
+static int32_t
nsr_recon_end_work(nsr_per_node_worker_t *ctx,
gf_boolean_t control)
{
@@ -648,13 +650,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"this message should not be sent \n");
- break;
- }
- case NSR_WORK_ID_END_RECONCILIATION:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sending reconciliation end message to node %d\n", index);
- nsr_recon_return_back(priv, dr->txn_id);
+ ctx->result = -1;
break;
}
case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
@@ -676,16 +672,29 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
recon_info->records = GF_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
// TBD - handle errors
- nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+ if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
recon_info->last_term,
recon_info->first_index,
recon_info->last_index,
- rd);
+ rd) == _gf_false) {
+ ctx->result = -1;
+ return;
+ }
+
// The above function writes into rd from 0 to (num -1)
// We need to take care of this whenever we deal with records
for (i=0; i < num; i++) {
@@ -778,7 +787,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"calling nsr_recon_start_work\n");
// TBD - handle error in case nsr_recon_start_work gives error
- nsr_recon_start_work(ctx, _gf_true);
+ if (nsr_recon_start_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished nsr_recon_start_work\n");
@@ -790,7 +802,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"calling nsr_recon_end_work\n");
// TBD - handle error in case nsr_recon_end_work gives error
- nsr_recon_end_work(ctx, _gf_true);
+ if (nsr_recon_end_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished nsr_recon_end_work\n");
@@ -807,9 +822,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// first write the current term term number
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET);
- glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
- glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
recon_info->last_term = lt.last_term;
recon_info->commited_ops = lt.commited_ops;
@@ -834,9 +858,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// first write the term number
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET);
- glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
- glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
recon_info->last_term = lt.last_term;
recon_info->commited_ops = lt.commited_ops;
@@ -863,9 +896,12 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to make this index %d as reconciliator for term %d\n", index, term);
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
+ if (glfs_lseek(ctx->aux_fd,
nsr_recon_xlator_sector_1,
- SEEK_SET);
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
// We have all the info for all other nodes.
// Fill all that info when sending data to that process.
@@ -888,7 +924,15 @@ control_worker_func(nsr_per_node_worker_t *ctx,
rr.num = num;
rr.role = reconciliator;
ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we
+ // are bothered about retrying only for this case.
+ // For rest of the cases we will just return EIO
+ // in errno.
+ ctx->op_errno = errno;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"sent reconciliator info for term %d with node count as %d\n", term, num);
@@ -905,14 +949,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
+ if (glfs_lseek(ctx->aux_fd,
nsr_recon_xlator_sector_1,
- SEEK_SET);
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
rr.num = 2;
// Fill in info[0] as info for the node for which we are seeking resolution.
// Fill in info[1] as info of the reconciliator node.
- // The function nsr_recon_driver_set_role() that will be called when
+ // The function nsr_recon_driver_get_role() that will be called when
// this message reaches the node will look at index 1 for term information
// related to the reconciliator.
for (i=0; i < 2; i++) {
@@ -943,33 +991,21 @@ control_worker_func(nsr_per_node_worker_t *ctx,
}
rr.role = resolutor;
ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we
+ // are bothered about retrying only for this case.
+ // For rest of the cases we will just return EIO
+ // in errno.
+ ctx->op_errno = errno;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
break;
}
- case NSR_WORK_ID_END_RECONCILIATION:
- {
- char c[4];
- uint32_t old = htonl(dr->txn_id);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sending reconciliation end message to node %d\n", index);
-
- memcpy(c, &old, sizeof(uint32_t));
- // TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
- nsr_recon_xlator_sector_0,
- SEEK_SET);
- glfs_write(ctx->aux_fd, c, sizeof(c), 0);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished sending reconciliation end message to node %d\n", index);
-
- break;
- }
case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
{
nsr_recon_log_info_t li;
@@ -985,24 +1021,42 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
// write to node what term & indices we are interested
li.term = recon_info->last_term;
li.first_index = recon_info->first_index;
li.last_index = recon_info->last_index;
ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &li, sizeof(li), 0);
+ if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
// then read
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
recon_info->records = GF_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
- glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0);
for (i=0; i < num; i++) {
ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
memcpy(&(recon_info->records[i].rec),
@@ -1132,7 +1186,7 @@ compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx)
return;
}
-static void
+static int32_t
nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
uint32_t i,
gf_boolean_t in_use);
@@ -1145,14 +1199,14 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
gf_boolean_t
nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
nsr_recon_role_t *rr,
- uint32_t txn_id)
+ uint32_t term)
{
nsr_role_work_t *rw;
nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n");
rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0);
memcpy(&rw->role, rr, sizeof(nsr_recon_role_t));
- rw->txn_id = txn_id;
+ rw->term = term;
INIT_LIST_HEAD(&(rw->list));
pthread_mutex_lock(&(ctx->mutex));
list_add_tail(&rw->list, &ctx->role_head.list);
@@ -1176,15 +1230,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
* If resolution to be done, then rr.info[0] will have this node's info
* which the leader would have got earlier. rr[1].info will have the
* info regarding the reconciliator.
- * txn_id - All role changes(except when leader becomes reconciliator or resolutor)
- * would be initiated as write to the recon xlator which would have got a frame from
- * either the brick process(leader change) or other reconciliation process.
- * The write function would return immediately after storing the frame which
- * needs to be returned back after the actual reconciliation is done.
- * For that we store the frame against this id which acts as a key.
+ * term - leader's term that is causing this role
*/
nsr_recon_driver_state_t
-nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
+nsr_recon_driver_get_role(int32_t *status,
+ nsr_recon_driver_ctx_t *ctx,
nsr_role_work_t *rw)
{
uint8_t i=0, j=0;
@@ -1192,17 +1242,24 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
// First make all the threads uninitialise
for (i = 0; i < ctx->replica_group_size; i++) {
- nsr_recon_in_use(ctx, i, _gf_false);
+ if (nsr_recon_in_use(ctx, i, _gf_false) == -1) {
+ *status = -1;
+ return 0;
+ }
}
if ((rr->role == leader) || (rr->role == joiner)) {
// First set info this node
- nsr_recon_in_use(ctx, 0, _gf_true);
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
ctx->workers[0].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[0].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->current_term = rr->current_term;
@@ -1213,11 +1270,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
if (!strcmp(ctx->workers[i].name, rr->info[j].name)) {
//if (strstr(ctx->workers[i].name, rr->info[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as %s. found other server %s\n",
+ "nsr_recon_driver_get_role: this as %s. found other server %s\n",
(rr->role == leader) ? "leader" : "joiner",
ctx->workers[i].name);
- nsr_recon_in_use(ctx, i, _gf_true);
+ if (nsr_recon_in_use(ctx, i, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
// Allocate this here. This will get later filled when
// the leader tries to get last term information from all
// the nodes
@@ -1225,7 +1285,8 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[i].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
break;
}
@@ -1245,13 +1306,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
if (!strcmp(rr->info[i].name, ctx->workers[j].name)) {
//if (strstr(ctx->workers[j].name, rr->info[i].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n",
+ "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
ctx->workers[j].name);
ctx->workers[j].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[j].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->workers[j].recon_info->last_term =
rr->info[i].last_term;
@@ -1261,7 +1323,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
rr->info[i].last_index;
ctx->workers[j].recon_info->first_index =
rr->info[i].first_index;
- nsr_recon_in_use(ctx, j, _gf_true);
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
break;
}
}
@@ -1272,13 +1337,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
//if (strstr(ctx->workers[j].name, rr->info[1].name)) {
if (!strcmp(rr->info[1].name, ctx->workers[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n",
- ctx->workers[1].name);
+ "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
+ ctx->workers[j].name);
ctx->workers[j].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[j].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->workers[j].recon_info->last_term =
rr->info[1].last_term;
@@ -1289,7 +1355,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
ctx->workers[j].recon_info->first_index =
rr->info[1].first_index;
ctx->reconciliator_index = j;
- nsr_recon_in_use(ctx, j, _gf_true);
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
GF_ASSERT(ctx->reconciliator_index != 0);
break;
}
@@ -1298,18 +1367,23 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[0].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
// info[0] has all info for this node
ctx->workers[0].recon_info->last_term = rr->info[0].last_term;
ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops;
ctx->workers[0].recon_info->last_index = rr->info[0].last_index;
ctx->workers[0].recon_info->first_index = rr->info[0].first_index;
- nsr_recon_in_use(ctx, 0, _gf_true);
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
}
- ctx->txn_id = rw->txn_id;
+ ctx->term = rw->term;
+ *status = 0;
return rr->role;
}
@@ -1401,7 +1475,7 @@ create_obj(nsr_per_node_worker_t *ctx, char *gfid_str)
* and the changelog translator consumes term and index.
*/
-static void
+static gf_boolean_t
apply_record(nsr_per_node_worker_t *ctx,
nsr_reconciliation_record_t *ri,
dict_t * dict)
@@ -1420,10 +1494,11 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - get a way to just stuff the log entry without writing the data so that
// changelogs remain identical.
if (ri->work.data == NULL) {
- return;
+ return _gf_true;
}
- if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL)
+ return _gf_false;;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
@@ -1431,23 +1506,28 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"lseek for file %s failed at offset %d\n",
ri->rec.gfid, ri->rec.offset);
- return;
+ return _gf_false;
}
if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"write for file %s failed for bytes %d\n",
ri->rec.gfid, ri->rec.len);
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing write for gfid %s @offset %d for len %d\n",
@@ -1459,7 +1539,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"DOing truncate for file %s @offset %d \n",
ri->rec.gfid, ri->rec.offset);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
@@ -1467,16 +1547,21 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"trunctae for file %s failed @offset %d\n",
ri->rec.gfid,ri->rec.offset );
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing truncate for gfid %s @offset %d \n",
@@ -1499,10 +1584,10 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - get a way to just stuff the log entry without writing the data so that
// changelogs remain identical.
if (ri->work.data == NULL) {
- return;
+ return _gf_true;
}
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1513,7 +1598,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
@@ -1521,10 +1606,15 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"list of xattr of %s failed\n", ri->rec.gfid);
- return;
+ return _gf_false;
}
- delete_xattr(fd, dict, t_b, num);
+ if (delete_xattr(fd, dict, t_b, num) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "deleting xattrs failed\n");
+ return _gf_false;
+ }
// Set one special dict flag to indicate the opcode so that
// the opcode gets set to this
@@ -1532,12 +1622,22 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return;
+ return _gf_false;
}
- fill_xattr(fd, dict, ri->work.data, ri->work.num);
+ if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "filling xattrs failed\n");
+ return _gf_false;
+ }
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finsihed Doing set extended attr for %s \n",
@@ -1553,7 +1653,7 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
nsr_worker_log (this->name, GF_LOG_INFO,
"creating with mode 0%o", ri->rec.mode);
@@ -1562,7 +1662,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing create for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1579,14 +1679,14 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mknod for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1603,14 +1703,14 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mkdir for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1623,13 +1723,13 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rmdir/ublink for dir %s \n",
ri->rec.entry);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing rmdir/unlink for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1644,7 +1744,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
uuid_parse(ri->rec.gfid, gfid);
if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) {
@@ -1652,7 +1752,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1667,15 +1767,15 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1690,15 +1790,15 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1719,7 +1819,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing attr for file %s \n",
ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1730,7 +1830,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
iatt.ia_prot = ia_prot_from_st_mode(777);
@@ -1743,7 +1843,7 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return;
+ return _gf_false;
}
ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict);
@@ -1752,17 +1852,22 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"failed Doing attr for file %s \n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Doing attr for file %s \n",
ri->rec.gfid);
}
- return;
+ return _gf_true;
}
//return back opcodes that requires reading from source
@@ -1816,7 +1921,10 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"started data ini \n");
- nsr_recon_start_work(ctx, _gf_false);
+ if (nsr_recon_start_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished data ini \n");
@@ -1825,7 +1933,10 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"started data fini \n");
- nsr_recon_end_work(ctx, _gf_false);
+ if (nsr_recon_end_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished data fini \n");
@@ -1838,18 +1949,21 @@ data_worker_func(nsr_per_node_worker_t *ctx,
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
@@ -1978,7 +2092,12 @@ data_worker_func(nsr_per_node_worker_t *ctx,
}
ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char),
gf_mt_recon_private_t);
- get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
+ if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "get xattr of gfid %s failed\n", rd->gfid);
+ break;
+ }
ri->work.num = num;
nsr_worker_log(this->name, GF_LOG_INFO,
"finished getattr for gfid %s \n",
@@ -2014,24 +2133,30 @@ data_worker_func(nsr_per_node_worker_t *ctx,
wip, rd->gfid);
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
- apply_record(ctx, ri, dict);
+ if (apply_record(ctx, ri, dict) == _gf_false) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "apply_record fails\n");
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished recon commit for gfid %s \n",
@@ -2041,18 +2166,21 @@ data_worker_func(nsr_per_node_worker_t *ctx,
case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
@@ -2198,7 +2326,9 @@ create_worker_threads(nsr_recon_private_t *priv,
* misc - used to overload such as index.
*/
static void
-send_and_wait(int32_t bm,
+send_and_wait(int32_t *result,
+ int32_t *op_errno,
+ int32_t bm,
uint32_t num,
nsr_recon_driver_ctx_t *ctx,
nsr_recon_work_req_id_t id,
@@ -2208,6 +2338,18 @@ send_and_wait(int32_t bm,
uint32_t i = 0;
nsr_recon_work_t *work;
+#define CONTROL_WORKER(i) ctx->workers[i].control_worker
+#define DATA_WORKER(i) ctx->workers[i].data_worker
+#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? (CONTROL_WORKER(i)) : (DATA_WORKER(i)))
+
+ *result = *op_errno = 0;
+
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ WORKER(i)->result = 0;
+ WORKER(i)->op_errno = 0;
+ }
+ }
if (ctx->mode == NSR_SEQ) {
for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
@@ -2222,8 +2364,7 @@ send_and_wait(int32_t bm,
}
}
}
- nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
- return;
+ goto out;
}
for (i=0; i < num; i++) {
@@ -2238,41 +2379,40 @@ send_and_wait(int32_t bm,
while (ctx->outstanding) {
pthread_yield();
}
- nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
- return;
-}
-
-#if 0
-static void
-send_and_do_not_wait(int32_t bm,
- uint32_t num,
- nsr_recon_driver_ctx_t *ctx,
- nsr_recon_work_req_id_t id,
- nsr_recon_queue_type_t q,
- int32_t misc)
-{
- uint32_t i = 0;
-
- for (i=0; i < num; i++) {
+out:
+ for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
- nsr_recon_work_t *work;
- recon_make_work(&work, id, misc);
- recon_queue_to_worker(ctx, work, i, q);
- }
- }
+ if (WORKER(i)->result == -1) {
+ *result = -1;
+ }
+ }
+ }
+ if (*result == -1) {
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ if (WORKER(i)->op_errno == EAGAIN) {
+ *op_errno = EAGAIN;
+ break;
+ } else {
+ *op_errno = EIO;
+ }
+ }
+ }
+ }
+ nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno);
return;
}
-#endif
// send INI or FINI
-static void
+static int32_t
nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
uint32_t i,
gf_boolean_t in_use)
{
uint32_t bm = 1 << i;
gf_boolean_t send = _gf_false;
+ int32_t status =0, op_errno = 0;
if (in_use == _gf_false) {
if (ctx->workers[i].in_use == _gf_true)
@@ -2284,25 +2424,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
send = _gf_true;
}
}
-#if 1
if (send == _gf_true) {
if (in_use == _gf_true) {
nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i);
} else {
nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i);
}
- send_and_wait(bm, ctx->replica_group_size, ctx,
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_CONTROL, -1);
- send_and_wait(bm, ctx->replica_group_size, ctx,
+ if (status == -1)
+ return -1;
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_DATA, -1);
+ if (status == -1)
+ return -1;
if (in_use == _gf_false) {
//GF_FREE(ctx->workers[i].recon_info->records);
GF_FREE(ctx->workers[i].recon_info);
}
}
-#endif
+ return 0;
}
// main recon driver thread
@@ -2317,6 +2460,8 @@ nsr_reconciliation_driver(void *arg)
int32_t bm;
xlator_t *this = priv->this;
char *con_name, *data_name;
+ int32_t status = 0;
+ int32_t op_errno = 0;
driver_ctx = &priv->driver_thread_context;
(*driver_ctx) = GF_CALLOC (1,
@@ -2414,7 +2559,12 @@ nsr_reconciliation_driver(void *arg)
list_for_each_entry(rr, &(ctx->role_head.list), list) {
nsr_recon_driver_state_t state;
- state = nsr_recon_driver_get_role(ctx, rr);
+ state = nsr_recon_driver_get_role(&status, ctx, rr);
+
+ if (status == -1) {
+ op_errno = EIO;
+ goto out;
+ }
if (state == leader) {
@@ -2423,11 +2573,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
- send_and_wait(-1,
+ send_and_wait(&status, &op_errno, -1,
replica_group_size,
ctx,
NSR_WORK_ID_GET_LAST_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
// compare all the info received and choose the reconciliator
@@ -2469,11 +2621,13 @@ nsr_reconciliation_driver(void *arg)
if (chosen != 0) {
nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen);
bm = 1 << ctx->reconciliator_index;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RECONCILIATOR_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen);
} else {
nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n");
@@ -2517,11 +2671,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n");
bm = ~((1 << ctx->reconciliator_index) || 1);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RESOLUTION_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n");
@@ -2564,11 +2720,13 @@ i_am_reconciliator:
// We have set the bm in the above for loop where
// we go thru all nodes including this node that
// have seen the last term.
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting reconciliation window for term %d from %dto %d \n",
ctx->workers[0].recon_info->last_term,
@@ -2604,12 +2762,14 @@ i_am_reconciliator:
bm = (1 << record->work.source);
nsr_driver_log (this->name, GF_LOG_INFO,
"reading data from source %d\n",record->work.source);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"got data from source %d\n",record->work.source);
}
@@ -2618,12 +2778,14 @@ i_am_reconciliator:
"fixing local data as part of reconciliation\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing local data as part of reconciliation\n");
@@ -2634,12 +2796,14 @@ i_am_reconciliator:
nsr_driver_log (this->name, GF_LOG_INFO,
"fixing this record as a fill\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing this record as a fill\n");
}
@@ -2691,11 +2855,13 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"getting info from reconciliator for term %d \n", my_info->last_term);
bm = (1 << recon_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_GIVEN_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting info from reconciliator for term %d \n", my_info->last_term);
@@ -2739,11 +2905,13 @@ i_am_resolutor:
"getting reconciliation window for term %d from %d to %d \n",
my_info->last_term,
my_info->first_index, my_info->last_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting reconciliation window for term %d from %d to %d \n",
my_info->last_term,
@@ -2771,12 +2939,14 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"reading data from source %d\n",recon_index);
bm = (1 << recon_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished reading data from source %d\n",recon_index);
}
@@ -2785,12 +2955,14 @@ i_am_resolutor:
"fixing local data as part of resolutor\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing local data as part of resolutor\n");
@@ -2816,21 +2988,25 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
// which will be the leader(this node) and the node that wants to join.
- send_and_wait(-1,
+ send_and_wait(&status, &op_errno, -1,
replica_group_size,
ctx,
NSR_WORK_ID_GET_LAST_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
// send message to other node that just joined to sync up with this node which is also the leader
nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n");
bm = ~(1);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RESOLUTION_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished recon work as joiner \n");
@@ -2843,16 +3019,7 @@ i_am_resolutor:
out:
nsr_driver_log (this->name, GF_LOG_INFO,
"sending end of reconciliation message \n");
- nsr_recon_return_back(priv, ctx->txn_id);
-#if 0
- // send message that job is done by writing to local recon translator
- bm = 1;
- send_and_wait(bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_END_RECONCILIATION,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
-#endif
+ nsr_recon_return_back(priv, ctx->term, status, op_errno);
nsr_driver_log (this->name, GF_LOG_INFO,
"finished sending end of reconciliation message \n");
}
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h
index 4030c9d73..8d87e29af 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.h
+++ b/xlators/cluster/nsr-recon/src/recon_driver.h
@@ -181,7 +181,7 @@ typedef struct nsr_recon_record_details_s {
typedef struct _nsr_role_work_s {
nsr_recon_role_t role;
- uint32_t txn_id;
+ uint32_t term;
struct list_head list;
} nsr_role_work_t;
@@ -236,6 +236,8 @@ typedef struct _nsr_per_node_worker_s {
#if defined(NSR_DEBUG)
FILE *fp;
#endif
+ int32_t result; // result of latest work
+ int32_t op_errno; // errno
} nsr_per_node_worker_t;
typedef struct _nsr_replica_worker_s {
@@ -256,7 +258,7 @@ typedef struct _nsr_recon_driver_ctxt {
nsr_role_work_t role_head;
volatile int32_t outstanding;
uint32_t reconciliator_index;
- uint32_t txn_id;
+ uint32_t term;
uint32_t current_term;
jmp_buf *env;
nsr_mode_t mode; // default set to seq
@@ -269,7 +271,7 @@ void *
nsr_reconciliation_driver(void *);
gf_boolean_t
-nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t txn_id);
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t term);
#define atomic_inc(ptr) ((void) __sync_fetch_and_add(ptr, 1))
#define atomic_dec(ptr) ((void) __sync_fetch_and_add(ptr, -1))
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
index c3c8d4d55..868377bd2 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.c
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -32,13 +32,6 @@ typedef struct _nsr_recon_fd_s {
call_frame_t *frame;
} nsr_recon_fd_t;
-
-typedef struct _nsr_txn_id_s {
- uint32_t txn_id;
- call_frame_t *frame;
- struct list_head list;
-} nsr_txn_id_t;
-
#if defined(NSR_DEBUG)
void
@@ -81,37 +74,34 @@ static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd)
}
}
-// Add the frame in q after associating with txn_id
+// Remember the frame of the pending request (single slot, no queue).
+// The term is only logged for now; further term usage is TBD.
static void put_frame(nsr_recon_private_t *priv,
call_frame_t *frame,
- uint32_t txn_id)
+ uint32_t term)
{
- xlator_t *this = priv->this;
- nsr_txn_id_t * tid = GF_CALLOC(1, sizeof(nsr_txn_id_t), gf_mt_recon_private_t);
- tid->txn_id = txn_id;
- tid->frame = frame;
- INIT_LIST_HEAD(&(tid->list));
- list_add_tail(&(tid->list), &(priv->list));
- recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id);
+ xlator_t *this = priv->this;
+ recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term);
+ priv->frame = frame;
+ return;
}
-// get the frame from the queue given the txn id
+// Hand back the stored frame (if the caller asks for it) and clear the slot.
+// The term argument is not used yet; term usage is TBD.
static void get_frame(nsr_recon_private_t *priv,
call_frame_t **frame,
- uint32_t txn_id)
+ uint32_t term)
{
- nsr_txn_id_t *tid = NULL;
- xlator_t *this = priv->this;
+ if (frame != NULL)
+ *frame = priv->frame;
+ priv->frame = NULL;
+ return;
+}
- list_for_each_entry(tid, &(priv->list), list) {
- if (tid->txn_id == txn_id) {
- *frame = tid->frame;
- recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id);
- return;
- }
- }
- recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id);
- GF_ASSERT(0);
+// check if there are outstanding frames
+static gf_boolean_t is_frame(nsr_recon_private_t *priv)
+{
+ return((priv->frame != NULL) ? _gf_true : _gf_false);
}
#define ENTRY_SIZE 128
@@ -215,19 +205,17 @@ void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t
return;
}
-// Return back the frame stored against the txn_id
-void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id)
+// Return back the frame stored against the term
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno)
{
call_frame_t *old_frame = NULL;
xlator_t *this = priv->this;
- int32_t op_ret = 0;
- int32_t op_errno = 0;
- get_frame(priv, &old_frame, txn_id);
+ get_frame(priv, &old_frame, term);
if (old_frame) {
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n");
// first return the original write for which this ack was sent
- STACK_UNWIND_STRICT (writev, old_frame, op_ret, op_errno, NULL, NULL, NULL);
+ STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL);
} else {
recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n");
}
@@ -289,7 +277,7 @@ get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path)
//
// Really, 90% of this code should just GO AWAY in favor of using
// libgfchangelog, enhanced as necessary to support our needs.
-void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
{
// do a mmap; seek into the first and read all records till last.
// TBD - right now all records are pseudo holes but mark them as fills.
@@ -307,7 +295,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
sprintf(path,"%s/%s%d",bp,"TERM.",term);
fd = open(path, O_RDONLY);
- if (fd != -1) {
+ if (fd == -1) {
+ return _gf_false;
+ } else {
char *start = NULL;
nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf;
@@ -315,7 +305,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
lseek(fd, 128, SEEK_SET);
else
lseek(fd, first * 128, SEEK_SET);
- read(fd, rb, (last - first + 1) * 128);
+ if (read(fd, rb, (last - first + 1) * 128) == -1) {
+ return _gf_false;
+ }
start = rb;
index = first;
do {
@@ -532,7 +524,7 @@ finish:
recon_main_log (this->name, GF_LOG_INFO,
"libchangelog_get_records finsihed inspecting records for term %d \n",
term);
- return;
+ return _gf_true;
}
int32_t
@@ -580,20 +572,6 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset );
GF_ASSERT(count == 1);
switch (offset) {
- // gets called to return back
- case nsr_recon_xlator_sector_0:
- {
- char c[4];
- uint32_t txn_id;
-
- recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n");
- memcpy((void *)c, (void *)vector[0].iov_base, 4);
- txn_id = ntohl(atoi(c));
- nsr_recon_return_back(priv, txn_id);
- STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
- NULL, NULL, NULL);
- break;
- }
// client(brick, leader) writes the role of the node
case nsr_recon_xlator_sector_1 :
{
@@ -614,22 +592,33 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH);
- // Store the stack frame so that when the actual job gets finished
- // we send the response back to the brick.
- if (nsr_recon_driver_set_role(priv->driver_thread_context,
- &rr,
- priv->txn_id) == _gf_false) {
+ // If a role request is already in progress, fail this one with EAGAIN.
+ // Ideally we would detect a higher term number arriving while a
+ // lower term number is still being serviced, and abort the older one.
+ // However, the abort infrastructure still needs to be designed; TBD.
+ if (is_frame(priv) == _gf_true) {
recon_main_log (this->name, GF_LOG_ERROR,
- "nsr_recon_writev set_role - cannot seem to set role \n");
- STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ "nsr_recon_writev set_role - already role play \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN,
NULL, NULL, NULL);
- } else {
- uint32_t old = priv->txn_id;
- atomic_cmpxchg(&priv->txn_id, old,old+1);
- put_frame(priv, frame, old);
- recon_main_log (this->name, GF_LOG_INFO,
- "nsr_recon_writev set_role - set role succesfully \n");
- }
+ } else {
+
+ // Store the stack frame so that when the actual job gets finished
+ // we send the response back to the brick.
+ put_frame(priv, frame, rr.current_term);
+ if (nsr_recon_driver_set_role(priv->driver_thread_context,
+ &rr,
+ rr.current_term) == _gf_false) {
+ get_frame(priv, NULL, rr.current_term);
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev set_role - cannot seem to set role \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ } else {
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev set_role - set role succesfully \n");
+ }
+ }
break;
}
// client(reconciliator) writes how much it needs for the read
@@ -679,6 +668,14 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
NULL, NULL, NULL);
break;
}
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev called with wrong offset\n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
}
return 0;
@@ -764,6 +761,13 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this,
memcpy(iobuf->ptr, &lt, size);
goto out;
}
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_readv called with wrong offset\n");
+ op_errno = -1;
+ break;
+ }
}
out:
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h
index 8c48f6ff6..c92489db1 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.h
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.h
@@ -70,8 +70,8 @@ _recon_main_log (const char *func, int line, char *member, FILE *fp,
void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
-void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term_id);
-void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf);
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno);
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf);
#endif /* #ifndef __RECON_XLATOR_H__ */
diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c
index 7a0de85b1..7397192ae 100644
--- a/xlators/cluster/nsr-server/src/recon_notify.c
+++ b/xlators/cluster/nsr-server/src/recon_notify.c
@@ -120,12 +120,49 @@ nsr_recon_set_leader (xlator_t *this)
// in the callback (once reconciliation is done),
// we will unfence the IOs.
// TBD - error handling later.
- glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ if (glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "doing lseek failed\n");
+ return;
+ }
+
glusterfs_this_set(old);
gf_log (this->name, GF_LOG_INFO,
"Writing to local node to set leader");
- glfs_write(ctx->fd, &role,
- sizeof(role), 0);
+ do {
+ if (priv->leader != _gf_true) {
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR, "no longer leader\n");
+ return;
+ }
+ if (glfs_write(ctx->fd, &role, sizeof(role), 0) == -1) {
+ if (errno == EAGAIN) {
+ // Wait for old reconciliation to bail out.
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR,
+ "write failed with retry. retrying after some time\n");
+ sleep(5);
+ continue;
+ }
+ else{
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR,
+ "doing write failed\n");
+ // This happens when reconciliation has returned an error
+ // because some node died in the meantime.
+ // What should be done? Either we retry becoming leader
+ // or we hook into the CHILD_DOWN notification.
+ // That logic will be added later. For now we just retry,
+ // which is the easier option.
+ sleep(5);
+ continue;
+ }
+ } else {
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO, "doing write with success\n");
+ break;
+ }
+ } while(1);
glusterfs_this_set(old);
gf_log (this->name, GF_LOG_INFO,
"glfs_write returned. unfencing IO\n");