diff options
-rw-r--r-- | tests/bugs/quota/bug-1235182.t | 2 | ||||
-rw-r--r-- | xlators/features/quota/src/quota.c | 408 | ||||
-rw-r--r-- | xlators/features/quota/src/quota.h | 9 |
3 files changed, 296 insertions, 123 deletions
diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t index ec62b69638d..e28b557f558 100644 --- a/tests/bugs/quota/bug-1235182.t +++ b/tests/bugs/quota/bug-1235182.t @@ -17,7 +17,7 @@ TEST glusterd TEST pidof glusterd; TEST $CLI volume info; -TEST $CLI volume create $V0 $H0:$B0/${V0}{1}; +TEST $CLI volume create $V0 $H0:$B0/${V0}; TEST $CLI volume start $V0; TEST $CLI volume quota $V0 enable; diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 921d014d228..a44471337d2 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -155,7 +155,7 @@ err: int32_t -quota_local_cleanup (xlator_t *this, quota_local_t *local) +quota_local_cleanup (quota_local_t *local) { if (local == NULL) { goto out; @@ -243,6 +243,31 @@ out: return; } +void +__quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par) +{ + quota_dentry_t *dentry = NULL; + quota_dentry_t *tmp = NULL; + + list_for_each_entry_safe (dentry, tmp, &ctx->parents, next) { + if ((strcmp (dentry->name, name) == 0) && + (gf_uuid_compare (dentry->par, par) == 0)) { + __quota_dentry_free (dentry); + break; + } + } +} + +void +quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par) +{ + LOCK (&ctx->lock); + { + __quota_dentry_del (ctx, name, par); + } + UNLOCK (&ctx->lock); +} + static inline inode_t* __quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name) { @@ -485,10 +510,18 @@ out: } static inline void -quota_link_count_decrement (quota_local_t *local) +quota_link_count_decrement (call_frame_t *frame) { - call_stub_t *stub = NULL; - int link_count = -1; + call_frame_t *tmpframe = NULL; + quota_local_t *local = NULL; + call_stub_t *stub = NULL; + int link_count = -1; + + local = frame->local; + if (local && local->par_frame) { + local = local->par_frame->local; + tmpframe = frame; + } if (local == NULL) goto out; @@ -506,14 +539,30 @@ quota_link_count_decrement (quota_local_t *local) if (stub != NULL) { call_resume (stub); } + out: + if (tmpframe) { + local = tmpframe->local; + tmpframe->local = NULL; + + STACK_DESTROY (frame->root); + if (local) + quota_local_cleanup (local); + } + return; } static inline void -quota_handle_validate_error (quota_local_t *local, int32_t op_ret, +quota_handle_validate_error (call_frame_t *frame, int32_t op_ret, int32_t op_errno) { + quota_local_t *local; + + local = frame->local; + if (local && local->par_frame) + local = local->par_frame->local; + if (local == NULL) goto out; @@ -527,7 +576,7 @@ quota_handle_validate_error (quota_local_t *local, int32_t op_ret, UNLOCK (&local->lock); /* we abort checking limits on this path to root */ - quota_link_count_decrement (local); + quota_link_count_decrement (frame); out: return; } @@ -595,7 +644,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, return 0; unwind: - quota_handle_validate_error (local, op_ret, op_errno); + quota_handle_validate_error (frame, op_ret, op_errno); return 0; } @@ -622,27 +671,65 @@ quota_timeout (struct timeval *tv, int32_t timeout) return timed_out; } -static inline void -quota_add_parent (quota_dentry_t *dentry, struct list_head *list) +/* Return: 1 if new entry added + * 0 no entry added + */ +static inline int32_t +quota_add_parent (struct list_head *list, char *name, uuid_t pgfid) { quota_dentry_t *entry = NULL; gf_boolean_t found = _gf_false; - if ((dentry == NULL) || (list == NULL)) { + if (list == NULL) { goto out; } list_for_each_entry (entry, list, next) { - if (gf_uuid_compare (dentry->par, entry->par) == 0) { + if (gf_uuid_compare (pgfid, entry->par) == 0) { found = _gf_true; goto out; } } - list_add_tail (&dentry->next, list); + entry = __quota_dentry_new (NULL, name, pgfid); + list_add_tail (&entry->next, list); out: - return; + if (found) + return 0; + else + return 1; + +} + +/* This function iterates the parent list in inode + * context and add unique parent to the list + * Returns number of dentry added to the list + */ +static inline int32_t +quota_add_parents_from_ctx (quota_inode_ctx_t *ctx, struct list_head *list) +{ + int ret = 0; + quota_dentry_t *dentry = NULL; + int32_t count = 0; + + if (ctx == NULL || list == NULL) + goto out; + + LOCK (&ctx->lock); + { + list_for_each_entry (dentry, &ctx->parents, next) { + ret = quota_add_parent (list, dentry->name, + dentry->par); + + if (ret == 1) + count++; + } + } + UNLOCK (&ctx->lock); + +out: + return count; } int32_t @@ -708,23 +795,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); - if (ctx != NULL) { - LOCK (&ctx->lock); - { - list_for_each_entry (dentry, &ctx->parents, next) { - /* we built ancestry for a non-directory */ - tmp = __quota_dentry_new (NULL, dentry->name, - dentry->par); - quota_add_parent (tmp, &parents); - - if (list_empty (&tmp->next)) { - __quota_dentry_free (tmp); - tmp = NULL; - } - } - } - UNLOCK (&ctx->lock); - } + quota_add_parents_from_ctx (ctx, &parents); if (list_empty (&parents)) { /* we built ancestry for a directory */ @@ -735,8 +806,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, GF_ASSERT (&entry->list != &entries->list); - tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid); - quota_add_parent (tmp, &parents); + quota_add_parent (&parents, entry->d_name, parent->gfid); } local->ancestry_cbk (&parents, local->loc.inode, 0, 0, @@ -748,7 +818,7 @@ err: cleanup: STACK_DESTROY (frame->root); - quota_local_cleanup (this, local); + quota_local_cleanup (local); if (parent != NULL) { inode_unref (parent); @@ -838,7 +908,7 @@ err: } if (local) - quota_local_cleanup (this, local); + quota_local_cleanup (local); } return 0; @@ -919,6 +989,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode, call_frame_t *frame = NULL; xlator_t *this = NULL; quota_local_t *local = NULL; + quota_local_t *par_local = NULL; quota_dentry_t *entry = NULL; inode_t *parent = NULL; int parent_count = 0; @@ -927,6 +998,12 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode, local = frame->local; this = THIS; + if (local->par_frame) + par_local = local->par_frame->local; + else + par_local = local; + + if ((op_ret < 0) || list_empty (parents)) { if (op_ret >= 0) { gf_msg (this->name, GF_LOG_WARNING, EIO, @@ -939,7 +1016,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode, op_errno = EIO; } - quota_handle_validate_error (local, -1, op_errno); + quota_handle_validate_error (frame, -1, op_errno); goto out; } @@ -947,16 +1024,27 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode, parent_count++; } - LOCK (&local->lock); + LOCK (&par_local->lock); { - local->link_count += (parent_count - 1); + par_local->link_count += (parent_count - 1); } - UNLOCK (&local->lock); + UNLOCK (&par_local->lock); - list_for_each_entry (entry, parents, next) { - parent = inode_find (inode->table, entry->par); - quota_check_limit (frame, parent, this, NULL, NULL); - inode_unref (parent); + if (local->par_frame) { + list_for_each_entry (entry, parents, next) { + parent = inode_find (inode->table, entry->par); + quota_check_limit (frame, parent, this, NULL, NULL); + inode_unref (parent); + } + } else { + list_for_each_entry (entry, parents, next) { + parent = do_quota_check_limit (frame, inode, this, + entry); + if (parent) + inode_unref (parent); + else + quota_link_count_decrement (frame); + } } out: @@ -1130,6 +1218,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, quota_inode_ctx_t *ctx = NULL; quota_priv_t *priv = NULL; quota_local_t *local = NULL; + quota_local_t *par_local = NULL; char need_validate = 0; char just_validated = 0; gf_boolean_t hard_limit_exceeded = 0; @@ -1145,9 +1234,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, local = frame->local; GF_VALIDATE_OR_GOTO (this->name, local, err); - delta = local->delta; + if (local->par_frame) { + par_local = local->par_frame->local; + GF_VALIDATE_OR_GOTO (this->name, par_local, err); + } else { + par_local = local; + } + + delta = par_local->delta; - GF_VALIDATE_OR_GOTO (this->name, local->stub, err); + GF_VALIDATE_OR_GOTO (this->name, par_local->stub, err); /* Allow all the trusted clients * Don't block the gluster internal processes like rebalance, gsyncd, * self heal etc from the disk quotas. @@ -1158,7 +1254,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, */ if (0 > frame->root->pid) { ret = 0; - quota_link_count_decrement (local); + quota_link_count_decrement (frame); goto done; } @@ -1183,15 +1279,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, do { /* In a rename operation, enforce should be stopped at common ancestor */ - if (!gf_uuid_is_null (local->common_ancestor) && - !gf_uuid_compare (_inode->gfid, local->common_ancestor)) { - quota_link_count_decrement (local); + if (!gf_uuid_is_null (par_local->common_ancestor) && + !gf_uuid_compare (_inode->gfid, par_local->common_ancestor) + ) { + quota_link_count_decrement (frame); break; } ret = quota_check_object_limit (frame, ctx, priv, _inode, this, &op_errno, just_validated, - local, &skip_check); + par_local, &skip_check); if (skip_check == _gf_true) goto done; @@ -1205,7 +1302,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, ret = quota_check_size_limit (frame, ctx, priv, _inode, this, &op_errno, just_validated, delta, - local, &skip_check); + par_local, &skip_check); if (skip_check == _gf_true) goto done; @@ -1218,7 +1315,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } if (__is_root_gfid (_inode->gfid)) { - quota_link_count_decrement (local); + quota_link_count_decrement (frame); break; } @@ -1231,8 +1328,8 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, if (parent == NULL) { ret = quota_build_ancestry (_inode, - quota_check_limit_continuation, - frame); + quota_check_limit_continuation, + frame); if (ret < 0) { op_errno = -ret; goto err; @@ -1250,7 +1347,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, ctx = (quota_inode_ctx_t *)(unsigned long)value; } while (1); - done: if (_inode != NULL) { inode_unref (_inode); @@ -1259,12 +1355,66 @@ done: return 0; err: - quota_handle_validate_error (local, -1, op_errno); + quota_handle_validate_error (frame, -1, op_errno); inode_unref (_inode); return 0; } +inode_t * +do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, + quota_dentry_t *dentry) +{ + int32_t ret = -1; + inode_t *parent = NULL; + call_frame_t *new_frame = NULL; + quota_local_t *local = NULL; + quota_local_t *new_local = NULL; + + local = frame->local; + + parent = inode_parent (inode, dentry->par, dentry->name); + if (parent == NULL) + parent = inode_find (inode->table, dentry->par); + if (parent == NULL) + goto out; + + new_frame = create_frame (this, this->ctx->pool); + if (new_frame == NULL) + goto out; + + new_local = quota_local_new (); + if (new_local == NULL) + goto out; + + new_frame->root->uid = new_frame->root->gid = 0; + new_frame->local = new_local; + new_local->par_frame = frame; + + quota_check_limit (new_frame, parent, this, NULL, NULL); + + ret = 0; +out: + if (ret < 0) { + if (parent) { + /* Caller should decrement link_count, in case parent is + * NULL + */ + quota_handle_validate_error (frame, -1, ENOMEM); + } + + if (new_frame) { + new_frame->local = NULL; + STACK_DESTROY (new_frame->root); + } + + if (new_local) + quota_local_cleanup (new_local); + } + + return parent; +} + static inline int quota_get_limits (xlator_t *this, dict_t *dict, int64_t *hard_lim, int64_t *soft_lim, int64_t *object_hard_limit, @@ -1443,7 +1593,7 @@ out: if (this_inode) inode_unref (this_inode); - quota_local_cleanup (this, local); + quota_local_cleanup (local); return 0; } @@ -1635,15 +1785,17 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t off, uint32_t flags, struct iobref *iobref, dict_t *xdata) { - quota_priv_t *priv = NULL; - int32_t ret = -1, op_errno = EINVAL; - int32_t parents = 0; - uint64_t size = 0; - quota_local_t *local = NULL; - quota_inode_ctx_t *ctx = NULL; - quota_dentry_t *dentry = NULL, *tmp = NULL; - call_stub_t *stub = NULL; - struct list_head head = {0, }; + quota_priv_t *priv = NULL; + int32_t ret = -1, op_errno = EINVAL; + int32_t parents = 0; + int32_t fail_count = 0; + uint64_t size = 0; + quota_local_t *local = NULL; + quota_inode_ctx_t *ctx = NULL; + quota_dentry_t *dentry = NULL, *tmp = NULL; + call_stub_t *stub = NULL; + struct list_head head = {0, }; + inode_t *par_inode = NULL; priv = this->private; @@ -1682,18 +1834,8 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, GF_VALIDATE_OR_GOTO (this->name, priv, unwind); size = iov_length (vector, count); - if (ctx != NULL) { - LOCK (&ctx->lock); - { - list_for_each_entry (dentry, &ctx->parents, next) { - tmp = __quota_dentry_new (NULL, dentry->name, - dentry->par); - list_add_tail (&tmp->next, &head); - parents++; - } - } - UNLOCK (&ctx->lock); - } + + parents = quota_add_parents_from_ctx (ctx, &head); LOCK (&local->lock); { @@ -1710,10 +1852,33 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, quota_check_limit (frame, fd->inode, this, NULL, NULL); } else { list_for_each_entry_safe (dentry, tmp, &head, next) { - quota_check_limit (frame, fd->inode, this, dentry->name, - dentry->par); + par_inode = do_quota_check_limit (frame, fd->inode, + this, dentry); + if (par_inode == NULL) { + /* remove stale entry from inode ctx */ + quota_dentry_del (ctx, dentry->name, + dentry->par); + parents--; + fail_count++; + } else { + inode_unref (par_inode); + } __quota_dentry_free (dentry); } + + if (parents == 0) { + LOCK (&local->lock); + { + local->link_count++; + } + UNLOCK (&local->lock); + quota_check_limit (frame, fd->inode, this, NULL, NULL); + } + + while (fail_count != 0) { + quota_link_count_decrement (frame); + fail_count--; + } } return 0; @@ -1989,8 +2154,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; uint64_t value = 0; - quota_dentry_t *dentry = NULL; - quota_dentry_t *old_dentry = NULL; if (op_ret < 0) { goto out; @@ -2009,20 +2172,7 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - LOCK (&ctx->lock); - { - list_for_each_entry (dentry, &ctx->parents, next) { - if ((strcmp (dentry->name, local->loc.name) == 0) && - (gf_uuid_compare (local->loc.parent->gfid, - dentry->par) == 0)) { - old_dentry = dentry; - break; - } - } - if (old_dentry) - __quota_dentry_free (old_dentry); - } - UNLOCK (&ctx->lock); + quota_dentry_del (ctx, local->loc.name, local->loc.parent->gfid); out: QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent, @@ -2270,7 +2420,7 @@ off: FIRST_CHILD(this)->fops->link, &(local->oldloc), &(local->newloc), local->xdata); - quota_local_cleanup (this, local); + quota_local_cleanup (local); return; } @@ -2529,7 +2679,7 @@ quota_rename_get_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this, return 0; out: - quota_handle_validate_error (local, -1, op_errno); + quota_handle_validate_error (frame, -1, op_errno); return 0; } @@ -4233,7 +4383,7 @@ quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, UNLOCK (&ctx->lock); resume: - quota_link_count_decrement (local); + quota_link_count_decrement (frame); return 0; } @@ -4263,7 +4413,7 @@ quota_get_limit_dir_continuation (struct list_head *parents, inode_t *inode, op_errno = EIO; } - quota_handle_validate_error (local, -1, op_errno); + quota_handle_validate_error (frame, -1, op_errno); goto out; } @@ -4293,7 +4443,7 @@ quota_statfs_continue (call_frame_t *frame, xlator_t *this, inode_t *inode) ret = quota_validate (frame, local->inode, this, quota_statfs_validate_cbk); if (0 > ret) - quota_handle_validate_error (local, -1, -ret); + quota_handle_validate_error (frame, -1, -ret); } void @@ -4645,13 +4795,17 @@ int32_t quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, off_t offset, size_t len, dict_t *xdata) { - int32_t ret = -1, op_errno = EINVAL; - int32_t parents = 0; - quota_local_t *local = NULL; - quota_inode_ctx_t *ctx = NULL; - quota_priv_t *priv = NULL; - quota_dentry_t *dentry = NULL; - call_stub_t *stub = NULL; + int32_t ret = -1, op_errno = EINVAL; + int32_t parents = 0; + int32_t fail_count = 0; + quota_local_t *local = NULL; + quota_inode_ctx_t *ctx = NULL; + quota_priv_t *priv = NULL; + quota_dentry_t *dentry = NULL; + quota_dentry_t *tmp = NULL; + call_stub_t *stub = NULL; + struct list_head head = {0, }; + inode_t *par_inode = NULL; priv = this->private; GF_VALIDATE_OR_GOTO (this->name, priv, unwind); @@ -4688,15 +4842,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, priv = this->private; GF_VALIDATE_OR_GOTO (this->name, priv, unwind); - if (ctx != NULL) { - LOCK (&ctx->lock); - { - list_for_each_entry (dentry, &ctx->parents, next) { - parents++; - } - } - UNLOCK (&ctx->lock); - } + parents = quota_add_parents_from_ctx (ctx, &head); /* * Note that by using len as the delta we're assuming the range from @@ -4711,9 +4857,33 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, local->link_count = 1; quota_check_limit (frame, fd->inode, this, NULL, NULL); } else { - list_for_each_entry (dentry, &ctx->parents, next) { - quota_check_limit (frame, fd->inode, this, dentry->name, - dentry->par); + list_for_each_entry_safe (dentry, tmp, &head, next) { + par_inode = do_quota_check_limit (frame, fd->inode, + this, dentry); + if (par_inode == NULL) { + /* remove stale entry from inode_ctx */ + quota_dentry_del (ctx, dentry->name, + dentry->par); + parents--; + fail_count++; + } else { + inode_unref (par_inode); + } + __quota_dentry_free (dentry); + } + + if (parents == 0) { + LOCK (&local->lock); + { + local->link_count++; + } + UNLOCK (&local->lock); + quota_check_limit (frame, fd->inode, this, NULL, NULL); + } + + while (fail_count != 0) { + quota_link_count_decrement (frame); + fail_count--; } } diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 0022089daa0..e7b109d9f15 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -97,7 +97,7 @@ STACK_WIND_TAIL (frame, params); \ \ if (_local) \ - quota_local_cleanup (_this, _local); \ + quota_local_cleanup (_local); \ } while (0) #define QUOTA_STACK_UNWIND(fop, frame, params...) \ @@ -110,7 +110,7 @@ frame->local = NULL; \ } \ STACK_UNWIND_STRICT (fop, frame, params); \ - quota_local_cleanup (_this, _local); \ + quota_local_cleanup (_local); \ } while (0) #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution) \ @@ -188,7 +188,6 @@ typedef void struct quota_local { gf_lock_t lock; - uint32_t validate_count; uint32_t link_count; loc_t loc; loc_t oldloc; @@ -214,6 +213,7 @@ struct quota_local { dict_t *validate_xdata; int32_t quotad_conn_retry; xlator_t *this; + call_frame_t *par_frame; }; typedef struct quota_local quota_local_t; @@ -261,6 +261,9 @@ int32_t quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, char *name, uuid_t par); +inode_t * +do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, + quota_dentry_t *dentry); int quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, loc_t *loc, struct iatt *buf, int32_t *op_errno); |