diff options
Diffstat (limited to 'xlators/features/quota/src/quota.c')
-rw-r--r-- | xlators/features/quota/src/quota.c | 325 |
1 files changed, 271 insertions, 54 deletions
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 3c1b8e09c5c..f903b4e57b7 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -14,17 +14,6 @@ #include "defaults.h" #include "statedump.h" -void -quota_get_limit_dir (call_frame_t *frame, inode_t *cur_inode, xlator_t *this); - -int32_t -quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, - char *name, uuid_t par); - -int -quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, - loc_t *loc, struct iatt *buf, int32_t *op_errno); - struct volume_options options[]; static int32_t @@ -251,6 +240,164 @@ out: return; } +static inline inode_t* +__quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name) +{ + inode_t *parent = NULL; + + parent = inode_parent (inode, pargfid, name); + inode_unref (inode); + return parent; +} + +static inline inode_t* +quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name) +{ + inode_t *parent = NULL; + + parent = __quota_inode_parent (inode, pargfid, name); + if (!parent) + gf_log_callingfn (THIS->name, GF_LOG_ERROR, "Failed to find " + "ancestor for inode (%s)", + uuid_utoa(inode->gfid)); + + return parent; +} + +int32_t +quota_inode_depth (inode_t *inode) +{ + int depth = 0; + inode_t *cur_inode = NULL; + + cur_inode = inode_ref (inode); + while (cur_inode && !__is_root_gfid (cur_inode->gfid)) { + depth++; + cur_inode = quota_inode_parent (cur_inode, 0 , NULL); + if (!cur_inode) + depth = -1; + } + + if (cur_inode) + inode_unref (cur_inode); + + return depth; +} + +int32_t quota_find_common_ancestor (inode_t *inode1, inode_t *inode2, + uuid_t *common_ancestor) +{ + int32_t depth1 = 0; + int32_t depth2 = 0; + int32_t ret = -1; + inode_t *cur_inode1 = NULL; + inode_t *cur_inode2 = NULL; + + depth1 = quota_inode_depth (inode1); + if (depth1 < 0) + goto out; + + depth2 = quota_inode_depth (inode2); + if (depth2 < 0) + goto out; + + cur_inode1 = inode_ref (inode1); + cur_inode2 = inode_ref (inode2); + + while (cur_inode1 && depth1 > depth2) { + cur_inode1 = quota_inode_parent (cur_inode1, 0 , NULL); + depth1--; + } + + while (cur_inode2 && depth2 > depth1) { + cur_inode2 = quota_inode_parent (cur_inode2, 0 , NULL); + depth2--; + } + + while (depth1 && cur_inode1 && cur_inode2 && cur_inode1 != cur_inode2) { + cur_inode1 = quota_inode_parent (cur_inode1, 0 , NULL); + cur_inode2 = quota_inode_parent (cur_inode2, 0 , NULL); + depth1--; + } + + if (cur_inode1 && cur_inode2) { + uuid_copy (*common_ancestor, cur_inode1->gfid); + ret = 0; + } +out: + if (cur_inode1) + inode_unref (cur_inode1); + + if (cur_inode2) + inode_unref (cur_inode2); + + return ret; + } + +void +check_ancestory_continue (struct list_head *parents, inode_t *inode, + int32_t op_ret, int32_t op_errno, void *data) +{ + call_frame_t *frame = NULL; + quota_local_t *local = NULL; + uint32_t link_count = 0; + + frame = data; + local = frame->local; + + if (op_ret < 0 || (parents && list_empty (parents))) { + if (op_ret >= 0) { + gf_log (THIS->name, GF_LOG_WARNING, + "Couldn't build ancestry for inode (gfid:%s). " + "Without knowing ancestors till root, quota " + "cannot be enforced. " + "Hence, failing fop with EIO", + uuid_utoa (inode->gfid)); + op_errno = EIO; + op_ret = -1; + } + } + + LOCK (&local->lock); + { + link_count = --local->link_count; + if (op_ret < 0) { + local->op_ret = op_ret; + local->op_errno = op_errno; + } + } + UNLOCK (&local->lock); + + if (link_count == 0) + local->fop_continue_cbk (frame); +} + +void +check_ancestory (call_frame_t *frame, inode_t *inode) +{ + inode_t *cur_inode = NULL; + inode_t *parent = NULL; + + cur_inode = inode_ref (inode); + while (cur_inode && !__is_root_gfid (cur_inode->gfid)) { + parent = inode_parent (cur_inode, 0, NULL); + if (!parent) { + quota_build_ancestry (cur_inode, + check_ancestory_continue, frame); + return; + } + inode_unref (cur_inode); + cur_inode = parent; + } + + if (cur_inode) { + inode_unref (cur_inode); + check_ancestory_continue (NULL, NULL, 0, 0, frame); + } else { + check_ancestory_continue (NULL, NULL, -1, ESTALE, frame); + } +} + static inline void quota_link_count_decrement (quota_local_t *local) { @@ -827,6 +974,14 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } do { + /* In a rename operation, enforce should be stopped at common + ancestor */ + if (!uuid_is_null (local->common_ancestor) && + !uuid_compare (_inode->gfid, local->common_ancestor)) { + quota_link_count_decrement (local); + break; + } + if (ctx != NULL && (ctx->hard_lim > 0 || ctx->soft_lim > 0)) { wouldbe_size = ctx->size + delta; @@ -2046,63 +2201,51 @@ out: return 0; } -int32_t -quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, - loc_t *newloc, dict_t *xdata) +void +quota_rename_continue (call_frame_t *frame) { - quota_priv_t *priv = NULL; - int32_t ret = -1, op_errno = ENOMEM; - quota_local_t *local = NULL; - quota_inode_ctx_t *ctx = NULL; - call_stub_t *stub = NULL; - - priv = this->private; - - WIND_IF_QUOTAOFF (priv->is_quota_on, off); - - local = quota_local_new (); - if (local == NULL) { - goto err; - } - - frame->local = local; + int32_t ret = -1; + int32_t op_errno = EIO; + quota_local_t *local = NULL; + uuid_t common_ancestor = {0}; + xlator_t *this = NULL; + quota_inode_ctx_t *ctx = NULL; - ret = loc_copy (&local->oldloc, oldloc); - if (ret < 0) { - gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); - goto err; - } + local = frame->local; + this = THIS; - ret = loc_copy (&local->newloc, newloc); - if (ret < 0) { - gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); + if (local->op_ret < 0) { + op_errno = local->op_errno; goto err; } - stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc, - xdata); - if (stub == NULL) { + ret = quota_find_common_ancestor (local->oldloc.parent, + local->newloc.parent, + &common_ancestor); + if (ret < 0 || uuid_is_null(common_ancestor)) { + gf_log (this->name, GF_LOG_ERROR, "failed to get " + "common_ancestor for %s and %s", + local->oldloc.path, local->newloc.path); + op_errno = ESTALE; goto err; } LOCK (&local->lock); { local->link_count = 1; - local->stub = stub; + uuid_copy (local->common_ancestor, common_ancestor); } UNLOCK (&local->lock); - if (QUOTA_REG_OR_LNK_FILE (oldloc->inode->ia_type)) { - ret = quota_inode_ctx_get (oldloc->inode, this, &ctx, 0); + if (QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) { + ret = quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0); if (ctx == NULL) { gf_log (this->name, GF_LOG_WARNING, "quota context not set in inode (gfid:%s), " "considering file size as zero while enforcing " "quota on new ancestry", - oldloc->inode ? uuid_utoa (oldloc->inode->gfid) - : "0"); + uuid_utoa (local->oldloc.inode->gfid)); local->delta = 0; - } else { /* FIXME: We need to account for the size occupied by this @@ -2112,25 +2255,99 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, * directory inode*/ /* FIXME: The following code assumes that regular files and - *linkfiles are present, in their entirety, in a single - brick. This *assumption is invalid in the case of - stripe.*/ + * linkfiles are present, in their entirety, in a single + * brick. This *assumption is invalid in the case of + * stripe.*/ local->delta = ctx->buf.ia_blocks * 512; } - } else if (IA_ISDIR (oldloc->inode->ia_type)) { - ret = quota_validate (frame, oldloc->inode, this, + } else if (IA_ISDIR (local->oldloc.inode->ia_type)) { + ret = quota_validate (frame, local->oldloc.inode, this, quota_rename_get_size_cbk); if (ret){ op_errno = -ret; goto err; } - return 0; + return; } - quota_check_limit (frame, newloc->parent, this, NULL, NULL); + quota_check_limit (frame, local->newloc.parent, this, NULL, NULL); + return; + +err: + if (local && local->stub) + call_stub_destroy (local->stub); + + QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL, + NULL, NULL, NULL, NULL, NULL); + return; + +} + +int32_t +quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + quota_priv_t *priv = NULL; + int32_t ret = -1; + int32_t op_errno = ENOMEM; + quota_local_t *local = NULL; + call_stub_t *stub = NULL; + uuid_t common_ancestor = {0}; + + priv = this->private; + + WIND_IF_QUOTAOFF (priv->is_quota_on, off); + + /* No need to check quota limit if src and dst parents are same */ + if (oldloc->parent && newloc->parent && + !uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) { + gf_log (this->name, GF_LOG_DEBUG, "rename %s -> %s are " + "in the same directory, so skip check limit", + oldloc->path, newloc->path); + goto off; + } + + local = quota_local_new (); + if (local == NULL) { + goto err; + } + + frame->local = local; + + ret = loc_copy (&local->oldloc, oldloc); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); + goto err; + } + + ret = loc_copy (&local->newloc, newloc); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); + goto err; + } + + stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc, + xdata); + if (stub == NULL) { + goto err; + } + + LOCK (&local->lock); + { + /* link_count here tell how many check_ancestory should be done + * before continuing the FOP + */ + local->link_count = 2; + local->stub = stub; + local->fop_continue_cbk = quota_rename_continue; + } + UNLOCK (&local->lock); + + check_ancestory (frame, newloc->parent); + check_ancestory (frame, oldloc->parent); return 0; err: |