diff options
author | Raghavendra G <rgowdapp@redhat.com> | 2013-12-02 09:32:53 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2014-01-25 07:51:10 -0800 |
commit | 49eb5ea29cf8d6eab0c5f60d70fe8d6a0113b61e (patch) | |
tree | a95529ae5f7cbfffe0e3a0876f2397c6d85091f7 /xlators | |
parent | 9a34ea6a0a95154013676cabf8528b2679fb36c4 (diff) |
features/quota: remove in-memory accounting of files in enforcer
Accounting was done in enforcer (though marker is the ultimate source
of truth) to offset cached directory size becoming stale. However,
with enforcer being moved to brick we can no longer maintain correct
cluster wide size for a directory. Hence removing accounting code from
enforcer.
Change-Id: I5ea94234da4da85ed5f5ced1354d8de3454b3fcb
BUG: 969461
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-on: http://review.gluster.org/6434
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r-- | xlators/features/quota/src/quota.c | 385 | ||||
-rw-r--r-- | xlators/features/quota/src/quota.h | 46 | ||||
-rw-r--r-- | xlators/storage/posix/src/posix.c | 1 |
3 files changed, 210 insertions, 222 deletions
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 2c7e2984938..0af514cef0c 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -215,6 +215,7 @@ __quota_dentry_new (quota_inode_ctx_t *ctx, char *name, uuid_t par) dentry->name = gf_strdup (name); if (dentry->name == NULL) { GF_FREE (dentry); + dentry = NULL; goto err; } @@ -378,31 +379,51 @@ quota_timeout (struct timeval *tv, int32_t timeout) return timed_out; } +inline void +quota_add_parent (quota_dentry_t *dentry, struct list_head *list) +{ + quota_dentry_t *entry = NULL; + gf_boolean_t found = _gf_false; + + if ((dentry == NULL) || (list == NULL)) { + goto out; + } + + list_for_each_entry (entry, list, next) { + if (uuid_compare (dentry->par, entry->par) == 0) { + found = _gf_true; + goto out; + } + } + + list_add_tail (&dentry->next, list); + +out: + return; +} + int32_t quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, dict_t *xdata) { - inode_t *parent = NULL; - gf_dirent_t *entry = NULL; - loc_t loc = {0, }; - quota_dentry_t *dentry = NULL, *tmp = NULL; - quota_inode_ctx_t *ctx = NULL; - struct list_head parents = {0, }; - quota_local_t *local = NULL; - call_frame_t *continuation_frame = NULL; + inode_t *parent = NULL, *tmp_parent = NULL; + gf_dirent_t *entry = NULL; + loc_t loc = {0, }; + quota_dentry_t *dentry = NULL, *tmp = NULL; + quota_inode_ctx_t *ctx = NULL; + struct list_head parents = {0, }; + quota_local_t *local = NULL; INIT_LIST_HEAD (&parents); - continuation_frame = frame->local; + local = frame->local; frame->local = NULL; - local = continuation_frame->local; - if (op_ret < 0) goto err; - parent = inode_parent (local->validate_loc.inode, 0, NULL); + parent = inode_parent (local->loc.inode, 0, NULL); if (parent == NULL) { gf_log (this->name, GF_LOG_WARNING, "parent is NULL"); op_errno = EINVAL; @@ -423,61 +444,77 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, * pointer and continue */ - parent = NULL; + tmp_parent = NULL; } uuid_copy (loc.gfid, entry->d_stat.ia_gfid); loc.inode = inode_ref (entry->inode); - loc.parent = inode_ref (parent); + loc.parent = inode_ref (tmp_parent); loc.name = entry->d_name; quota_fill_inodectx (this, entry->inode, entry->dict, &loc, &entry->d_stat, &op_errno); - parent = entry->inode; + tmp_parent = entry->inode; loc_wipe (&loc); } } - quota_inode_ctx_get (local->validate_loc.inode, this, &ctx, 0); - - local->link_count = 0; + quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); if (ctx != NULL) { LOCK (&ctx->lock); { list_for_each_entry (dentry, &ctx->parents, next) { + /* we built ancestry for a non-directory */ tmp = __quota_dentry_new (NULL, dentry->name, dentry->par); - list_add_tail (&tmp->next, &parents); - local->link_count++; + quota_add_parent (tmp, &parents); + + if (list_empty (&tmp->next)) { + __quota_dentry_free (tmp); + tmp = NULL; + } } } UNLOCK (&ctx->lock); } - if (local->link_count != 0) { - list_for_each_entry_safe (dentry, tmp, &parents, next) { - quota_check_limit (continuation_frame, - local->validate_loc.inode, - this, dentry->name, dentry->par); - __quota_dentry_free (dentry); + if (list_empty (&parents)) { + /* we built ancestry for a directory */ + list_for_each_entry (entry, &entries->list, list) { + if (entry->inode == local->loc.inode) + break; } - } else { - local->link_count = 1; - quota_check_limit (continuation_frame, parent, this, NULL, - NULL); + + GF_ASSERT (&entry->list != &entries->list); + + tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid); + quota_add_parent (tmp, &parents); } - STACK_DESTROY (frame->root); - return 0; + local->ancestry_cbk (&parents, local->loc.inode, 0, 0, + local->ancestry_data); + goto cleanup; err: + local->ancestry_cbk (NULL, NULL, -1, op_errno, local->ancestry_data); + +cleanup: STACK_DESTROY (frame->root); + quota_local_cleanup (this, local); + + if (parent != NULL) { + inode_unref (parent); + parent = NULL; + } + + list_for_each_entry_safe (dentry, tmp, &parents, next) { + __quota_dentry_free (dentry); + } - quota_handle_validate_error (local, -1, op_errno); return 0; } @@ -486,85 +523,91 @@ quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd, dict_t *xdata) { - int ret = -1; - dict_t *xdata_req = NULL; - quota_local_t *local = NULL; - call_frame_t *continuation_frame = NULL; + dict_t *xdata_req = NULL; + quota_local_t *local = NULL; + + if (op_ret < 0) { + goto err; + } xdata_req = dict_new (); if (xdata_req == NULL) { - ret = -ENOMEM; + op_ret = -ENOMEM; goto err; } - ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); - if (ret < 0) + op_ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); + if (op_ret < 0) { + op_errno = -op_ret; goto err; + } - ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); - if (ret < 0) + op_ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); + if (op_ret < 0) { + op_errno = -op_ret; goto err; + } /* This would ask posix layer to construct dentry chain till root */ STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); - ret = 0; + op_ret = 0; err: fd_unref (fd); dict_unref (xdata_req); - if (ret < 0) { - continuation_frame = frame->local; + if (op_ret < 0) { + local = frame->local; frame->local = NULL; + local->ancestry_cbk (NULL, NULL, -1, op_errno, + local->ancestry_data); + quota_local_cleanup (this, local); STACK_DESTROY (frame->root); - - local = continuation_frame->local; - quota_handle_validate_error (local, -1, op_errno); } - return ret; + return 0; } int -quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) +quota_build_ancestry (inode_t *inode, quota_ancestry_built_t ancestry_cbk, + void *data) { loc_t loc = {0, }; fd_t *fd = NULL; quota_local_t *local = NULL; call_frame_t *new_frame = NULL; - int ret = -1; + int op_errno = EINVAL; + xlator_t *this = NULL; + + this = THIS; loc.inode = inode_ref (inode); uuid_copy (loc.gfid, inode->gfid); - gf_log (this->name, GF_LOG_WARNING, "building ancestry"); - - local = frame->local; - - LOCK (&local->lock); - { - loc_wipe (&local->validate_loc); + fd = fd_create (inode, 0); - ret = quota_inode_loc_fill (inode, &local->validate_loc); - if (ret < 0) { - gf_log (this->name, GF_LOG_WARNING, - "cannot fill loc for inode (gfid:%s), hence " - "aborting quota-checks and continuing with fop", - uuid_utoa (inode->gfid)); - } + new_frame = create_frame (this, this->ctx->pool); + if (new_frame == NULL) { + op_errno = ENOMEM; + goto err; } - UNLOCK (&local->lock); - fd = fd_create (inode, 0); - - new_frame = copy_frame (frame); new_frame->root->uid = new_frame->root->gid = 0; - new_frame->local = frame; + local = quota_local_new (); + if (local == NULL) { + op_errno = ENOMEM; + goto err; + } + + new_frame->local = local; + local->ancestry_cbk = ancestry_cbk; + local->ancestry_data = data; + local->loc.inode = inode_ref (inode); if (IA_ISDIR (inode->ia_type)) { STACK_WIND (new_frame, quota_build_ancestry_open_cbk, @@ -579,7 +622,25 @@ quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) } loc_wipe (&loc); + return 0; +err: + ancestry_cbk (NULL, NULL, -1, op_errno, data); + + fd_unref (fd); + + local = new_frame->local; + new_frame->local = NULL; + + if (local != NULL) { + quota_local_cleanup (this, local); + } + + if (new_frame != NULL) { + STACK_DESTROY (new_frame->root); + } + + loc_wipe (&loc); return 0; } @@ -646,6 +707,56 @@ err: return ret; } +void +quota_check_limit_continuation (struct list_head *parents, inode_t *inode, + int32_t op_ret, int32_t op_errno, void *data) +{ + call_frame_t *frame = NULL; + xlator_t *this = NULL; + quota_local_t *local = NULL; + quota_dentry_t *entry = NULL; + inode_t *parent = NULL; + int parent_count = 0; + + frame = data; + local = frame->local; + this = THIS; + + if ((op_ret < 0) || list_empty (parents)) { + if (list_empty (parents)) { + gf_log (this->name, GF_LOG_WARNING, + "Couldn't build ancestry for inode (gfid:%s). " + "Without knowing ancestors till root, quota " + "cannot be enforced. " + "Hence, failing fop with EIO", + uuid_utoa (inode->gfid)); + op_errno = EIO; + } + + quota_handle_validate_error (local, -1, op_errno); + goto out; + } + + list_for_each_entry (entry, parents, next) { + parent_count++; + } + + LOCK (&local->lock); + { + local->link_count += (parent_count - 1); + } + UNLOCK (&local->lock); + + list_for_each_entry (entry, parents, next) { + parent = inode_find (inode->table, entry->par); + + quota_check_limit (frame, parent, this, NULL, NULL); + } + +out: + return; +} + int32_t quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, char *name, uuid_t par) @@ -789,7 +900,9 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } if (parent == NULL) { - ret = quota_build_ancestry (frame, _inode, this); + ret = quota_build_ancestry (_inode, + quota_check_limit_continuation, + frame); if (ret < 0) { op_errno = -ret; goto err; @@ -1017,77 +1130,6 @@ off: return 0; } - -void -quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par, - int64_t delta) -{ - inode_t *_inode = NULL; - inode_t *parent = NULL; - uint64_t value = 0; - quota_inode_ctx_t *ctx = NULL; - uuid_t trav_uuid = {0,}; - - GF_VALIDATE_OR_GOTO ("quota", this, out); - GF_VALIDATE_OR_GOTO (this->name, inode, out); - - inode_ctx_get (inode, this, &value); - ctx = (quota_inode_ctx_t *)(unsigned long)value; - - _inode = inode_ref (inode); - - if ( par != NULL ) { - uuid_copy (trav_uuid, par); - } - - do { - if ((ctx != NULL) && (ctx->hard_lim >= 0)) { - quota_log_usage (this, ctx, _inode, delta); - LOCK (&ctx->lock); - { - ctx->size += delta; - if (ctx->size < 0) - ctx->size = 0; - } - UNLOCK (&ctx->lock); - } - - if (__is_root_gfid (_inode->gfid)) { - break; - } - - parent = inode_parent (_inode, trav_uuid, name); - if (parent == NULL) { - /* TODO: build ancestry and continue updating size */ - gf_log (this->name, GF_LOG_DEBUG, - "cannot find parent for inode (gfid:%s), hence " - "aborting size updation of parents", - uuid_utoa (_inode->gfid)); - } - - if (name != NULL) { - name = NULL; - uuid_clear (trav_uuid); - } - - inode_unref (_inode); - _inode = parent; - - if (_inode == NULL) { - break; - } - - value = 0; - ctx = NULL; - inode_ctx_get (_inode, this, &value); - ctx = (quota_inode_ctx_t *)(unsigned long)value; - } while (1); - -out: - return; -} - - int32_t quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, @@ -1097,9 +1139,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, uint64_t ctx_int = 0; quota_inode_ctx_t *ctx = NULL; quota_local_t *local = NULL; - quota_dentry_t *dentry = NULL, *tmp = NULL; - int64_t delta = 0; - struct list_head head = {0, }; local = frame->local; @@ -1107,8 +1146,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - INIT_LIST_HEAD (&head); - ret = inode_ctx_get (local->loc.inode, this, &ctx_int); if (ret) { gf_log (this->name, GF_LOG_WARNING, @@ -1128,25 +1165,9 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, LOCK (&ctx->lock); { ctx->buf = *postbuf; - - list_for_each_entry (dentry, &ctx->parents, next) { - tmp = __quota_dentry_new (NULL, dentry->name, - dentry->par); - list_add_tail (&tmp->next, &head); - } - } UNLOCK (&ctx->lock); - if (postbuf->ia_blocks != prebuf->ia_blocks) - delta = local->delta; - - list_for_each_entry_safe (dentry, tmp, &head, next) { - quota_update_size (this, local->loc.inode, dentry->name, - dentry->par, delta); - __quota_dentry_free (dentry); - } - out: QUOTA_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); @@ -1583,12 +1604,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - if (!local->skip_check) - quota_update_size (this, local->loc.inode, - (char *)local->loc.name, - local->loc.parent->gfid, - (-(ctx->buf.ia_blocks * 512))); - LOCK (&ctx->lock); { list_for_each_entry (dentry, &ctx->parents, next) { @@ -1680,9 +1695,6 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if (local->skip_check) goto out; - quota_update_size (this, local->loc.parent, NULL, NULL, - (buf->ia_blocks * 512)); - ret = quota_inode_ctx_get (inode, this, &ctx, 0); if ((ret == -1) || (ctx == NULL)) { gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -1867,13 +1879,6 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, size = buf->ia_blocks * 512; } - if (local->oldloc.parent != local->newloc.parent) { - quota_update_size (this, local->oldloc.parent, NULL, NULL, - (-size)); - quota_update_size (this, local->newloc.parent, NULL, NULL, - size); - } - if (!QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) { goto out; } @@ -2131,7 +2136,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { - int64_t size = 0; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; quota_dentry_t *dentry = NULL; @@ -2141,9 +2145,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } local = frame->local; - size = buf->ia_blocks * 512; - - quota_update_size (this, local->loc.parent, NULL, NULL, size); quota_inode_ctx_get (local->loc.inode, this, &ctx, 1); if (ctx == NULL) { @@ -2275,7 +2276,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, { quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; - int64_t delta = 0; if (op_ret < 0) { goto out; @@ -2287,10 +2287,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - - quota_update_size (this, local->loc.inode, NULL, NULL, delta); - quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); if (ctx == NULL) { gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -2362,7 +2358,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, { quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; - int64_t delta = 0; if (op_ret < 0) { goto out; @@ -2374,10 +2369,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - - quota_update_size (this, local->loc.inode, NULL, NULL, delta); - quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); if (ctx == NULL) { gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -3849,8 +3840,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, uint64_t ctx_int = 0; quota_inode_ctx_t *ctx = NULL; quota_local_t *local = NULL; - quota_dentry_t *dentry = NULL; - int64_t delta = 0; local = frame->local; @@ -3880,12 +3869,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, } UNLOCK (&ctx->lock); - list_for_each_entry (dentry, &ctx->parents, next) { - delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - quota_update_size (this, local->loc.inode, - dentry->name, dentry->par, delta); - } - out: QUOTA_STACK_UNWIND (fallocate, frame, op_ret, op_errno, prebuf, postbuf, xdata); diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 02bc0d8b6b4..84c3257feec 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -160,28 +160,34 @@ struct quota_limit { } __attribute__ ((packed)); typedef struct quota_limit quota_limit_t; +typedef void +(*quota_ancestry_built_t) (struct list_head *parents, inode_t *inode, + int32_t op_ret, int32_t op_errno, void *data); + struct quota_local { - gf_lock_t lock; - uint32_t validate_count; - uint32_t link_count; - loc_t loc; - loc_t oldloc; - loc_t newloc; - loc_t validate_loc; - int64_t delta; - int32_t op_ret; - int32_t op_errno; - int64_t size; - gf_boolean_t skip_check; - char just_validated; - fop_lookup_cbk_t validate_cbk; - inode_t *inode; - call_stub_t *stub; - struct iobref *iobref; - quota_limit_t limit; - int64_t space_available; + gf_lock_t lock; + uint32_t validate_count; + uint32_t link_count; + loc_t loc; + loc_t oldloc; + loc_t newloc; + loc_t validate_loc; + int64_t delta; + int32_t op_ret; + int32_t op_errno; + int64_t size; + gf_boolean_t skip_check; + char just_validated; + fop_lookup_cbk_t validate_cbk; + inode_t *inode; + call_stub_t *stub; + struct iobref *iobref; + quota_limit_t limit; + int64_t space_available; + quota_ancestry_built_t ancestry_cbk; + void *ancestry_data; }; -typedef struct quota_local quota_local_t; +typedef struct quota_local quota_local_t; struct quota_priv { uint32_t soft_timeout; diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c index 83b689d061b..c1cbc83f4fb 100644 --- a/xlators/storage/posix/src/posix.c +++ b/xlators/storage/posix/src/posix.c @@ -885,7 +885,6 @@ posix_opendir (call_frame_t *frame, xlator_t *this, VALIDATE_OR_GOTO (frame, out); VALIDATE_OR_GOTO (this, out); VALIDATE_OR_GOTO (loc, out); - VALIDATE_OR_GOTO (loc->path, out); VALIDATE_OR_GOTO (fd, out); SET_FS_ID (frame->root->uid, frame->root->gid); |