/* No stub needed for access */ /* No cbk needed for access */ int32_t nsr_access (call_frame_t *frame, xlator_t *this, loc_t * loc, int32_t mask, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_access_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->access, loc, mask, xdata); return 0; err: STACK_UNWIND_STRICT (access, frame, -1, EREMOTE, NULL); return 0; } int32_t nsr_create_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t * fd, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, preparent, postparent, xdata); return 0; } int32_t nsr_create_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, int32_t flags, mode_t mode, mode_t umask, fd_t * fd, dict_t * xdata) { STACK_WIND (frame, nsr_create_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; } int32_t nsr_create_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t * fd, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_create (call_frame_t *frame, xlator_t *this, loc_t * loc, int32_t flags, mode_t mode, mode_t umask, fd_t * fd, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_create_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_create_stub (frame,nsr_create_continue, loc, flags, mode, umask, fd, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_create_fan_in, trav->xlator, trav->xlator->fops->create, loc, flags, mode, umask, fd, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (create, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL, NULL); return 0; } int32_t nsr_discard_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (discard, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t nsr_discard_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, off_t offset, size_t len, dict_t * xdata) { STACK_WIND (frame, nsr_discard_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard, fd, offset, len, xdata); return 0; } int32_t nsr_discard_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_discard (call_frame_t *frame, xlator_t *this, fd_t * fd, off_t offset, size_t len, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_discard_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->discard, fd, offset, len, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_discard_stub (frame,nsr_discard_continue, fd, offset, len, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_discard_fan_in, trav->xlator, trav->xlator->fops->discard, fd, offset, len, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (discard, frame, -1, op_errno, NULL, NULL, NULL); return 0; } /* No code emitted for entrylk */ int32_t nsr_fallocate_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (fallocate, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t nsr_fallocate_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, int32_t keep_size, off_t offset, size_t len, dict_t * xdata) { STACK_WIND (frame, nsr_fallocate_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, fd, keep_size, offset, len, xdata); return 0; } int32_t nsr_fallocate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_fallocate (call_frame_t *frame, xlator_t *this, fd_t * fd, int32_t keep_size, off_t offset, size_t len, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_fallocate_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, fd, keep_size, offset, len, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_fallocate_stub (frame,nsr_fallocate_continue, fd, keep_size, offset, len, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_fallocate_fan_in, trav->xlator, trav->xlator->fops->fallocate, fd, keep_size, offset, len, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (fallocate, frame, -1, op_errno, NULL, NULL, NULL); return 0; } /* No code emitted for fentrylk */ /* No stub needed for fgetxattr */ /* No cbk needed for fgetxattr */ int32_t nsr_fgetxattr (call_frame_t *frame, xlator_t *this, fd_t * fd, const char * name, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_fgetxattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata); return 0; err: STACK_UNWIND_STRICT (fgetxattr, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No code emitted for finodelk */ /* No code emitted for flush */ int32_t nsr_fremovexattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (fremovexattr, frame, op_ret, op_errno, xdata); return 0; } int32_t nsr_fremovexattr_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, const char * name, dict_t * xdata) { STACK_WIND (frame, nsr_fremovexattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata); return 0; } int32_t nsr_fremovexattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_fremovexattr (call_frame_t *frame, xlator_t *this, fd_t * fd, const char * name, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_fremovexattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_fremovexattr_stub (frame,nsr_fremovexattr_continue, fd, name, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_fremovexattr_fan_in, trav->xlator, trav->xlator->fops->fremovexattr, fd, name, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (fremovexattr, frame, -1, op_errno, NULL); return 0; } int32_t nsr_fsetattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t nsr_fsetattr_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, struct iatt * stbuf, int32_t valid, dict_t * xdata) { STACK_WIND (frame, nsr_fsetattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } int32_t nsr_fsetattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_fsetattr (call_frame_t *frame, xlator_t *this, fd_t * fd, struct iatt * stbuf, int32_t valid, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_fsetattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_fsetattr_stub (frame,nsr_fsetattr_continue, fd, stbuf, valid, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_fsetattr_fan_in, trav->xlator, trav->xlator->fops->fsetattr, fd, stbuf, valid, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (fsetattr, frame, -1, op_errno, NULL, NULL, NULL); return 0; } int32_t nsr_fsetxattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, xdata); return 0; } int32_t nsr_fsetxattr_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, dict_t * dict, int32_t flags, dict_t * xdata) { STACK_WIND (frame, nsr_fsetxattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); return 0; } int32_t nsr_fsetxattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t * fd, dict_t * dict, int32_t flags, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_fsetxattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_fsetxattr_stub (frame,nsr_fsetxattr_continue, fd, dict, flags, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_fsetxattr_fan_in, trav->xlator, trav->xlator->fops->fsetxattr, fd, dict, flags, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (fsetxattr, frame, -1, op_errno, NULL); return 0; } /* No stub needed for fstat */ /* No cbk needed for fstat */ int32_t nsr_fstat (call_frame_t *frame, xlator_t *this, fd_t * fd, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_fstat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, xdata); return 0; err: STACK_UNWIND_STRICT (fstat, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No code emitted for fsync */ /* No code emitted for fsyncdir */ int32_t nsr_ftruncate_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int32_t nsr_ftruncate_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, off_t offset, dict_t * xdata) { STACK_WIND (frame, nsr_ftruncate_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); return 0; } int32_t nsr_ftruncate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_ftruncate (call_frame_t *frame, xlator_t *this, fd_t * fd, off_t offset, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_ftruncate_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_ftruncate_stub (frame,nsr_ftruncate_continue, fd, offset, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_ftruncate_fan_in, trav->xlator, trav->xlator->fops->ftruncate, fd, offset, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (ftruncate, frame, -1, op_errno, NULL, NULL, NULL); return 0; } int32_t nsr_fxattrop_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xattr, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (fxattrop, frame, op_ret, op_errno, xattr, xdata); return 0; } int32_t nsr_fxattrop_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) { STACK_WIND (frame, nsr_fxattrop_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop, fd, optype, xattr, xdata); return 0; } int32_t nsr_fxattrop_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xattr, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_fxattrop (call_frame_t *frame, xlator_t *this, fd_t * fd, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_fxattrop_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop, fd, optype, xattr, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_fxattrop_stub (frame,nsr_fxattrop_continue, fd, optype, xattr, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_fxattrop_fan_in, trav->xlator, trav->xlator->fops->fxattrop, fd, optype, xattr, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (fxattrop, frame, -1, op_errno, NULL, NULL); return 0; } /* No code emitted for getspec */ /* No stub needed for getxattr */ /* No cbk needed for getxattr */ int32_t nsr_getxattr (call_frame_t *frame, xlator_t *this, loc_t * loc, const char * name, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_getxattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, loc, name, xdata); return 0; err: STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No code emitted for inodelk */ int32_t nsr_link_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (link, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t nsr_link_continue (call_frame_t *frame, xlator_t *this, loc_t * oldloc, loc_t * newloc, dict_t * xdata) { STACK_WIND (frame, nsr_link_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; } int32_t nsr_link_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_link (call_frame_t *frame, xlator_t *this, loc_t * oldloc, loc_t * newloc, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_link_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_link_stub (frame,nsr_link_continue, oldloc, newloc, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_link_fan_in, trav->xlator, trav->xlator->fops->link, oldloc, newloc, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL); return 0; } /* No code emitted for lk */ /* No code emitted for lookup */ int32_t nsr_mkdir_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (mkdir, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t nsr_mkdir_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, mode_t mode, mode_t umask, dict_t * xdata) { STACK_WIND (frame, nsr_mkdir_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); return 0; } int32_t nsr_mkdir_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_mkdir (call_frame_t *frame, xlator_t *this, loc_t * loc, mode_t mode, mode_t umask, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_mkdir_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_mkdir_stub (frame,nsr_mkdir_continue, loc, mode, umask, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_mkdir_fan_in, trav->xlator, trav->xlator->fops->mkdir, loc, mode, umask, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL); return 0; } int32_t nsr_mknod_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t nsr_mknod_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, mode_t mode, dev_t rdev, mode_t umask, dict_t * xdata) { STACK_WIND (frame, nsr_mknod_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata); return 0; } int32_t nsr_mknod_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_mknod (call_frame_t *frame, xlator_t *this, loc_t * loc, mode_t mode, dev_t rdev, mode_t umask, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_mknod_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_mknod_stub (frame,nsr_mknod_continue, loc, mode, rdev, umask, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_mknod_fan_in, trav->xlator, trav->xlator->fops->mknod, loc, mode, rdev, umask, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL); return 0; } int32_t nsr_open_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t * fd, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, xdata); return 0; } int32_t nsr_open_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, int32_t flags, fd_t * fd, dict_t * xdata) { STACK_WIND (frame, nsr_open_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); return 0; } int32_t nsr_open_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t * fd, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_open (call_frame_t *frame, xlator_t *this, loc_t * loc, int32_t flags, fd_t * fd, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_open_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_open_stub (frame,nsr_open_continue, loc, flags, fd, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_open_fan_in, trav->xlator, trav->xlator->fops->open, loc, flags, fd, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (open, frame, -1, op_errno, NULL, NULL); return 0; } /* No stub needed for opendir */ /* No cbk needed for opendir */ int32_t nsr_opendir (call_frame_t *frame, xlator_t *this, loc_t * loc, fd_t * fd, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_opendir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->opendir, loc, fd, xdata); return 0; err: STACK_UNWIND_STRICT (opendir, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No stub needed for rchecksum */ /* No cbk needed for rchecksum */ int32_t nsr_rchecksum (call_frame_t *frame, xlator_t *this, fd_t * fd, off_t offset, int32_t len, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_rchecksum_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rchecksum, fd, offset, len, xdata); return 0; err: STACK_UNWIND_STRICT (rchecksum, frame, -1, EREMOTE, 0, NULL, NULL); return 0; } /* No stub needed for readdir */ /* No cbk needed for readdir */ int32_t nsr_readdir (call_frame_t *frame, xlator_t *this, fd_t * fd, size_t size, off_t offset, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_readdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdir, fd, size, offset, xdata); return 0; err: STACK_UNWIND_STRICT (readdir, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No stub needed for readdirp */ /* No cbk needed for readdirp */ int32_t nsr_readdirp (call_frame_t *frame, xlator_t *this, fd_t * fd, size_t size, off_t offset, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_readdirp_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, size, offset, xdata); return 0; err: STACK_UNWIND_STRICT (readdirp, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No stub needed for readlink */ /* No cbk needed for readlink */ int32_t nsr_readlink (call_frame_t *frame, xlator_t *this, loc_t * loc, size_t size, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_readlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink, loc, size, xdata); return 0; err: STACK_UNWIND_STRICT (readlink, frame, -1, EREMOTE, NULL, NULL, NULL); return 0; } /* No stub needed for readv */ /* No cbk needed for readv */ int32_t nsr_readv (call_frame_t *frame, xlator_t *this, fd_t * fd, size_t size, off_t offset, uint32_t flags, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_readv_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata); return 0; err: STACK_UNWIND_STRICT (readv, frame, -1, EREMOTE, NULL, 0, NULL, NULL, NULL); return 0; } int32_t nsr_removexattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno, xdata); return 0; } int32_t nsr_removexattr_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, const char * name, dict_t * xdata) { STACK_WIND (frame, nsr_removexattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); return 0; } int32_t nsr_removexattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_removexattr (call_frame_t *frame, xlator_t *this, loc_t * loc, const char * name, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_removexattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_removexattr_stub (frame,nsr_removexattr_continue, loc, name, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_removexattr_fan_in, trav->xlator, trav->xlator->fops->removexattr, loc, name, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (removexattr, frame, -1, op_errno, NULL); return 0; } int32_t nsr_rename_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * buf, struct iatt * preoldparent, struct iatt * postoldparent, struct iatt * prenewparent, struct iatt * postnewparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata); return 0; } int32_t nsr_rename_continue (call_frame_t *frame, xlator_t *this, loc_t * oldloc, loc_t * newloc, dict_t * xdata) { STACK_WIND (frame, nsr_rename_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); return 0; } int32_t nsr_rename_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * buf, struct iatt * preoldparent, struct iatt * postoldparent, struct iatt * prenewparent, struct iatt * postnewparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_rename (call_frame_t *frame, xlator_t *this, loc_t * oldloc, loc_t * newloc, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_rename_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_rename_stub (frame,nsr_rename_continue, oldloc, newloc, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_rename_fan_in, trav->xlator, trav->xlator->fops->rename, oldloc, newloc, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (rename, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL, NULL); return 0; } int32_t nsr_rmdir_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (rmdir, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; } int32_t nsr_rmdir_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, int xflags, dict_t * xdata) { STACK_WIND (frame, nsr_rmdir_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata); return 0; } int32_t nsr_rmdir_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_rmdir (call_frame_t *frame, xlator_t *this, loc_t * loc, int xflags, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_rmdir_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_rmdir_stub (frame,nsr_rmdir_continue, loc, xflags, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_rmdir_fan_in, trav->xlator, trav->xlator->fops->rmdir, loc, xflags, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (rmdir, frame, -1, op_errno, NULL, NULL, NULL); return 0; } int32_t nsr_setattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t nsr_setattr_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, struct iatt * stbuf, int32_t valid, dict_t * xdata) { STACK_WIND (frame, nsr_setattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } int32_t nsr_setattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preop_stbuf, struct iatt * postop_stbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_setattr (call_frame_t *frame, xlator_t *this, loc_t * loc, struct iatt * stbuf, int32_t valid, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_setattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_setattr_stub (frame,nsr_setattr_continue, loc, stbuf, valid, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_setattr_fan_in, trav->xlator, trav->xlator->fops->setattr, loc, stbuf, valid, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (setattr, frame, -1, op_errno, NULL, NULL, NULL); return 0; } int32_t nsr_setxattr_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno, xdata); return 0; } int32_t nsr_setxattr_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, dict_t * dict, int32_t flags, dict_t * xdata) { STACK_WIND (frame, nsr_setxattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata); return 0; } int32_t nsr_setxattr_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_setxattr (call_frame_t *frame, xlator_t *this, loc_t * loc, dict_t * dict, int32_t flags, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_setxattr_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_setxattr_stub (frame,nsr_setxattr_continue, loc, dict, flags, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_setxattr_fan_in, trav->xlator, trav->xlator->fops->setxattr, loc, dict, flags, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (setxattr, frame, -1, op_errno, NULL); return 0; } /* No stub needed for stat */ /* No cbk needed for stat */ int32_t nsr_stat (call_frame_t *frame, xlator_t *this, loc_t * loc, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_stat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, xdata); return 0; err: STACK_UNWIND_STRICT (stat, frame, -1, EREMOTE, NULL, NULL); return 0; } /* No stub needed for statfs */ /* No cbk needed for statfs */ int32_t nsr_statfs (call_frame_t *frame, xlator_t *this, loc_t * loc, dict_t * xdata) { nsr_private_t *priv = this->private; gf_boolean_t in_recon = _gf_false; int32_t recon_term, recon_index; // allow reads during reconciliation // TBD: allow "dirty" reads on non-leaders if (xdata && (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { in_recon = _gf_true; } if ((!priv->leader) && (in_recon == _gf_false)) { goto err; } STACK_WIND (frame, default_statfs_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs, loc, xdata); return 0; err: STACK_UNWIND_STRICT (statfs, frame, -1, EREMOTE, NULL, NULL); return 0; } int32_t nsr_symlink_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (symlink, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t nsr_symlink_continue (call_frame_t *frame, xlator_t *this, const char * linkname, loc_t * loc, mode_t umask, dict_t * xdata) { STACK_WIND (frame, nsr_symlink_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata); return 0; } int32_t nsr_symlink_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t * inode, struct iatt * buf, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_symlink (call_frame_t *frame, xlator_t *this, const char * linkname, loc_t * loc, mode_t umask, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_symlink_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_symlink_stub (frame,nsr_symlink_continue, linkname, loc, umask, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_symlink_fan_in, trav->xlator, trav->xlator->fops->symlink, linkname, loc, umask, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (symlink, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL); return 0; } int32_t nsr_truncate_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int32_t nsr_truncate_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, off_t offset, dict_t * xdata) { STACK_WIND (frame, nsr_truncate_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; } int32_t nsr_truncate_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_truncate (call_frame_t *frame, xlator_t *this, loc_t * loc, off_t offset, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_truncate_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_truncate_stub (frame,nsr_truncate_continue, loc, offset, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_truncate_fan_in, trav->xlator, trav->xlator->fops->truncate, loc, offset, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (truncate, frame, -1, op_errno, NULL, NULL, NULL); return 0; } int32_t nsr_unlink_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; } int32_t nsr_unlink_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, int xflags, dict_t * xdata) { STACK_WIND (frame, nsr_unlink_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, xflags, xdata); return 0; } int32_t nsr_unlink_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * preparent, struct iatt * postparent, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_unlink (call_frame_t *frame, xlator_t *this, loc_t * loc, int xflags, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_unlink_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, xflags, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_unlink_stub (frame,nsr_unlink_continue, loc, xflags, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_unlink_fan_in, trav->xlator, trav->xlator->fops->unlink, loc, xflags, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL, NULL); return 0; } #define NSR_CG_FSYNC #define NSR_CG_QUEUE #define NSR_CG_NEED_FD int32_t nsr_writev_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int32_t nsr_writev_continue (call_frame_t *frame, xlator_t *this, fd_t * fd, struct iovec * vector, int32_t count, off_t offset, uint32_t flags, struct iobref * iobref, dict_t * xdata) { STACK_WIND (frame, nsr_writev_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); return 0; } int32_t nsr_writev_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt * prebuf, struct iatt * postbuf, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_writev (call_frame_t *frame, xlator_t *this, fd_t * fd, struct iovec * vector, int32_t count, off_t offset, uint32_t flags, struct iobref * iobref, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_writev_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_writev_stub (frame,nsr_writev_continue, fd, vector, count, offset, flags, iobref, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_writev_fan_in, trav->xlator, trav->xlator->fops->writev, fd, vector, count, offset, flags, iobref, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (writev, frame, -1, op_errno, NULL, NULL, NULL); return 0; } #undef NSR_CG_FSYNC #undef NSR_CG_QUEUE #undef NSR_CG_NEED_FD int32_t nsr_xattrop_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xattr, dict_t * xdata) { #if NSR_CG_NEED_FD nsr_local_t *local = frame->local; #endif #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,local->fd); if (ictx) { /* TBD: LOCK */ if (ictx->pending) { gf_log (this->name, GF_LOG_DEBUG, "unblocking %u requests", ictx->pending); /* TBD: actually dequeue */ ictx->pending = 0; } /* TBD: UNLOCK */ } #endif #if NSR_CG_FSYNC nsr_mark_fd_dirty(this,local); #endif #if NSR_CG_NEED_FD fd_unref(local->fd); #endif STACK_UNWIND_STRICT (xattrop, frame, op_ret, op_errno, xattr, xdata); return 0; } int32_t nsr_xattrop_continue (call_frame_t *frame, xlator_t *this, loc_t * loc, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) { STACK_WIND (frame, nsr_xattrop_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, loc, optype, xattr, xdata); return 0; } int32_t nsr_xattrop_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t * xattr, dict_t * xdata) { nsr_local_t *local = frame->local; uint8_t call_count; gf_log (this->name, GF_LOG_TRACE, "op_ret = %d, op_errno = %d\n", op_ret, op_errno); LOCK(&frame->lock); call_count = --(local->call_count); UNLOCK(&frame->lock); // TBD: variable Completion count if (call_count == 0) { call_resume(local->stub); } return 0; } int32_t nsr_xattrop (call_frame_t *frame, xlator_t *this, loc_t * loc, gf_xattrop_flags_t optype, dict_t * xattr, dict_t * xdata) { nsr_local_t *local = NULL; nsr_private_t *priv = this->private; xlator_list_t *trav; int op_errno = ENOMEM; int from_leader; int from_recon; local = mem_get0(this->local_pool); if (!local) { goto err; } #if NSR_CG_NEED_FD local->fd = fd_ref(fd) #else local->fd = NULL #endif frame->local = local; if (xdata) { from_leader = !!dict_get(xdata,NSR_TERM_XATTR); from_recon = !!dict_get(xdata,RECON_TERM_XATTR) && !!dict_get(xdata,RECON_INDEX_XATTR); } else { from_leader = from_recon = _gf_false; } // follower/recon path // just send it to local node if (from_leader || from_recon) { STACK_WIND (frame, nsr_xattrop_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, loc, optype, xattr, xdata); return 0; } if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } #if NSR_CG_QUEUE nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd); if (!ictx) { op_errno = EIO; goto err; } /* TBD: LOCK */ if (ictx->active) { gf_log (this->name, GF_LOG_DEBUG, "queuing request due to conflict"); ++(ictx->pending); /* TBD: actually enqueue */ } else { ++(ictx->active); } /* TBD: UNLOCK */ #endif if (!xdata) { xdata = dict_new(); if (!xdata) { gf_log (this->name, GF_LOG_ERROR, "failed to allocate xdata"); goto err; } } if (dict_set_int32(xdata,NSR_TERM_XATTR,priv->current_term) != 0) { gf_log (this->name, GF_LOG_ERROR, "failed to set nsr-term"); goto err; } local->stub = fop_xattrop_stub (frame,nsr_xattrop_continue, loc, optype, xattr, xdata); if (!local->stub) { goto err; } local->call_count = priv->n_children - 1; for (trav = this->children->next; trav; trav = trav->next) { STACK_WIND (frame, nsr_xattrop_fan_in, trav->xlator, trav->xlator->fops->xattrop, loc, optype, xattr, xdata); } // TBD: variable Issue count return 0; err: if (local) { if (local->stub) { call_stub_destroy(local->stub); } if (local->fd) { fd_unref(local->fd); } mem_put(local); } STACK_UNWIND_STRICT (xattrop, frame, -1, op_errno, NULL, NULL); return 0; } /* No code emitted for zerofill */ struct xlator_fops fops = { .access = nsr_access, .create = nsr_create, .discard = nsr_discard, .fallocate = nsr_fallocate, .fgetxattr = nsr_fgetxattr, .fremovexattr = nsr_fremovexattr, .fsetattr = nsr_fsetattr, .fsetxattr = nsr_fsetxattr, .fstat = nsr_fstat, .ftruncate = nsr_ftruncate, .fxattrop = nsr_fxattrop, .getxattr = nsr_getxattr, .link = nsr_link, .mkdir = nsr_mkdir, .mknod = nsr_mknod, .open = nsr_open, .opendir = nsr_opendir, .rchecksum = nsr_rchecksum, .readdir = nsr_readdir, .readdirp = nsr_readdirp, .readlink = nsr_readlink, .readv = nsr_readv, .removexattr = nsr_removexattr, .rename = nsr_rename, .rmdir = nsr_rmdir, .setattr = nsr_setattr, .setxattr = nsr_setxattr, .stat = nsr_stat, .statfs = nsr_statfs, .symlink = nsr_symlink, .truncate = nsr_truncate, .unlink = nsr_unlink, .writev = nsr_writev, .xattrop = nsr_xattrop, };