From ef171ff2bfd114e46442441fbdeb692a416cc951 Mon Sep 17 00:00:00 2001 From: Jeff Darcy Date: Wed, 11 Dec 2013 16:26:25 -0500 Subject: Roll-up patch for NSR so far. Previous history: https://forge.gluster.org/~jdarcy/glusterfs-core/glusterfs-nsr Change-Id: I2b56328788753c6a74d9589815f2dd705ac9ce6a Signed-off-by: Jeff Darcy --- xlators/cluster/nsr-server/src/nsr-cg.c | 4444 +++++++++++++++++++++++++++++++ 1 file changed, 4444 insertions(+) create mode 100644 xlators/cluster/nsr-server/src/nsr-cg.c (limited to 'xlators/cluster/nsr-server/src/nsr-cg.c') diff --git a/xlators/cluster/nsr-server/src/nsr-cg.c b/xlators/cluster/nsr-server/src/nsr-cg.c new file mode 100644 index 000000000..54f370b75 --- /dev/null +++ b/xlators/cluster/nsr-server/src/nsr-cg.c @@ -0,0 +1,4444 @@ +/* No stub needed for access */ + +/* No cbk needed for access */ + +int32_t +nsr_access (call_frame_t *frame, xlator_t *this, + loc_t * loc, int32_t mask, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_access_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->access, + loc, mask, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (access, frame, -1, EREMOTE, + NULL); + return 0; +} + +int32_t +nsr_create_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + fd_t * fd, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, + fd, inode, buf, preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_create_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, int32_t flags, mode_t mode, mode_t umask, fd_t * fd, dict_t * xdata) +{ + STACK_WIND (frame, nsr_create_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, + loc, flags, mode, umask, fd, xdata); + return 0; +} + +int32_t +nsr_create_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + fd_t * fd, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_create (call_frame_t *frame, xlator_t *this, + loc_t * loc, int32_t flags, mode_t mode, mode_t umask, fd_t * fd, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_create_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, + loc, flags, mode, umask, fd, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_create_stub (frame,nsr_create_continue, + loc, flags, mode, umask, fd, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_create_fan_in, + trav->xlator, trav->xlator->fops->create, + loc, flags, mode, umask, fd, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (create, frame, -1, op_errno, + NULL, NULL, NULL, NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_discard_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (discard, frame, op_ret, op_errno, + preop_stbuf, postop_stbuf, xdata); + return 0; + +} +int32_t +nsr_discard_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, off_t offset, size_t len, dict_t * xdata) +{ + STACK_WIND (frame, nsr_discard_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard, + fd, offset, len, xdata); + return 0; +} + +int32_t +nsr_discard_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_discard (call_frame_t *frame, xlator_t *this, + fd_t * fd, off_t offset, size_t len, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_discard_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard, + fd, offset, len, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_discard_stub (frame,nsr_discard_continue, + fd, offset, len, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_discard_fan_in, + trav->xlator, trav->xlator->fops->discard, + fd, offset, len, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (discard, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +/* No code emitted for entrylk */ + +int32_t +nsr_fallocate_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (fallocate, frame, op_ret, op_errno, + preop_stbuf, postop_stbuf, xdata); + return 0; + +} +int32_t +nsr_fallocate_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, int32_t keep_size, off_t offset, size_t len, dict_t * xdata) +{ + STACK_WIND (frame, nsr_fallocate_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, + fd, keep_size, offset, len, xdata); + return 0; +} + +int32_t +nsr_fallocate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_fallocate (call_frame_t *frame, xlator_t *this, + fd_t * fd, int32_t keep_size, off_t offset, size_t len, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_fallocate_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, + fd, keep_size, offset, len, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_fallocate_stub (frame,nsr_fallocate_continue, + fd, keep_size, offset, len, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_fallocate_fan_in, + trav->xlator, trav->xlator->fops->fallocate, + fd, keep_size, offset, len, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (fallocate, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +/* No code emitted for fentrylk */ + +/* No stub needed for fgetxattr */ + +/* No cbk needed for fgetxattr */ + +int32_t +nsr_fgetxattr (call_frame_t *frame, xlator_t *this, + fd_t * fd, const char * name, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_fgetxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fgetxattr, + fd, name, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (fgetxattr, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No code emitted for finodelk */ + +/* No code emitted for flush */ + +int32_t +nsr_fremovexattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (fremovexattr, frame, op_ret, op_errno, + xdata); + return 0; + +} +int32_t +nsr_fremovexattr_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, const char * name, dict_t * xdata) +{ + STACK_WIND (frame, nsr_fremovexattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, + fd, name, xdata); + return 0; +} + +int32_t +nsr_fremovexattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_fremovexattr (call_frame_t *frame, xlator_t *this, + fd_t * fd, const char * name, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_fremovexattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, + fd, name, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_fremovexattr_stub (frame,nsr_fremovexattr_continue, + fd, name, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_fremovexattr_fan_in, + trav->xlator, trav->xlator->fops->fremovexattr, + fd, name, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (fremovexattr, frame, -1, op_errno, + NULL); + return 0; +} + +int32_t +nsr_fsetattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno, + preop_stbuf, postop_stbuf, xdata); + return 0; + +} +int32_t +nsr_fsetattr_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, struct iatt * stbuf, int32_t valid, dict_t * xdata) +{ + STACK_WIND (frame, nsr_fsetattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, + fd, stbuf, valid, xdata); + return 0; +} + +int32_t +nsr_fsetattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_fsetattr (call_frame_t *frame, xlator_t *this, + fd_t * fd, struct iatt * stbuf, int32_t valid, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_fsetattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, + fd, stbuf, valid, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_fsetattr_stub (frame,nsr_fsetattr_continue, + fd, stbuf, valid, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_fsetattr_fan_in, + trav->xlator, trav->xlator->fops->fsetattr, + fd, stbuf, valid, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (fsetattr, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_fsetxattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, + xdata); + return 0; + +} +int32_t +nsr_fsetxattr_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, dict_t * dict, int32_t flags, dict_t * xdata) +{ + STACK_WIND (frame, nsr_fsetxattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, + fd, dict, flags, xdata); + return 0; +} + +int32_t +nsr_fsetxattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_fsetxattr (call_frame_t *frame, xlator_t *this, + fd_t * fd, dict_t * dict, int32_t flags, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_fsetxattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, + fd, dict, flags, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_fsetxattr_stub (frame,nsr_fsetxattr_continue, + fd, dict, flags, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_fsetxattr_fan_in, + trav->xlator, trav->xlator->fops->fsetxattr, + fd, dict, flags, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (fsetxattr, frame, -1, op_errno, + NULL); + return 0; +} + +/* No stub needed for fstat */ + +/* No cbk needed for fstat */ + +int32_t +nsr_fstat (call_frame_t *frame, xlator_t *this, + fd_t * fd, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_fstat_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, + fd, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (fstat, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No code emitted for fsync */ + +/* No code emitted for fsyncdir */ + +int32_t +nsr_ftruncate_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, + prebuf, postbuf, xdata); + return 0; + +} +int32_t +nsr_ftruncate_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, off_t offset, dict_t * xdata) +{ + STACK_WIND (frame, nsr_ftruncate_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, + fd, offset, xdata); + return 0; +} + +int32_t +nsr_ftruncate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_ftruncate (call_frame_t *frame, xlator_t *this, + fd_t * fd, off_t offset, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_ftruncate_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, + fd, offset, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_ftruncate_stub (frame,nsr_ftruncate_continue, + fd, offset, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_ftruncate_fan_in, + trav->xlator, trav->xlator->fops->ftruncate, + fd, offset, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (ftruncate, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_fxattrop_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xattr, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (fxattrop, frame, op_ret, op_errno, + xattr, xdata); + return 0; + +} +int32_t +nsr_fxattrop_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) +{ + STACK_WIND (frame, nsr_fxattrop_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop, + fd, optype, xattr, xdata); + return 0; +} + +int32_t +nsr_fxattrop_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xattr, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_fxattrop (call_frame_t *frame, xlator_t *this, + fd_t * fd, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_fxattrop_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop, + fd, optype, xattr, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_fxattrop_stub (frame,nsr_fxattrop_continue, + fd, optype, xattr, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_fxattrop_fan_in, + trav->xlator, trav->xlator->fops->fxattrop, + fd, optype, xattr, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (fxattrop, frame, -1, op_errno, + NULL, NULL); + return 0; +} + +/* No code emitted for getspec */ + +/* No stub needed for getxattr */ + +/* No cbk needed for getxattr */ + +int32_t +nsr_getxattr (call_frame_t *frame, xlator_t *this, + loc_t * loc, const char * name, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_getxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, + loc, name, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No code emitted for inodelk */ + +int32_t +nsr_link_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (link, frame, op_ret, op_errno, + inode, buf, preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_link_continue (call_frame_t *frame, xlator_t *this, + loc_t * oldloc, loc_t * newloc, dict_t * xdata) +{ + STACK_WIND (frame, nsr_link_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, + oldloc, newloc, xdata); + return 0; +} + +int32_t +nsr_link_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_link (call_frame_t *frame, xlator_t *this, + loc_t * oldloc, loc_t * newloc, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_link_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, + oldloc, newloc, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_link_stub (frame,nsr_link_continue, + oldloc, newloc, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_link_fan_in, + trav->xlator, trav->xlator->fops->link, + oldloc, newloc, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (link, frame, -1, op_errno, + NULL, NULL, NULL, NULL, NULL); + return 0; +} + +/* No code emitted for lk */ + +/* No code emitted for lookup */ + +int32_t +nsr_mkdir_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (mkdir, frame, op_ret, op_errno, + inode, buf, preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_mkdir_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, mode_t mode, mode_t umask, dict_t * xdata) +{ + STACK_WIND (frame, nsr_mkdir_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, + loc, mode, umask, xdata); + return 0; +} + +int32_t +nsr_mkdir_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_mkdir (call_frame_t *frame, xlator_t *this, + loc_t * loc, mode_t mode, mode_t umask, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_mkdir_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, + loc, mode, umask, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_mkdir_stub (frame,nsr_mkdir_continue, + loc, mode, umask, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_mkdir_fan_in, + trav->xlator, trav->xlator->fops->mkdir, + loc, mode, umask, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (mkdir, frame, -1, op_errno, + NULL, NULL, NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_mknod_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno, + inode, buf, preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_mknod_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, mode_t mode, dev_t rdev, mode_t umask, dict_t * xdata) +{ + STACK_WIND (frame, nsr_mknod_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, + loc, mode, rdev, umask, xdata); + return 0; +} + +int32_t +nsr_mknod_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_mknod (call_frame_t *frame, xlator_t *this, + loc_t * loc, mode_t mode, dev_t rdev, mode_t umask, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_mknod_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, + loc, mode, rdev, umask, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_mknod_stub (frame,nsr_mknod_continue, + loc, mode, rdev, umask, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_mknod_fan_in, + trav->xlator, trav->xlator->fops->mknod, + loc, mode, rdev, umask, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (mknod, frame, -1, op_errno, + NULL, NULL, NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_open_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + fd_t * fd, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, + fd, xdata); + return 0; + +} +int32_t +nsr_open_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, int32_t flags, fd_t * fd, dict_t * xdata) +{ + STACK_WIND (frame, nsr_open_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, + loc, flags, fd, xdata); + return 0; +} + +int32_t +nsr_open_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + fd_t * fd, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_open (call_frame_t *frame, xlator_t *this, + loc_t * loc, int32_t flags, fd_t * fd, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_open_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, + loc, flags, fd, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_open_stub (frame,nsr_open_continue, + loc, flags, fd, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_open_fan_in, + trav->xlator, trav->xlator->fops->open, + loc, flags, fd, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (open, frame, -1, op_errno, + NULL, NULL); + return 0; +} + +/* No stub needed for opendir */ + +/* No cbk needed for opendir */ + +int32_t +nsr_opendir (call_frame_t *frame, xlator_t *this, + loc_t * loc, fd_t * fd, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_opendir_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->opendir, + loc, fd, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (opendir, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No stub needed for rchecksum */ + +/* No cbk needed for rchecksum */ + +int32_t +nsr_rchecksum (call_frame_t *frame, xlator_t *this, + fd_t * fd, off_t offset, int32_t len, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_rchecksum_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->rchecksum, + fd, offset, len, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (rchecksum, frame, -1, EREMOTE, + 0, NULL, NULL); + return 0; +} + +/* No stub needed for readdir */ + +/* No cbk needed for readdir */ + +int32_t +nsr_readdir (call_frame_t *frame, xlator_t *this, + fd_t * fd, size_t size, off_t offset, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_readdir_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdir, + fd, size, offset, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (readdir, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No stub needed for readdirp */ + +/* No cbk needed for readdirp */ + +int32_t +nsr_readdirp (call_frame_t *frame, xlator_t *this, + fd_t * fd, size_t size, off_t offset, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_readdirp_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, + fd, size, offset, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (readdirp, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No stub needed for readlink */ + +/* No cbk needed for readlink */ + +int32_t +nsr_readlink (call_frame_t *frame, xlator_t *this, + loc_t * loc, size_t size, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_readlink_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink, + loc, size, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (readlink, frame, -1, EREMOTE, + NULL, NULL, NULL); + return 0; +} + +/* No stub needed for readv */ + +/* No cbk needed for readv */ + +int32_t +nsr_readv (call_frame_t *frame, xlator_t *this, + fd_t * fd, size_t size, off_t offset, uint32_t flags, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_readv_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, + fd, size, offset, flags, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (readv, frame, -1, EREMOTE, + NULL, 0, NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_removexattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno, + xdata); + return 0; + +} +int32_t +nsr_removexattr_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, const char * name, dict_t * xdata) +{ + STACK_WIND (frame, nsr_removexattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, + loc, name, xdata); + return 0; +} + +int32_t +nsr_removexattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_removexattr (call_frame_t *frame, xlator_t *this, + loc_t * loc, const char * name, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_removexattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, + loc, name, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_removexattr_stub (frame,nsr_removexattr_continue, + loc, name, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_removexattr_fan_in, + trav->xlator, trav->xlator->fops->removexattr, + loc, name, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (removexattr, frame, -1, op_errno, + NULL); + return 0; +} + +int32_t +nsr_rename_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * buf, struct iatt * preoldparent, struct iatt * postoldparent, struct iatt * prenewparent, struct iatt * postnewparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, + buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata); + return 0; + +} +int32_t +nsr_rename_continue (call_frame_t *frame, xlator_t *this, + loc_t * oldloc, loc_t * newloc, dict_t * xdata) +{ + STACK_WIND (frame, nsr_rename_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, + oldloc, newloc, xdata); + return 0; +} + +int32_t +nsr_rename_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * buf, struct iatt * preoldparent, struct iatt * postoldparent, struct iatt * prenewparent, struct iatt * postnewparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_rename (call_frame_t *frame, xlator_t *this, + loc_t * oldloc, loc_t * newloc, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_rename_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, + oldloc, newloc, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_rename_stub (frame,nsr_rename_continue, + oldloc, newloc, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_rename_fan_in, + trav->xlator, trav->xlator->fops->rename, + oldloc, newloc, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (rename, frame, -1, op_errno, + NULL, NULL, NULL, NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_rmdir_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (rmdir, frame, op_ret, op_errno, + preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_rmdir_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, int xflags, dict_t * xdata) +{ + STACK_WIND (frame, nsr_rmdir_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, + loc, xflags, xdata); + return 0; +} + +int32_t +nsr_rmdir_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_rmdir (call_frame_t *frame, xlator_t *this, + loc_t * loc, int xflags, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_rmdir_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, + loc, xflags, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_rmdir_stub (frame,nsr_rmdir_continue, + loc, xflags, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_rmdir_fan_in, + trav->xlator, trav->xlator->fops->rmdir, + loc, xflags, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (rmdir, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_setattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, + preop_stbuf, postop_stbuf, xdata); + return 0; + +} +int32_t +nsr_setattr_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, struct iatt * stbuf, int32_t valid, dict_t * xdata) +{ + STACK_WIND (frame, nsr_setattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, + loc, stbuf, valid, xdata); + return 0; +} + +int32_t +nsr_setattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_setattr (call_frame_t *frame, xlator_t *this, + loc_t * loc, struct iatt * stbuf, int32_t valid, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_setattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, + loc, stbuf, valid, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_setattr_stub (frame,nsr_setattr_continue, + loc, stbuf, valid, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_setattr_fan_in, + trav->xlator, trav->xlator->fops->setattr, + loc, stbuf, valid, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (setattr, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_setxattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno, + xdata); + return 0; + +} +int32_t +nsr_setxattr_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, dict_t * dict, int32_t flags, dict_t * xdata) +{ + STACK_WIND (frame, nsr_setxattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, + loc, dict, flags, xdata); + return 0; +} + +int32_t +nsr_setxattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_setxattr (call_frame_t *frame, xlator_t *this, + loc_t * loc, dict_t * dict, int32_t flags, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_setxattr_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, + loc, dict, flags, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_setxattr_stub (frame,nsr_setxattr_continue, + loc, dict, flags, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_setxattr_fan_in, + trav->xlator, trav->xlator->fops->setxattr, + loc, dict, flags, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (setxattr, frame, -1, op_errno, + NULL); + return 0; +} + +/* No stub needed for stat */ + +/* No cbk needed for stat */ + +int32_t +nsr_stat (call_frame_t *frame, xlator_t *this, + loc_t * loc, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_stat_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, + loc, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (stat, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +/* No stub needed for statfs */ + +/* No cbk needed for statfs */ + +int32_t +nsr_statfs (call_frame_t *frame, xlator_t *this, + loc_t * loc, dict_t * xdata) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + // allow reads during reconciliation + // TBD: allow "dirty" reads on non-leaders + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_statfs_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs, + loc, xdata); + return 0; + +err: + STACK_UNWIND_STRICT (statfs, frame, -1, EREMOTE, + NULL, NULL); + return 0; +} + +int32_t +nsr_symlink_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (symlink, frame, op_ret, op_errno, + inode, buf, preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_symlink_continue (call_frame_t *frame, xlator_t *this, + const char * linkname, loc_t * loc, mode_t umask, dict_t * xdata) +{ + STACK_WIND (frame, nsr_symlink_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, + linkname, loc, umask, xdata); + return 0; +} + +int32_t +nsr_symlink_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_symlink (call_frame_t *frame, xlator_t *this, + const char * linkname, loc_t * loc, mode_t umask, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_symlink_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, + linkname, loc, umask, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_symlink_stub (frame,nsr_symlink_continue, + linkname, loc, umask, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_symlink_fan_in, + trav->xlator, trav->xlator->fops->symlink, + linkname, loc, umask, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (symlink, frame, -1, op_errno, + NULL, NULL, NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_truncate_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, + prebuf, postbuf, xdata); + return 0; + +} +int32_t +nsr_truncate_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, off_t offset, dict_t * xdata) +{ + STACK_WIND (frame, nsr_truncate_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, + loc, offset, xdata); + return 0; +} + +int32_t +nsr_truncate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_truncate (call_frame_t *frame, xlator_t *this, + loc_t * loc, off_t offset, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_truncate_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, + loc, offset, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_truncate_stub (frame,nsr_truncate_continue, + loc, offset, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_truncate_fan_in, + trav->xlator, trav->xlator->fops->truncate, + loc, offset, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (truncate, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +int32_t +nsr_unlink_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, + preparent, postparent, xdata); + return 0; + +} +int32_t +nsr_unlink_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, int xflags, dict_t * xdata) +{ + STACK_WIND (frame, nsr_unlink_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, + loc, xflags, xdata); + return 0; +} + +int32_t +nsr_unlink_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * preparent, struct iatt * postparent, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_unlink (call_frame_t *frame, xlator_t *this, + loc_t * loc, int xflags, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_unlink_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, + loc, xflags, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_unlink_stub (frame,nsr_unlink_continue, + loc, xflags, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_unlink_fan_in, + trav->xlator, trav->xlator->fops->unlink, + loc, xflags, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +#define NSR_CG_FSYNC +#define NSR_CG_QUEUE +#define NSR_CG_NEED_FD +int32_t +nsr_writev_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + prebuf, postbuf, xdata); + return 0; + +} +int32_t +nsr_writev_continue (call_frame_t *frame, xlator_t *this, + fd_t * fd, struct iovec * vector, int32_t count, off_t offset, uint32_t flags, struct iobref * iobref, dict_t * xdata) +{ + STACK_WIND (frame, nsr_writev_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, + fd, vector, count, offset, flags, iobref, xdata); + return 0; +} + +int32_t +nsr_writev_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_writev (call_frame_t *frame, xlator_t *this, + fd_t * fd, struct iovec * vector, int32_t count, off_t offset, uint32_t flags, struct iobref * iobref, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_writev_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, + fd, vector, count, offset, flags, iobref, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_writev_stub (frame,nsr_writev_continue, + fd, vector, count, offset, flags, iobref, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_writev_fan_in, + trav->xlator, trav->xlator->fops->writev, + fd, vector, count, offset, flags, iobref, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + return 0; +} + +#undef NSR_CG_FSYNC +#undef NSR_CG_QUEUE +#undef NSR_CG_NEED_FD +int32_t +nsr_xattrop_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xattr, dict_t * xdata) +{ +#if NSR_CG_NEED_FD + nsr_local_t *local = frame->local; +#endif + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); + if (ictx) { + /* TBD: LOCK */ + if (ictx->pending) { + gf_log (this->name, GF_LOG_DEBUG, + "unblocking %u requests", + ictx->pending); + /* TBD: actually dequeue */ + ictx->pending = 0; + } + /* TBD: UNLOCK */ + } +#endif + +#if NSR_CG_FSYNC + nsr_mark_fd_dirty(this,local); +#endif + +#if NSR_CG_NEED_FD + fd_unref(local->fd); +#endif + + STACK_UNWIND_STRICT (xattrop, frame, op_ret, op_errno, + xattr, xdata); + return 0; + +} +int32_t +nsr_xattrop_continue (call_frame_t *frame, xlator_t *this, + loc_t * loc, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) +{ + STACK_WIND (frame, nsr_xattrop_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, + loc, optype, xattr, xdata); + return 0; +} + +int32_t +nsr_xattrop_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + dict_t * xattr, dict_t * xdata) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_log (this->name, GF_LOG_TRACE, + "op_ret = %d, op_errno = %d\n", op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + UNLOCK(&frame->lock); + + // TBD: variable Completion count + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +int32_t +nsr_xattrop (call_frame_t *frame, xlator_t *this, + loc_t * loc, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if NSR_CG_NEED_FD + local->fd = fd_ref(fd) +#else + local->fd = NULL +#endif + frame->local = local; + + if (xdata) { + from_leader = !!dict_get(xdata,NSR_TERM_XATTR); + from_recon = !!dict_get(xdata,RECON_TERM_XATTR) + && !!dict_get(xdata,RECON_INDEX_XATTR); + } + else { + from_leader = from_recon = _gf_false; + } + + // follower/recon path + // just send it to local node + if (from_leader || from_recon) { + STACK_WIND (frame, nsr_xattrop_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, + loc, optype, xattr, xdata); + return 0; + } + + if (!priv->leader || priv->fence_io) { + op_errno = EREMOTE; + goto err; + } + +#if NSR_CG_QUEUE + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); + if (!ictx) { + op_errno = EIO; + goto err; + } + /* TBD: LOCK */ + if (ictx->active) { + gf_log (this->name, GF_LOG_DEBUG, + "queuing request due to conflict"); + ++(ictx->pending); + /* TBD: actually enqueue */ + } + else { + ++(ictx->active); + } + /* TBD: UNLOCK */ +#endif + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_log (this->name, GF_LOG_ERROR, + "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set nsr-term"); + goto err; + } + + local->stub = fop_xattrop_stub (frame,nsr_xattrop_continue, + loc, optype, xattr, xdata); + if (!local->stub) { + goto err; + } + + local->call_count = priv->n_children - 1; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_xattrop_fan_in, + trav->xlator, trav->xlator->fops->xattrop, + loc, optype, xattr, xdata); + } + + // TBD: variable Issue count + return 0; + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (xattrop, frame, -1, op_errno, + NULL, NULL); + return 0; +} + +/* No code emitted for zerofill */ + +struct xlator_fops fops = { + .access = nsr_access, + .create = nsr_create, + .discard = nsr_discard, + .fallocate = nsr_fallocate, + .fgetxattr = nsr_fgetxattr, + .fremovexattr = nsr_fremovexattr, + .fsetattr = nsr_fsetattr, + .fsetxattr = nsr_fsetxattr, + .fstat = nsr_fstat, + .ftruncate = nsr_ftruncate, + .fxattrop = nsr_fxattrop, + .getxattr = nsr_getxattr, + .link = nsr_link, + .mkdir = nsr_mkdir, + .mknod = nsr_mknod, + .open = nsr_open, + .opendir = nsr_opendir, + .rchecksum = nsr_rchecksum, + .readdir = nsr_readdir, + .readdirp = nsr_readdirp, + .readlink = nsr_readlink, + .readv = nsr_readv, + .removexattr = nsr_removexattr, + .rename = nsr_rename, + .rmdir = nsr_rmdir, + .setattr = nsr_setattr, + .setxattr = nsr_setxattr, + .stat = nsr_stat, + .statfs = nsr_statfs, + .symlink = nsr_symlink, + .truncate = nsr_truncate, + .unlink = nsr_unlink, + .writev = nsr_writev, + .xattrop = nsr_xattrop, +}; -- cgit