diff options
Diffstat (limited to 'xlators/experimental/jbr-server')
-rw-r--r-- | xlators/experimental/jbr-server/src/all-templates.c | 41 | ||||
-rwxr-xr-x | xlators/experimental/jbr-server/src/gen-fops.py | 4 | ||||
-rw-r--r-- | xlators/experimental/jbr-server/src/jbr.c | 407 |
3 files changed, 413 insertions, 39 deletions
diff --git a/xlators/experimental/jbr-server/src/all-templates.c b/xlators/experimental/jbr-server/src/all-templates.c index adae2431157..7314701029c 100644 --- a/xlators/experimental/jbr-server/src/all-templates.c +++ b/xlators/experimental/jbr-server/src/all-templates.c @@ -351,6 +351,7 @@ jbr_@NAME@_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, @LONG_ARGS@) { + int32_t ret = -1; gf_boolean_t result = _gf_false; jbr_private_t *priv = NULL; jbr_local_t *local = NULL; @@ -371,43 +372,9 @@ jbr_@NAME@_complete (call_frame_t *frame, void *cookie, xlator_t *this, UNLOCK(&frame->lock); #if defined(JBR_CG_QUEUE) - jbr_inode_ctx_t *ictx; - jbr_local_t *next; - - if (local->qlinks.next != &local->qlinks) { - list_del(&local->qlinks); - ictx = jbr_get_inode_ctx(this, local->fd->inode); - if (ictx) { - LOCK(&ictx->lock); - if (ictx->pending) { - /* - * TBD: dequeue *all* non-conflicting - * reqs - * - * With the stub implementation there - * can only be one request active at a - * time (zero here) so it's not an - * issue. In a real implementation - * there might still be other active - * requests to check against, and - * multiple pending requests that could - * continue. - */ - gf_msg_debug (this->name, 0, - "unblocking next request"); - --(ictx->pending); - next = list_entry (ictx->pqueue.next, - jbr_local_t, qlinks); - list_del(&next->qlinks); - list_add_tail(&next->qlinks, - &ictx->aqueue); - call_resume(next->qstub); - } else { - --(ictx->active); - } - UNLOCK(&ictx->lock); - } - } + ret = jbr_remove_from_queue (frame, this); + if (ret) + goto err; #endif #if defined(JBR_CG_FSYNC) diff --git a/xlators/experimental/jbr-server/src/gen-fops.py b/xlators/experimental/jbr-server/src/gen-fops.py index 36bf1e35d27..8a2b47c5345 100755 --- a/xlators/experimental/jbr-server/src/gen-fops.py +++ b/xlators/experimental/jbr-server/src/gen-fops.py @@ -78,7 +78,7 @@ fop_table = { "getxattr": "read", # "inodelk": "read", "link": "write", -# "lk": "write", + "lk": "write,queue", # "lookup": "read", "mkdir": "write", "mknod": "write", @@ -107,7 +107,7 @@ fop_table = { # only a few common functions will be generated, and mention those # functions. Rest of the functions can be customized selective_generate = { -# "lk": "fop,dispatch,call_dispatch", + "lk": "fop,dispatch,call_dispatch", } # Stolen from gen_fdl.py diff --git a/xlators/experimental/jbr-server/src/jbr.c b/xlators/experimental/jbr-server/src/jbr.c index a342d3b83d5..d27d8ab5140 100644 --- a/xlators/experimental/jbr-server/src/jbr.c +++ b/xlators/experimental/jbr-server/src/jbr.c @@ -38,6 +38,20 @@ enum { JBR_SERVER_NEXT_ENTRY }; +/* + * Need to declare jbr_lk_call_dispatch as jbr_lk_continue and * + * jbr_lk_perform_local_op call it, before code is generated. * + */ +int32_t +jbr_lk_call_dispatch (call_frame_t *frame, xlator_t *this, int *op_errno, + fd_t *fd, int32_t cmd, struct gf_flock *lock, + dict_t *xdata); + +int32_t +jbr_lk_dispatch (call_frame_t *frame, xlator_t *this, + fd_t *fd, int32_t cmd, struct gf_flock *lock, + dict_t *xdata); + /* Used to check the quorum of acks received after the fop * confirming the status of the fop on all the brick processes * for this particular subvolume @@ -312,6 +326,399 @@ out: return ret; } +int32_t +jbr_remove_from_queue (call_frame_t *frame, xlator_t *this) +{ + int32_t ret = -1; + jbr_inode_ctx_t *ictx = NULL; + jbr_local_t *local = NULL; + jbr_local_t *next = NULL; + + GF_VALIDATE_OR_GOTO ("jbr", this, out); + GF_VALIDATE_OR_GOTO (this->name, frame, out); + local = frame->local; + GF_VALIDATE_OR_GOTO (this->name, local, out); + + if (local->qlinks.next != &local->qlinks) { + list_del(&local->qlinks); + ictx = jbr_get_inode_ctx(this, local->fd->inode); + if (ictx) { + LOCK(&ictx->lock); + if (ictx->pending) { + /* + * TBD: dequeue *all* non-conflicting + * reqs + * + * With the stub implementation there + * can only be one request active at a + * time (zero here) so it's not an + * issue. In a real implementation + * there might still be other active + * requests to check against, and + * multiple pending requests that could + * continue. + */ + gf_msg_debug (this->name, 0, + "unblocking next request"); + --(ictx->pending); + next = list_entry (ictx->pqueue.next, + jbr_local_t, qlinks); + list_del(&next->qlinks); + list_add_tail(&next->qlinks, + &ictx->aqueue); + call_resume(next->qstub); + } else { + --(ictx->active); + } + UNLOCK(&ictx->lock); + } + } + + ret = 0; + +out: + return ret; +} + +int32_t +jbr_lk_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct gf_flock *flock, dict_t *xdata) +{ + int32_t ret = -1; + jbr_private_t *priv = NULL; + jbr_local_t *local = NULL; + gf_boolean_t result = _gf_false; + + GF_VALIDATE_OR_GOTO ("jbr", this, err); + priv = this->private; + GF_VALIDATE_OR_GOTO (this->name, priv, err); + GF_VALIDATE_OR_GOTO (this->name, frame, err); + local = frame->local; + GF_VALIDATE_OR_GOTO (this->name, local, err); + GF_VALIDATE_OR_GOTO (this->name, flock, err); + GF_VALIDATE_OR_GOTO (this->name, xdata, err); + + /* + * Remove from queue for unlock operation only * + * For lock operation, it will be done in fan-in * + */ + if (flock->l_type == F_UNLCK) { + ret = jbr_remove_from_queue (frame, this); + if (ret) + goto err; + } + + /* + * On a follower, unwind with the op_ret and op_errno. On a * + * leader, if the fop is a locking fop, and its a failure, * + * send fail, else call stub which will dispatch the fop to * + * the followers. * + * * + * If the fop is a unlocking fop, check quorum. If quorum * + * is met, then send success. Else Rollback on leader, * + * followed by followers, and then send -ve ack to client. * + */ + if (priv->leader) { + + /* Increase the successful acks if it's a success. */ + LOCK(&frame->lock); + if (op_ret != -1) + (local->successful_acks)++; + UNLOCK(&frame->lock); + + if (flock->l_type == F_UNLCK) { + result = fop_quorum_check (this, + (double)priv->n_children, + (double)local->successful_acks); + if (result == _gf_false) { + op_ret = -1; + op_errno = EROFS; + gf_msg (this->name, GF_LOG_ERROR, EROFS, + J_MSG_QUORUM_NOT_MET, + "Quorum is not met. " + "The operation has failed."); + + /* TODO: PERFORM UNLOCK ROLLBACK ON LEADER * + * FOLLOWED BY FOLLOWERS. */ + } else { + op_ret = 0; + op_errno = 0; + } + + fd_unref(local->fd); + STACK_UNWIND_STRICT (lk, frame, op_ret, op_errno, + flock, xdata); + } else { + if (op_ret == -1) { + gf_msg (this->name, GF_LOG_ERROR, 0, + J_MSG_LOCK_FAILURE, + "The lock operation failed on " + "the leader."); + + fd_unref(local->fd); + STACK_UNWIND_STRICT (lk, frame, op_ret, + op_errno, flock, xdata); + } else { + if (!local->stub) { + goto err; + } + + call_resume(local->stub); + } + } + } else { + fd_unref(local->fd); + STACK_UNWIND_STRICT (lk, frame, op_ret, op_errno, + flock, xdata); + } + + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->qstub) { + call_stub_destroy(local->qstub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (lk, frame, -1, op_errno, + flock, xdata); + return 0; +} + +int32_t +jbr_lk_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct gf_flock *flock, + dict_t *xdata) +{ + uint8_t call_count = -1; + int32_t ret = -1; + gf_boolean_t result = _gf_false; + jbr_local_t *local = NULL; + jbr_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO ("jbr", this, out); + GF_VALIDATE_OR_GOTO (this->name, frame, out); + priv = this->private; + local = frame->local; + GF_VALIDATE_OR_GOTO (this->name, priv, out); + GF_VALIDATE_OR_GOTO (this->name, local, out); + + gf_msg_trace (this->name, 0, "op_ret = %d, op_errno = %d\n", + op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + if (op_ret != -1) { + /* Increment the number of successful acks * + * received for the operation. * + */ + (local->successful_acks)++; + local->successful_op_ret = op_ret; + } + gf_msg_debug (this->name, 0, "succ_acks = %d, op_ret = %d, op_errno = %d\n", + op_ret, op_errno, local->successful_acks); + UNLOCK(&frame->lock); + + if (call_count == 0) { + /* + * If the fop is a locking fop, then check quorum. If quorum * + * is met, send successful ack to the client. If quorum is * + * not met, then rollback locking on followers, followed by * + * rollback of locking on leader, and then sending -ve ack * + * to the client. * + * * + * If the fop is a unlocking fop, then call stub. * + */ + if (flock->l_type == F_UNLCK) { + call_resume(local->stub); + } else { + /* + * Remove from queue for locking fops, for unlocking * + * fops, it is taken care of in jbr_lk_complete * + */ + ret = jbr_remove_from_queue (frame, this); + if (ret) + goto out; + + fd_unref(local->fd); + + result = fop_quorum_check (this, + (double)priv->n_children, + (double)local->successful_acks); + if (result == _gf_false) { + gf_msg (this->name, GF_LOG_ERROR, EROFS, + J_MSG_QUORUM_NOT_MET, + "Didn't receive enough acks to meet " + "quorum. Failing the locking " + "operation and initiating rollback on " + "followers and the leader " + "respectively."); + + /* TODO: PERFORM ROLLBACK OF LOCKING ON + * FOLLOWERS, FOLLOWED BY ROLLBACK ON + * LEADER. + */ + + STACK_UNWIND_STRICT (lk, frame, -1, EROFS, + flock, xdata); + } else { + STACK_UNWIND_STRICT (lk, frame, 0, 0, + flock, xdata); + } + } + } + + ret = 0; +out: + return ret; +} + +/* + * Called from leader for locking fop, being writen as a separate * + * function so as to support queues. * + */ +int32_t +jbr_perform_lk_on_leader (call_frame_t *frame, xlator_t *this, + fd_t *fd, int32_t cmd, struct gf_flock *flock, + dict_t *xdata) +{ + int32_t ret = -1; + + GF_VALIDATE_OR_GOTO ("jbr", this, out); + GF_VALIDATE_OR_GOTO (this->name, frame, out); + GF_VALIDATE_OR_GOTO (this->name, flock, out); + GF_VALIDATE_OR_GOTO (this->name, fd, out); + + STACK_WIND (frame, jbr_lk_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->lk, + fd, cmd, flock, xdata); + + ret = 0; +out: + return ret; +} + +int32_t +jbr_lk_perform_local_op (call_frame_t *frame, xlator_t *this, int *op_errno, + fd_t *fd, int32_t cmd, struct gf_flock *flock, + dict_t *xdata) +{ + int32_t ret = -1; + jbr_local_t *local = NULL; + + GF_VALIDATE_OR_GOTO ("jbr", this, out); + GF_VALIDATE_OR_GOTO (this->name, frame, out); + local = frame->local; + GF_VALIDATE_OR_GOTO (this->name, local, out); + GF_VALIDATE_OR_GOTO (this->name, fd, out); + GF_VALIDATE_OR_GOTO (this->name, op_errno, out); + GF_VALIDATE_OR_GOTO (this->name, flock, out); + + /* + * Check if the fop is a locking fop or unlocking fop, and + * handle it accordingly. If it is a locking fop, take the + * lock on leader first, and then send it to the followers. + * If it is a unlocking fop, unlock the followers first, + * and then on meeting quorum perform the unlock on the leader. + */ + if (flock->l_type == F_UNLCK) { + ret = jbr_lk_call_dispatch (frame, this, op_errno, + fd, cmd, flock, xdata); + if (ret) + goto out; + } else { + jbr_inode_ctx_t *ictx = jbr_get_inode_ctx(this, fd->inode); + + if (!ictx) { + *op_errno = EIO; + goto out; + } + + LOCK(&ictx->lock); + if (ictx->active) { + gf_msg_debug (this->name, 0, + "queuing request due to conflict"); + + local->qstub = fop_lk_stub (frame, + jbr_perform_lk_on_leader, + fd, cmd, flock, xdata); + if (!local->qstub) { + UNLOCK(&ictx->lock); + goto out; + } + list_add_tail(&local->qlinks, &ictx->pqueue); + ++(ictx->pending); + UNLOCK(&ictx->lock); + ret = 0; + goto out; + } else { + list_add_tail(&local->qlinks, &ictx->aqueue); + ++(ictx->active); + } + UNLOCK(&ictx->lock); + ret = jbr_perform_lk_on_leader (frame, this, fd, cmd, + flock, xdata); + } + + ret = 0; +out: + return ret; +} + +int32_t +jbr_lk_continue (call_frame_t *frame, xlator_t *this, + fd_t *fd, int32_t cmd, struct gf_flock *flock, dict_t *xdata) +{ + int32_t ret = -1; + jbr_local_t *local = NULL; + jbr_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO ("jbr", this, out); + GF_VALIDATE_OR_GOTO (this->name, frame, out); + priv = this->private; + local = frame->local; + GF_VALIDATE_OR_GOTO (this->name, priv, out); + GF_VALIDATE_OR_GOTO (this->name, local, out); + GF_VALIDATE_OR_GOTO (this->name, flock, out); + GF_VALIDATE_OR_GOTO (this->name, fd, out); + GF_VALIDATE_OR_GOTO (this->name, xdata, out); + + /* + * If it's a locking fop, then call dispatch to followers * + * If it's a unlock fop, then perform the unlock operation * + */ + if (flock->l_type == F_UNLCK) { + STACK_WIND (frame, jbr_lk_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->lk, + fd, cmd, flock, xdata); + } else { + /* + * Directly call jbr_lk_dispatch instead of appending * + * in queue, which is done at jbr_lk_perform_local_op * + * for locking fops * + */ + ret = jbr_lk_dispatch (frame, this, fd, cmd, + flock, xdata); + if (ret) { + STACK_UNWIND_STRICT (lk, frame, -1, 0, + flock, xdata); + goto out; + } + } + + ret = 0; +out: + return ret; +} + #pragma generate uint8_t |