summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-recon/src/recon_xlator.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_xlator.c')
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.c138
1 files changed, 71 insertions, 67 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
index c3c8d4d55..868377bd2 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.c
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -32,13 +32,6 @@ typedef struct _nsr_recon_fd_s {
call_frame_t *frame;
} nsr_recon_fd_t;
-
-typedef struct _nsr_txn_id_s {
- uint32_t txn_id;
- call_frame_t *frame;
- struct list_head list;
-} nsr_txn_id_t;
-
#if defined(NSR_DEBUG)
void
@@ -81,37 +74,34 @@ static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd)
}
}
-// Add the frame in q after associating with txn_id
+// Add the frame in q after associating with term
+// term usage tbd
static void put_frame(nsr_recon_private_t *priv,
call_frame_t *frame,
- uint32_t txn_id)
+ uint32_t term)
{
- xlator_t *this = priv->this;
- nsr_txn_id_t * tid = GF_CALLOC(1, sizeof(nsr_txn_id_t), gf_mt_recon_private_t);
- tid->txn_id = txn_id;
- tid->frame = frame;
- INIT_LIST_HEAD(&(tid->list));
- list_add_tail(&(tid->list), &(priv->list));
- recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id);
+ xlator_t *this = priv->this;
+ recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term);
+ priv->frame = frame;
+ return;
}
-// get the frame from the queue given the txn id
+// get the frame from the queue given the term
+// term usage tbd
static void get_frame(nsr_recon_private_t *priv,
call_frame_t **frame,
- uint32_t txn_id)
+ uint32_t term)
{
- nsr_txn_id_t *tid = NULL;
- xlator_t *this = priv->this;
+ if (frame != NULL)
+ *frame = priv->frame;
+ priv->frame = NULL;
+ return;
+}
- list_for_each_entry(tid, &(priv->list), list) {
- if (tid->txn_id == txn_id) {
- *frame = tid->frame;
- recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id);
- return;
- }
- }
- recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id);
- GF_ASSERT(0);
+// check if there are outstanding frames
+static gf_boolean_t is_frame(nsr_recon_private_t *priv)
+{
+ return((priv->frame != NULL) ? _gf_true : _gf_false);
}
#define ENTRY_SIZE 128
@@ -215,19 +205,17 @@ void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t
return;
}
-// Return back the frame stored against the txn_id
-void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id)
+// Return back the frame stored against the term
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno)
{
call_frame_t *old_frame = NULL;
xlator_t *this = priv->this;
- int32_t op_ret = 0;
- int32_t op_errno = 0;
- get_frame(priv, &old_frame, txn_id);
+ get_frame(priv, &old_frame, term);
if (old_frame) {
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n");
// first return the original write for which this ack was sent
- STACK_UNWIND_STRICT (writev, old_frame, op_ret, op_errno, NULL, NULL, NULL);
+ STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL);
} else {
recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n");
}
@@ -289,7 +277,7 @@ get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path)
//
// Really, 90% of this code should just GO AWAY in favor of using
// libgfchangelog, enhanced as necessary to support our needs.
-void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
{
// do a mmap; seek into the first and read all records till last.
// TBD - right now all records are pseudo holes but mark them as fills.
@@ -307,7 +295,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
sprintf(path,"%s/%s%d",bp,"TERM.",term);
fd = open(path, O_RDONLY);
- if (fd != -1) {
+ if (fd == -1) {
+ return _gf_false;
+ } else {
char *start = NULL;
nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf;
@@ -315,7 +305,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
lseek(fd, 128, SEEK_SET);
else
lseek(fd, first * 128, SEEK_SET);
- read(fd, rb, (last - first + 1) * 128);
+ if (read(fd, rb, (last - first + 1) * 128) == -1) {
+ return _gf_false;
+ }
start = rb;
index = first;
do {
@@ -532,7 +524,7 @@ finish:
recon_main_log (this->name, GF_LOG_INFO,
"libchangelog_get_records finsihed inspecting records for term %d \n",
term);
- return;
+ return _gf_true;
}
int32_t
@@ -580,20 +572,6 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset );
GF_ASSERT(count == 1);
switch (offset) {
- // gets called to return back
- case nsr_recon_xlator_sector_0:
- {
- char c[4];
- uint32_t txn_id;
-
- recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n");
- memcpy((void *)c, (void *)vector[0].iov_base, 4);
- txn_id = ntohl(atoi(c));
- nsr_recon_return_back(priv, txn_id);
- STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
- NULL, NULL, NULL);
- break;
- }
// client(brick, leader) writes the role of the node
case nsr_recon_xlator_sector_1 :
{
@@ -614,22 +592,33 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH);
- // Store the stack frame so that when the actual job gets finished
- // we send the response back to the brick.
- if (nsr_recon_driver_set_role(priv->driver_thread_context,
- &rr,
- priv->txn_id) == _gf_false) {
+ // Check if already a role play is going on. If yes return with EAGAIN.
+ // Ideally we should check if we have got a higher term number while
+ // servicing a lower term number; if so abort the older one.
+ // However the abort infrastructure needs to be sketched properly; TBD.
+ if (is_frame(priv) == _gf_true) {
recon_main_log (this->name, GF_LOG_ERROR,
- "nsr_recon_writev set_role - cannot seem to set role \n");
- STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ "nsr_recon_writev set_role - already role play \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN,
NULL, NULL, NULL);
- } else {
- uint32_t old = priv->txn_id;
- atomic_cmpxchg(&priv->txn_id, old,old+1);
- put_frame(priv, frame, old);
- recon_main_log (this->name, GF_LOG_INFO,
- "nsr_recon_writev set_role - set role succesfully \n");
- }
+ } else {
+
+ // Store the stack frame so that when the actual job gets finished
+ // we send the response back to the brick.
+ put_frame(priv, frame, rr.current_term);
+ if (nsr_recon_driver_set_role(priv->driver_thread_context,
+ &rr,
+ rr.current_term) == _gf_false) {
+ get_frame(priv, NULL, rr.current_term);
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev set_role - cannot seem to set role \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ } else {
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev set_role - set role succesfully \n");
+ }
+ }
break;
}
// client(reconciliator) writes how much it needs for the read
@@ -679,6 +668,14 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
NULL, NULL, NULL);
break;
}
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev called with wrong offset\n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
}
return 0;
@@ -764,6 +761,13 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this,
memcpy(iobuf->ptr, &lt, size);
goto out;
}
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_readv called with wrong offset\n");
+ op_errno = -1;
+ break;
+ }
}
out: