diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_xlator.c')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.c | 138 |
1 files changed, 71 insertions, 67 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c index c3c8d4d55..868377bd2 100644 --- a/xlators/cluster/nsr-recon/src/recon_xlator.c +++ b/xlators/cluster/nsr-recon/src/recon_xlator.c @@ -32,13 +32,6 @@ typedef struct _nsr_recon_fd_s { call_frame_t *frame; } nsr_recon_fd_t; - -typedef struct _nsr_txn_id_s { - uint32_t txn_id; - call_frame_t *frame; - struct list_head list; -} nsr_txn_id_t; - #if defined(NSR_DEBUG) void @@ -81,37 +74,34 @@ static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd) } } -// Add the frame in q after associating with txn_id +// Add the frame in q after associating with term +// term usage tbd static void put_frame(nsr_recon_private_t *priv, call_frame_t *frame, - uint32_t txn_id) + uint32_t term) { - xlator_t *this = priv->this; - nsr_txn_id_t * tid = GF_CALLOC(1, sizeof(nsr_txn_id_t), gf_mt_recon_private_t); - tid->txn_id = txn_id; - tid->frame = frame; - INIT_LIST_HEAD(&(tid->list)); - list_add_tail(&(tid->list), &(priv->list)); - recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id); + xlator_t *this = priv->this; + recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term); + priv->frame = frame; + return; } -// get the frame from the queue given the txn id +// get the frame from the queue given the term +// term usage tbd static void get_frame(nsr_recon_private_t *priv, call_frame_t **frame, - uint32_t txn_id) + uint32_t term) { - nsr_txn_id_t *tid = NULL; - xlator_t *this = priv->this; + if (frame != NULL) + *frame = priv->frame; + priv->frame = NULL; + return; +} - list_for_each_entry(tid, &(priv->list), list) { - if (tid->txn_id == txn_id) { - *frame = tid->frame; - recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id); - return; - } - } - recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id); - GF_ASSERT(0); +// check if there are outstanding frames +static gf_boolean_t is_frame(nsr_recon_private_t *priv) +{ + return((priv->frame != NULL) ? _gf_true : _gf_false); } #define ENTRY_SIZE 128 @@ -215,19 +205,17 @@ void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t return; } -// Return back the frame stored against the txn_id -void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id) +// Return back the frame stored against the term +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno) { call_frame_t *old_frame = NULL; xlator_t *this = priv->this; - int32_t op_ret = 0; - int32_t op_errno = 0; - get_frame(priv, &old_frame, txn_id); + get_frame(priv, &old_frame, term); if (old_frame) { recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n"); // first return the original write for which this ack was sent - STACK_UNWIND_STRICT (writev, old_frame, op_ret, op_errno, NULL, NULL, NULL); + STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL); } else { recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n"); } @@ -289,7 +277,7 @@ get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path) // // Really, 90% of this code should just GO AWAY in favor of using // libgfchangelog, enhanced as necessary to support our needs. -void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) +gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) { // do a mmap; seek into the first and read all records till last. // TBD - right now all records are pseudo holes but mark them as fills. @@ -307,7 +295,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, sprintf(path,"%s/%s%d",bp,"TERM.",term); fd = open(path, O_RDONLY); - if (fd != -1) { + if (fd == -1) { + return _gf_false; + } else { char *start = NULL; nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; @@ -315,7 +305,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, lseek(fd, 128, SEEK_SET); else lseek(fd, first * 128, SEEK_SET); - read(fd, rb, (last - first + 1) * 128); + if (read(fd, rb, (last - first + 1) * 128) == -1) { + return _gf_false; + } start = rb; index = first; do { @@ -532,7 +524,7 @@ finish: recon_main_log (this->name, GF_LOG_INFO, "libchangelog_get_records finsihed inspecting records for term %d \n", term); - return; + return _gf_true; } int32_t @@ -580,20 +572,6 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset ); GF_ASSERT(count == 1); switch (offset) { - // gets called to return back - case nsr_recon_xlator_sector_0: - { - char c[4]; - uint32_t txn_id; - - recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n"); - memcpy((void *)c, (void *)vector[0].iov_base, 4); - txn_id = ntohl(atoi(c)); - nsr_recon_return_back(priv, txn_id); - STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, - NULL, NULL, NULL); - break; - } // client(brick, leader) writes the role of the node case nsr_recon_xlator_sector_1 : { @@ -614,22 +592,33 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH); - // Store the stack frame so that when the actual job gets finished - // we send the response back to the brick. - if (nsr_recon_driver_set_role(priv->driver_thread_context, - &rr, - priv->txn_id) == _gf_false) { + // Check if already a role play is going on. If yes return with EAGAIN. + // Ideally we should check if we have got a higher term number while + // servicing a lower term number; if so abort the older one. + // However the abort infrastructure needs to be sketched properly; TBD. + if (is_frame(priv) == _gf_true) { recon_main_log (this->name, GF_LOG_ERROR, - "nsr_recon_writev set_role - cannot seem to set role \n"); - STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + "nsr_recon_writev set_role - already role play \n"); + STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN, NULL, NULL, NULL); - } else { - uint32_t old = priv->txn_id; - atomic_cmpxchg(&priv->txn_id, old,old+1); - put_frame(priv, frame, old); - recon_main_log (this->name, GF_LOG_INFO, - "nsr_recon_writev set_role - set role succesfully \n"); - } + } else { + + // Store the stack frame so that when the actual job gets finished + // we send the response back to the brick. + put_frame(priv, frame, rr.current_term); + if (nsr_recon_driver_set_role(priv->driver_thread_context, + &rr, + rr.current_term) == _gf_false) { + get_frame(priv, NULL, rr.current_term); + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - cannot seem to set role \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev set_role - set role succesfully \n"); + } + } break; } // client(reconciliator) writes how much it needs for the read @@ -679,6 +668,14 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, NULL, NULL, NULL); break; } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev called with wrong offset\n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + break; + } } return 0; @@ -764,6 +761,13 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this, memcpy(iobuf->ptr, <, size); goto out; } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_readv called with wrong offset\n"); + op_errno = -1; + break; + } } out: |