diff options
author | Jeff Darcy <jdarcy@redhat.com> | 2014-01-28 12:49:39 +0000 |
---|---|---|
committer | Jeff Darcy <jdarcy@redhat.com> | 2014-01-28 12:49:39 +0000 |
commit | 738e76f0799fa598eac308144174c6cf9db21b7a (patch) | |
tree | 0cb787d50ff0753b81cd3b0913476c27adbe4686 /xlators/features | |
parent | 007182f1aad9d14e8d5bc7771d500b35026f0afa (diff) | |
parent | 6dfe01d7e726675913e98dc65c6c7406e5060e15 (diff) |
Merge branch 'upstream'
Diffstat (limited to 'xlators/features')
-rw-r--r-- | xlators/features/gfid-access/src/gfid-access.c | 67 | ||||
-rw-r--r-- | xlators/features/gfid-access/src/gfid-access.h | 6 | ||||
-rw-r--r-- | xlators/features/locks/src/entrylk.c | 7 | ||||
-rw-r--r-- | xlators/features/locks/src/inodelk.c | 1 | ||||
-rw-r--r-- | xlators/features/quota/src/quota-enforcer-client.c | 43 | ||||
-rw-r--r-- | xlators/features/quota/src/quota.c | 944 | ||||
-rw-r--r-- | xlators/features/quota/src/quota.h | 46 |
7 files changed, 620 insertions, 494 deletions
diff --git a/xlators/features/gfid-access/src/gfid-access.c b/xlators/features/gfid-access/src/gfid-access.c index 1566278ef..362fdab5a 100644 --- a/xlators/features/gfid-access/src/gfid-access.c +++ b/xlators/features/gfid-access/src/gfid-access.c @@ -252,7 +252,7 @@ err: } static int32_t -ga_fill_tmp_loc (loc_t *loc, xlator_t *this, char *gfid, +ga_fill_tmp_loc (loc_t *loc, xlator_t *this, uuid_t gfid, char *bname, dict_t *xdata, loc_t *new_loc) { int ret = -1; @@ -263,6 +263,8 @@ ga_fill_tmp_loc (loc_t *loc, xlator_t *this, char *gfid, ret = inode_ctx_get (loc->inode, this, &value); if (!ret) { parent = (void *)value; + if (uuid_is_null (parent->gfid)) + parent = loc->inode; } /* parent itself should be looked up */ @@ -421,15 +423,20 @@ ga_new_entry (call_frame_t *frame, xlator_t *this, loc_t *loc, data_t *data, call_frame_t *new_frame = NULL; mode_t mode = 0; ga_local_t *local = NULL; + uuid_t gfid = {0,}; args = ga_newfile_parse_args (this, data); if (!args) goto out; + ret = uuid_parse (args->gfid, gfid); + if (ret) + goto out; + if (!xdata) xdata = dict_new (); - ret = ga_fill_tmp_loc (loc, this, args->gfid, + ret = ga_fill_tmp_loc (loc, this, gfid, args->bname, xdata, &tmp_loc); if (ret) goto out; @@ -485,15 +492,20 @@ ga_heal_entry (call_frame_t *frame, xlator_t *this, loc_t *loc, data_t *data, ga_heal_args_t *args = NULL; loc_t tmp_loc = {0,}; call_frame_t *new_frame = NULL; + uuid_t gfid = {0,}; args = ga_heal_parse_args (this, data); if (!args) goto out; + ret = uuid_parse (args->gfid, gfid); + if (ret) + goto out; + if (!xdata) xdata = dict_new (); - ret = ga_fill_tmp_loc (loc, this, args->gfid, args->bname, + ret = ga_fill_tmp_loc (loc, this, gfid, args->bname, xdata, &tmp_loc); if (ret) goto out; @@ -617,7 +629,8 @@ ga_virtual_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, /* the inode is not present in itable, ie, the actual path is not yet looked up. Use the current inode itself for now */ - inode_ref (inode); + + inode_link (inode, NULL, NULL, buf); } else { /* 'inode_ref()' has been done in inode_find() */ inode = true_inode; @@ -708,8 +721,30 @@ ga_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) /* if its revalidate, and inode is not of type directory, proceed with 'wind' */ if (loc->inode && loc->inode->ia_type && - !IA_ISDIR (loc->inode->ia_type)) + !IA_ISDIR (loc->inode->ia_type)) { + + /* a revalidate on ".gfid/<dentry>" is possible, check for it */ + if (((loc->parent && + __is_gfid_access_dir (loc->parent->gfid)) || + __is_gfid_access_dir (loc->pargfid))) { + + /* here, just send 'loc->gfid' and 'loc->inode' */ + tmp_loc.inode = inode_ref (loc->inode); + uuid_copy (tmp_loc.gfid, loc->inode->gfid); + + STACK_WIND (frame, default_lookup_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, + &tmp_loc, xdata); + + inode_unref (tmp_loc.inode); + + return 0; + } + + /* not something to bother, continue the flow */ goto wind; + } priv = this->private; @@ -729,8 +764,26 @@ ga_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) /* now, check if the lookup() is on an existing entry, but on gfid-path */ if (!((loc->parent && __is_gfid_access_dir (loc->parent->gfid)) || - __is_gfid_access_dir (loc->pargfid))) - goto wind; + __is_gfid_access_dir (loc->pargfid))) { + if (!loc->parent) + goto wind; + + ret = inode_ctx_get (loc->parent, this, &value); + if (ret) + goto wind; + + inode = (inode_t *) value; + + ret = loc_copy_overload_parent (&tmp_loc, loc, inode); + if (ret) + goto err; + + STACK_WIND (frame, ga_lookup_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->lookup, &tmp_loc, xdata); + + loc_wipe (&tmp_loc); + return 0; + } /* make sure the 'basename' is actually a 'canonical-gfid', otherwise, return error */ diff --git a/xlators/features/gfid-access/src/gfid-access.h b/xlators/features/gfid-access/src/gfid-access.h index 3b74ce112..e883eca69 100644 --- a/xlators/features/gfid-access/src/gfid-access.h +++ b/xlators/features/gfid-access/src/gfid-access.h @@ -44,8 +44,7 @@ if (ret) \ goto lbl; \ tmp_inode = (inode_t *)value; \ - unref = inode_ref (tmp_inode); \ - l->parent = tmp_inode; \ + l->parent = inode_ref (tmp_inode); \ /* if parent is virtual, no need to handle */ \ /* loc->inode */ \ break; \ @@ -59,8 +58,7 @@ if (ret) \ goto lbl; \ tmp_inode = (inode_t *)value; \ - unref = inode_ref (tmp_inode); \ - l->inode = tmp_inode; \ + l->inode = inode_ref (tmp_inode); \ } \ \ } while (0) diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c index 208bc140e..c176306fe 100644 --- a/xlators/features/locks/src/entrylk.c +++ b/xlators/features/locks/src/entrylk.c @@ -371,7 +371,6 @@ __lock_entrylk (xlator_t *this, pl_inode_t *pinode, pl_entry_lock_t *lock, __pl_entrylk_ref (lock); gettimeofday (&lock->granted_time, NULL); list_add (&lock->domain_list, &dom->entrylk_list); - lock->frame = NULL; ret = 0; out: @@ -576,10 +575,12 @@ pl_common_entrylk (call_frame_t *frame, xlator_t *this, reqlock->pinode = pinode; ret = __lock_entrylk (this, pinode, reqlock, nonblock, dom); - if (ret == 0) + if (ret == 0) { + reqlock->frame = NULL; op_ret = 0; - else + } else { op_errno = -ret; + } if (ctx && (!ret || !nonblock)) list_add (&reqlock->client_list, diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c index 969b67a61..e7093e60e 100644 --- a/xlators/features/locks/src/inodelk.c +++ b/xlators/features/locks/src/inodelk.c @@ -478,6 +478,7 @@ pl_inode_setlk (xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode, if (lock->fl_type != F_UNLCK) { ret = __lock_inodelk (this, pl_inode, lock, can_block, dom); if (ret == 0) { + lock->frame = NULL; gf_log (this->name, GF_LOG_TRACE, "%s (pid=%d) (lk-owner=%s) %"PRId64" - %"PRId64" => OK", lock->fl_type == F_UNLCK ? "Unlock" : "Lock", diff --git a/xlators/features/quota/src/quota-enforcer-client.c b/xlators/features/quota/src/quota-enforcer-client.c index bfea5e420..7d8ab937d 100644 --- a/xlators/features/quota/src/quota-enforcer-client.c +++ b/xlators/features/quota/src/quota-enforcer-client.c @@ -295,6 +295,37 @@ quota_enforcer_notify (struct rpc_clnt *rpc, void *mydata, return ret; } +int +quota_enforcer_blocking_connect (rpc_clnt_t *rpc) +{ + dict_t *options = NULL; + int ret = -1; + + options = dict_new (); + if (options == NULL) + goto out; + + ret = dict_set_str (options, "non-blocking-io", "no"); + if (ret) + goto out; + + rpc->conn.trans->reconfigure (rpc->conn.trans, options); + + rpc_clnt_start (rpc); + + ret = dict_set_str (options, "non-blocking-io", "yes"); + if (ret) + goto out; + + rpc->conn.trans->reconfigure (rpc->conn.trans, options); + + ret = 0; +out: + dict_unref (options); + + return ret; +} + //Returns a started rpc_clnt. Creates a new rpc_clnt if quota_priv doesn't have //one already struct rpc_clnt * @@ -309,9 +340,13 @@ quota_enforcer_init (xlator_t *this, dict_t *options) gf_log (this->name, GF_LOG_TRACE, "quota enforcer clnt already " "inited"); //Turns out to be a NOP if the clnt is already connected. - rpc_clnt_start (priv->rpc_clnt); + ret = quota_enforcer_blocking_connect (priv->rpc_clnt); + if (ret) + goto out; + return priv->rpc_clnt; } + priv->quota_enforcer = "a_enforcer_clnt; ret = dict_set_str (options, "transport.address-family", "unix"); @@ -339,7 +374,11 @@ quota_enforcer_init (xlator_t *this, dict_t *options) goto out; } - rpc_clnt_start (rpc); + ret = quota_enforcer_blocking_connect (rpc); + if (ret) + goto out; + + ret = 0; out: if (ret) { if (rpc) diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 5cbd9f02d..2812a2b13 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -215,6 +215,7 @@ __quota_dentry_new (quota_inode_ctx_t *ctx, char *name, uuid_t par) dentry->name = gf_strdup (name); if (dentry->name == NULL) { GF_FREE (dentry); + dentry = NULL; goto err; } @@ -244,7 +245,7 @@ out: } inline void -quota_resume_fop_if_validation_done (quota_local_t *local) +quota_link_count_decrement (quota_local_t *local) { call_stub_t *stub = NULL; int link_count = -1; @@ -254,7 +255,7 @@ quota_resume_fop_if_validation_done (quota_local_t *local) LOCK (&local->lock); { - link_count = local->link_count; + link_count = --local->link_count; if (link_count == 0) { stub = local->stub; local->stub = NULL; @@ -282,13 +283,11 @@ quota_handle_validate_error (quota_local_t *local, int32_t op_ret, local->op_ret = op_ret; local->op_errno = op_errno; } - - /* we abort checking limits on this path to root */ - local->link_count--; } UNLOCK (&local->lock); - quota_resume_fop_if_validation_done (local); + /* we abort checking limits on this path to root */ + quota_link_count_decrement (local); out: return; } @@ -378,31 +377,51 @@ quota_timeout (struct timeval *tv, int32_t timeout) return timed_out; } +inline void +quota_add_parent (quota_dentry_t *dentry, struct list_head *list) +{ + quota_dentry_t *entry = NULL; + gf_boolean_t found = _gf_false; + + if ((dentry == NULL) || (list == NULL)) { + goto out; + } + + list_for_each_entry (entry, list, next) { + if (uuid_compare (dentry->par, entry->par) == 0) { + found = _gf_true; + goto out; + } + } + + list_add_tail (&dentry->next, list); + +out: + return; +} + int32_t quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, dict_t *xdata) { - inode_t *parent = NULL; - gf_dirent_t *entry = NULL; - loc_t loc = {0, }; - quota_dentry_t *dentry = NULL, *tmp = NULL; - quota_inode_ctx_t *ctx = NULL; - struct list_head parents = {0, }; - quota_local_t *local = NULL; - call_frame_t *continuation_frame = NULL; + inode_t *parent = NULL, *tmp_parent = NULL; + gf_dirent_t *entry = NULL; + loc_t loc = {0, }; + quota_dentry_t *dentry = NULL, *tmp = NULL; + quota_inode_ctx_t *ctx = NULL; + struct list_head parents = {0, }; + quota_local_t *local = NULL; INIT_LIST_HEAD (&parents); - continuation_frame = frame->local; + local = frame->local; frame->local = NULL; - local = continuation_frame->local; - if (op_ret < 0) goto err; - parent = inode_parent (local->validate_loc.inode, 0, NULL); + parent = inode_parent (local->loc.inode, 0, NULL); if (parent == NULL) { gf_log (this->name, GF_LOG_WARNING, "parent is NULL"); op_errno = EINVAL; @@ -423,61 +442,77 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, * pointer and continue */ - parent = NULL; + tmp_parent = NULL; } uuid_copy (loc.gfid, entry->d_stat.ia_gfid); loc.inode = inode_ref (entry->inode); - loc.parent = inode_ref (parent); + loc.parent = inode_ref (tmp_parent); loc.name = entry->d_name; quota_fill_inodectx (this, entry->inode, entry->dict, &loc, &entry->d_stat, &op_errno); - parent = entry->inode; + tmp_parent = entry->inode; loc_wipe (&loc); } } - quota_inode_ctx_get (local->validate_loc.inode, this, &ctx, 0); - - local->link_count = 0; + quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); if (ctx != NULL) { LOCK (&ctx->lock); { list_for_each_entry (dentry, &ctx->parents, next) { + /* we built ancestry for a non-directory */ tmp = __quota_dentry_new (NULL, dentry->name, dentry->par); - list_add_tail (&tmp->next, &parents); - local->link_count++; + quota_add_parent (tmp, &parents); + + if (list_empty (&tmp->next)) { + __quota_dentry_free (tmp); + tmp = NULL; + } } } UNLOCK (&ctx->lock); } - if (local->link_count != 0) { - list_for_each_entry_safe (dentry, tmp, &parents, next) { - quota_check_limit (continuation_frame, - local->validate_loc.inode, - this, dentry->name, dentry->par); - __quota_dentry_free (dentry); + if (list_empty (&parents)) { + /* we built ancestry for a directory */ + list_for_each_entry (entry, &entries->list, list) { + if (entry->inode == local->loc.inode) + break; } - } else { - local->link_count = 1; - quota_check_limit (continuation_frame, parent, this, NULL, - NULL); + + GF_ASSERT (&entry->list != &entries->list); + + tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid); + quota_add_parent (tmp, &parents); } - STACK_DESTROY (frame->root); - return 0; + local->ancestry_cbk (&parents, local->loc.inode, 0, 0, + local->ancestry_data); + goto cleanup; err: + local->ancestry_cbk (NULL, NULL, -1, op_errno, local->ancestry_data); + +cleanup: STACK_DESTROY (frame->root); + quota_local_cleanup (this, local); + + if (parent != NULL) { + inode_unref (parent); + parent = NULL; + } + + list_for_each_entry_safe (dentry, tmp, &parents, next) { + __quota_dentry_free (dentry); + } - quota_handle_validate_error (local, -1, op_errno); return 0; } @@ -486,85 +521,91 @@ quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd, dict_t *xdata) { - int ret = -1; - dict_t *xdata_req = NULL; - quota_local_t *local = NULL; - call_frame_t *continuation_frame = NULL; + dict_t *xdata_req = NULL; + quota_local_t *local = NULL; + + if (op_ret < 0) { + goto err; + } xdata_req = dict_new (); if (xdata_req == NULL) { - ret = -ENOMEM; + op_ret = -ENOMEM; goto err; } - ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); - if (ret < 0) + op_ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); + if (op_ret < 0) { + op_errno = -op_ret; goto err; + } - ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); - if (ret < 0) + op_ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); + if (op_ret < 0) { + op_errno = -op_ret; goto err; + } /* This would ask posix layer to construct dentry chain till root */ STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); - ret = 0; + op_ret = 0; err: fd_unref (fd); dict_unref (xdata_req); - if (ret < 0) { - continuation_frame = frame->local; + if (op_ret < 0) { + local = frame->local; frame->local = NULL; + local->ancestry_cbk (NULL, NULL, -1, op_errno, + local->ancestry_data); + quota_local_cleanup (this, local); STACK_DESTROY (frame->root); - - local = continuation_frame->local; - quota_handle_validate_error (local, -1, op_errno); } - return ret; + return 0; } int -quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) +quota_build_ancestry (inode_t *inode, quota_ancestry_built_t ancestry_cbk, + void *data) { loc_t loc = {0, }; fd_t *fd = NULL; quota_local_t *local = NULL; call_frame_t *new_frame = NULL; - int ret = -1; + int op_errno = EINVAL; + xlator_t *this = NULL; + + this = THIS; loc.inode = inode_ref (inode); uuid_copy (loc.gfid, inode->gfid); - gf_log (this->name, GF_LOG_WARNING, "building ancestry"); - - local = frame->local; - - LOCK (&local->lock); - { - loc_wipe (&local->validate_loc); + fd = fd_create (inode, 0); - ret = quota_inode_loc_fill (inode, &local->validate_loc); - if (ret < 0) { - gf_log (this->name, GF_LOG_WARNING, - "cannot fill loc for inode (gfid:%s), hence " - "aborting quota-checks and continuing with fop", - uuid_utoa (inode->gfid)); - } + new_frame = create_frame (this, this->ctx->pool); + if (new_frame == NULL) { + op_errno = ENOMEM; + goto err; } - UNLOCK (&local->lock); - fd = fd_create (inode, 0); - - new_frame = copy_frame (frame); new_frame->root->uid = new_frame->root->gid = 0; - new_frame->local = frame; + local = quota_local_new (); + if (local == NULL) { + op_errno = ENOMEM; + goto err; + } + + new_frame->local = local; + local->ancestry_cbk = ancestry_cbk; + local->ancestry_data = data; + local->loc.inode = inode_ref (inode); if (IA_ISDIR (inode->ia_type)) { STACK_WIND (new_frame, quota_build_ancestry_open_cbk, @@ -579,7 +620,25 @@ quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) } loc_wipe (&loc); + return 0; + +err: + ancestry_cbk (NULL, NULL, -1, op_errno, data); + + fd_unref (fd); + + local = new_frame->local; + new_frame->local = NULL; + if (local != NULL) { + quota_local_cleanup (this, local); + } + + if (new_frame != NULL) { + STACK_DESTROY (new_frame->root); + } + + loc_wipe (&loc); return 0; } @@ -646,6 +705,56 @@ err: return ret; } +void +quota_check_limit_continuation (struct list_head *parents, inode_t *inode, + int32_t op_ret, int32_t op_errno, void *data) +{ + call_frame_t *frame = NULL; + xlator_t *this = NULL; + quota_local_t *local = NULL; + quota_dentry_t *entry = NULL; + inode_t *parent = NULL; + int parent_count = 0; + + frame = data; + local = frame->local; + this = THIS; + + if ((op_ret < 0) || list_empty (parents)) { + if (list_empty (parents)) { + gf_log (this->name, GF_LOG_WARNING, + "Couldn't build ancestry for inode (gfid:%s). " + "Without knowing ancestors till root, quota " + "cannot be enforced. " + "Hence, failing fop with EIO", + uuid_utoa (inode->gfid)); + op_errno = EIO; + } + + quota_handle_validate_error (local, -1, op_errno); + goto out; + } + + list_for_each_entry (entry, parents, next) { + parent_count++; + } + + LOCK (&local->lock); + { + local->link_count += (parent_count - 1); + } + UNLOCK (&local->lock); + + list_for_each_entry (entry, parents, next) { + parent = inode_find (inode->table, entry->par); + + quota_check_limit (frame, parent, this, NULL, NULL); + } + +out: + return; +} + int32_t quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, char *name, uuid_t par) @@ -684,12 +793,8 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, */ if (0 > frame->root->pid) { ret = 0; - LOCK (&local->lock); - { - --local->link_count; - } - UNLOCK (&local->lock); - goto resume; + quota_link_count_decrement (local); + goto done; } priv = this->private; @@ -772,12 +877,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } if (__is_root_gfid (_inode->gfid)) { - LOCK (&local->lock); - { - --local->link_count; - } - UNLOCK (&local->lock); - + quota_link_count_decrement (local); break; } @@ -789,7 +889,9 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } if (parent == NULL) { - ret = quota_build_ancestry (frame, _inode, this); + ret = quota_build_ancestry (_inode, + quota_check_limit_continuation, + frame); if (ret < 0) { op_errno = -ret; goto err; @@ -816,8 +918,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, _inode = NULL; } -resume: - quota_resume_fop_if_validation_done (local); +done: return 0; err: @@ -978,7 +1079,7 @@ quota_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, if (!xattr_req) goto err; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -995,14 +1096,8 @@ quota_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, goto err; } -wind: - /* TODO: check with vshastry@redhat.com to cleanup the ugliness of - * checking priv->is_quota_on here by using STACK_WIND_TAIL macro - */ - STACK_WIND (frame, - priv->is_quota_on ? quota_lookup_cbk : default_lookup_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, - xattr_req); + STACK_WIND (frame, quota_lookup_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, loc, xattr_req); ret = 0; @@ -1016,79 +1111,13 @@ err: } return 0; -} - - -void -quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par, - int64_t delta) -{ - inode_t *_inode = NULL; - inode_t *parent = NULL; - uint64_t value = 0; - quota_inode_ctx_t *ctx = NULL; - uuid_t trav_uuid = {0,}; - - GF_VALIDATE_OR_GOTO ("quota", this, out); - GF_VALIDATE_OR_GOTO (this->name, inode, out); - - inode_ctx_get (inode, this, &value); - ctx = (quota_inode_ctx_t *)(unsigned long)value; - - _inode = inode_ref (inode); - - if ( par != NULL ) { - uuid_copy (trav_uuid, par); - } - - do { - if ((ctx != NULL) && (ctx->hard_lim >= 0)) { - quota_log_usage (this, ctx, _inode, delta); - LOCK (&ctx->lock); - { - ctx->size += delta; - if (ctx->size < 0) - ctx->size = 0; - } - UNLOCK (&ctx->lock); - } - - if (__is_root_gfid (_inode->gfid)) { - break; - } - - parent = inode_parent (_inode, trav_uuid, name); - if (parent == NULL) { - /* TODO: build ancestry and continue updating size */ - gf_log (this->name, GF_LOG_DEBUG, - "cannot find parent for inode (gfid:%s), hence " - "aborting size updation of parents", - uuid_utoa (_inode->gfid)); - } - - if (name != NULL) { - name = NULL; - uuid_clear (trav_uuid); - } - - inode_unref (_inode); - _inode = parent; - - if (_inode == NULL) { - break; - } - value = 0; - ctx = NULL; - inode_ctx_get (_inode, this, &value); - ctx = (quota_inode_ctx_t *)(unsigned long)value; - } while (1); - -out: - return; +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, loc, xattr_req); + return 0; } - int32_t quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, @@ -1098,9 +1127,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, uint64_t ctx_int = 0; quota_inode_ctx_t *ctx = NULL; quota_local_t *local = NULL; - quota_dentry_t *dentry = NULL, *tmp = NULL; - int64_t delta = 0; - struct list_head head = {0, }; local = frame->local; @@ -1108,8 +1134,6 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - INIT_LIST_HEAD (&head); - ret = inode_ctx_get (local->loc.inode, this, &ctx_int); if (ret) { gf_log (this->name, GF_LOG_WARNING, @@ -1129,25 +1153,9 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, LOCK (&ctx->lock); { ctx->buf = *postbuf; - - list_for_each_entry (dentry, &ctx->parents, next) { - tmp = __quota_dentry_new (NULL, dentry->name, - dentry->par); - list_add_tail (&tmp->next, &head); - } - } UNLOCK (&ctx->lock); - if (postbuf->ia_blocks != prebuf->ia_blocks) - delta = local->delta; - - list_for_each_entry_safe (dentry, tmp, &head, next) { - quota_update_size (this, local->loc.inode, dentry->name, - dentry->par, delta); - __quota_dentry_free (dentry); - } - out: QUOTA_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); @@ -1202,8 +1210,7 @@ quota_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, } } - STACK_WIND (frame, - priv->is_quota_on? quota_writev_cbk: default_writev_cbk, + STACK_WIND (frame, quota_writev_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, off, flags, iobref, xdata); @@ -1235,7 +1242,7 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); INIT_LIST_HEAD (&head); @@ -1284,13 +1291,18 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, UNLOCK (&ctx->lock); } - local->delta = size; - - local->link_count = parents; - local->stub = stub; + LOCK (&local->lock); + { + local->delta = size; + local->link_count = (parents != 0) ? parents : 1; + local->stub = stub; + } + UNLOCK (&local->lock); if (parents == 0) { - local->link_count = 1; + /* nameless lookup on this inode, allow quota to reconstruct + * ancestry as part of check_limit. + */ quota_check_limit (frame, fd->inode, this, NULL, NULL); } else { list_for_each_entry_safe (dentry, tmp, &head, next) { @@ -1306,10 +1318,10 @@ unwind: QUOTA_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL); return 0; -wind: - STACK_WIND (frame, default_writev_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->writev, fd, - vector, count, off, flags, iobref, xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->writev, fd, + vector, count, off, flags, iobref, xdata); return 0; } @@ -1345,8 +1357,7 @@ quota_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, goto unwind; } - STACK_WIND (frame, - quota_mkdir_cbk, + STACK_WIND (frame, quota_mkdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); @@ -1370,7 +1381,7 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -1394,9 +1405,13 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, goto err; } - local->stub = stub; - local->delta = 0; - local->link_count = 1; + LOCK (&local->lock); + { + local->stub = stub; + local->delta = 0; + local->link_count = 1; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -1407,11 +1422,10 @@ err: return 0; -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_mkdir_cbk: default_mkdir_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, - mode, umask, xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mkdir, + loc, mode, umask, xdata); return 0; } @@ -1493,8 +1507,7 @@ quota_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, } - STACK_WIND (frame, - priv->is_quota_on? quota_create_cbk: default_create_cbk, + STACK_WIND (frame, quota_create_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; @@ -1518,7 +1531,7 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -1541,9 +1554,13 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, goto err; } - local->link_count = 1; - local->stub = stub; - local->delta = 0; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + local->delta = 0; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -1553,11 +1570,10 @@ err: return 0; -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_create_cbk: default_create_cbk, - FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, - flags, mode, umask, fd, xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->create, loc, + flags, mode, umask, fd, xdata); return 0; } @@ -1589,12 +1605,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - if (!local->skip_check) - quota_update_size (this, local->loc.inode, - (char *)local->loc.name, - local->loc.parent->gfid, - (-(ctx->buf.ia_blocks * 512))); - LOCK (&ctx->lock); { list_for_each_entry (dentry, &ctx->parents, next) { @@ -1627,7 +1637,7 @@ quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -1646,11 +1656,8 @@ quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag, goto err; } -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_unlink_cbk: default_unlink_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, - xflag, xdata); + STACK_WIND (frame, quota_unlink_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); ret = 0; @@ -1660,6 +1667,11 @@ err: } return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); + return 0; } @@ -1684,9 +1696,6 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if (local->skip_check) goto out; - quota_update_size (this, local->loc.parent, NULL, NULL, - (buf->ia_blocks * 512)); - ret = quota_inode_ctx_get (inode, this, &ctx, 0); if ((ret == -1) || (ctx == NULL)) { gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -1763,7 +1772,7 @@ quota_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, goto unwind; } - STACK_WIND (frame, priv->is_quota_on? quota_link_cbk: default_link_cbk, + STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; @@ -1787,7 +1796,11 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); + + if (xdata && dict_get (xdata, GLUSTERFS_INTERNAL_FOP_KEY)) { + goto off; + } quota_inode_ctx_get (oldloc->inode, this, &ctx, 0); if (ctx == NULL) { @@ -1805,10 +1818,6 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, frame->local = (void *) local; - if (xdata && dict_get (xdata, GLUSTERFS_INTERNAL_FOP_KEY)) { - local->skip_check = _gf_true; - goto wind; - } ret = loc_copy (&local->loc, newloc); if (ret == -1) { @@ -1821,9 +1830,13 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, goto err; } - local->link_count = 1; - local->stub = stub; - local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0; + } + UNLOCK (&local->lock); quota_check_limit (frame, newloc->parent, this, NULL, NULL); return 0; @@ -1831,13 +1844,12 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, err: QUOTA_STACK_UNWIND (link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL); - return 0; -wind: - STACK_WIND (frame, default_link_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, - newloc, xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->link, oldloc, + newloc, xdata); return 0; } @@ -1872,13 +1884,6 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, size = buf->ia_blocks * 512; } - if (local->oldloc.parent != local->newloc.parent) { - quota_update_size (this, local->oldloc.parent, NULL, NULL, - (-size)); - quota_update_size (this, local->newloc.parent, NULL, NULL, - size); - } - if (!QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) { goto out; } @@ -1980,8 +1985,7 @@ quota_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, goto unwind; } - STACK_WIND (frame, - priv->is_quota_on? quota_rename_cbk: default_rename_cbk, + STACK_WIND (frame, quota_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); @@ -2046,7 +2050,7 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2073,8 +2077,12 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, goto err; } - local->link_count = 1; - local->stub = stub; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + } + UNLOCK (&local->lock); if (QUOTA_REG_OR_LNK_FILE (oldloc->inode->ia_type)) { ret = quota_inode_ctx_get (oldloc->inode, this, &ctx, 0); @@ -2122,10 +2130,10 @@ err: NULL, NULL, NULL, NULL, NULL); return 0; -wind: - STACK_WIND (frame, default_rename_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, - newloc, xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rename, oldloc, + newloc, xdata); return 0; } @@ -2137,7 +2145,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { - int64_t size = 0; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; quota_dentry_t *dentry = NULL; @@ -2147,9 +2154,6 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } local = frame->local; - size = buf->ia_blocks * 512; - - quota_update_size (this, local->loc.parent, NULL, NULL, size); quota_inode_ctx_get (local->loc.inode, this, &ctx, 1); if (ctx == NULL) { @@ -2208,8 +2212,7 @@ quota_symlink_helper (call_frame_t *frame, xlator_t *this, const char *linkpath, goto unwind; } - STACK_WIND (frame, - priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, + STACK_WIND (frame, quota_symlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, xdata); return 0; @@ -2233,7 +2236,7 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2254,9 +2257,13 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, goto err; } - local->stub = stub; - local->delta = strlen (linkpath); - local->link_count = 1; + LOCK (&local->lock); + { + local->stub = stub; + local->delta = strlen (linkpath); + local->link_count = 1; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; @@ -2267,11 +2274,10 @@ err: return 0; -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, - linkpath, loc, umask, xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->symlink, + linkpath, loc, umask, xdata); return 0; } @@ -2283,7 +2289,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, { quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; - int64_t delta = 0; if (op_ret < 0) { goto out; @@ -2295,10 +2300,6 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - - quota_update_size (this, local->loc.inode, NULL, NULL, delta); - quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); if (ctx == NULL) { gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -2332,8 +2333,7 @@ quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2348,17 +2348,19 @@ quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, goto err; } -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_truncate_cbk: default_truncate_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, - offset, xdata); + STACK_WIND (frame, quota_truncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; + err: QUOTA_STACK_UNWIND (truncate, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); + return 0; } @@ -2369,7 +2371,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, { quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; - int64_t delta = 0; if (op_ret < 0) { goto out; @@ -2381,10 +2382,6 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } - delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - - quota_update_size (this, local->loc.inode, NULL, NULL, delta); - quota_inode_ctx_get (local->loc.inode, this, &ctx, 0); if (ctx == NULL) { gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " @@ -2417,7 +2414,7 @@ quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) @@ -2427,9 +2424,7 @@ quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, local->loc.inode = inode_ref (fd->inode); -wind: - STACK_WIND (frame, priv->is_quota_on? - quota_ftruncate_cbk: default_ftruncate_cbk, + STACK_WIND (frame, quota_ftruncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); @@ -2438,6 +2433,12 @@ err: QUOTA_STACK_UNWIND (ftruncate, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, fd, + offset, xdata); + return 0; } @@ -2581,8 +2582,7 @@ quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2596,15 +2596,20 @@ quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) goto unwind; } -wind: - STACK_WIND (frame, priv->is_quota_on? quota_stat_cbk: default_stat_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, + STACK_WIND (frame, quota_stat_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, loc, xdata); return 0; unwind: QUOTA_STACK_UNWIND (stat, frame, -1, ENOMEM, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, loc, + xdata); + return 0; } @@ -2661,7 +2666,7 @@ quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2672,9 +2677,7 @@ quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) local->loc.inode = inode_ref (fd->inode); -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_fstat_cbk: default_fstat_cbk, + STACK_WIND (frame, quota_fstat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, xdata); return 0; @@ -2682,6 +2685,12 @@ wind: unwind: QUOTA_STACK_UNWIND (fstat, frame, -1, ENOMEM, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fstat, fd, + xdata); + return 0; } @@ -2736,7 +2745,7 @@ quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2751,9 +2760,7 @@ quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size, goto unwind; } -wind: - STACK_WIND (frame, priv->is_quota_on? - quota_readlink_cbk: default_readlink_cbk, + STACK_WIND (frame, quota_readlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink, loc, size, xdata); return 0; @@ -2761,6 +2768,12 @@ wind: unwind: QUOTA_STACK_UNWIND (readlink, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readlink, loc, + size, xdata); + return 0; } @@ -2815,7 +2828,7 @@ quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2826,9 +2839,7 @@ quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, local->loc.inode = inode_ref (fd->inode); -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_readv_cbk: default_readv_cbk, + STACK_WIND (frame, quota_readv_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata); return 0; @@ -2837,6 +2848,12 @@ unwind: QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readv, fd, + size, offset, flags, xdata); + return 0; } @@ -2890,7 +2907,7 @@ quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2901,10 +2918,8 @@ quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags, frame->local = local; -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_fsync_cbk: default_fsync_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, + STACK_WIND (frame, quota_fsync_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, fd, flags, xdata); return 0; @@ -2912,6 +2927,11 @@ unwind: QUOTA_STACK_UNWIND (fsync, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, fd, + flags, xdata); + return 0; } @@ -2970,7 +2990,7 @@ quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -2985,16 +3005,20 @@ quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, goto unwind; } -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_setattr_cbk: default_setattr_cbk, - FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, + STACK_WIND (frame, quota_setattr_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); return 0; unwind: QUOTA_STACK_UNWIND (setattr, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->setattr, loc, + stbuf, valid, xdata); + return 0; } @@ -3051,8 +3075,7 @@ quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -3063,9 +3086,7 @@ quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, local->loc.inode = inode_ref (fd->inode); -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_fsetattr_cbk: default_fsetattr_cbk, + STACK_WIND (frame, quota_fsetattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; @@ -3073,6 +3094,12 @@ wind: unwind: QUOTA_STACK_UNWIND (fsetattr, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->fsetattr, fd, + stbuf, valid, xdata); + return 0; } @@ -3148,8 +3175,7 @@ quota_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, goto unwind; } - STACK_WIND (frame, - priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, + STACK_WIND (frame, quota_mknod_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata); @@ -3173,7 +3199,7 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); if (local == NULL) { @@ -3194,26 +3220,27 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, goto err; } - local->link_count = 1; - local->stub = stub; - local->delta = 0; + LOCK (&local->lock); + { + local->link_count = 1; + local->stub = stub; + local->delta = 0; + } + UNLOCK (&local->lock); quota_check_limit (frame, loc->parent, this, NULL, NULL); return 0; + err: QUOTA_STACK_UNWIND (mknod, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL); - return 0; -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, - mode, rdev, umask, xdata); - +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mknod, loc, + mode, rdev, umask, xdata); return 0; - } int @@ -3255,42 +3282,20 @@ quota_setxattr (call_frame_t *frame, xlator_t *this, int op_ret = -1; int64_t hard_lim = -1, soft_lim = -1; quota_local_t *local = NULL; - char *src = NULL; - char *dst = NULL; - int len = 0; - int ret = -1; priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (this, err); VALIDATE_OR_GOTO (loc, err); - if (0 <= frame->root->pid) { - ret = dict_get_ptr_and_len (dict, QUOTA_LIMIT_KEY, - (void **)&src, &len); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, "dict_get on %s " - "failed", QUOTA_LIMIT_KEY); - } else { - dst = GF_CALLOC (len, sizeof (char), gf_common_mt_char); - if (dst) - memcpy (dst, src, len); - } - - GF_REMOVE_INTERNAL_XATTR ("trusted.glusterfs.quota*", - dict); - if (!ret && IA_ISDIR (loc->inode->ia_type) && dst) { - ret = dict_set_dynptr (dict, QUOTA_LIMIT_KEY, - dst, len); - if (ret) - gf_log (this->name, GF_LOG_WARNING, "setting " - "key %s failed", QUOTA_LIMIT_KEY); - else - dst = NULL; - } + if (frame->root->pid >= 0) { + GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, + op_errno, err); + GF_IF_INTERNAL_XATTR_GOTO ("trusted.pgfid*", dict, op_errno, + err); } quota_get_limits (this, dict, &hard_lim, &soft_lim); @@ -3309,15 +3314,19 @@ quota_setxattr (call_frame_t *frame, xlator_t *this, local->limit.soft_lim_percent = soft_lim; } -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_setxattr_cbk: default_setxattr_cbk, + STACK_WIND (frame, quota_setxattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata); return 0; err: QUOTA_STACK_UNWIND (setxattr, frame, op_ret, op_errno, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->setxattr, loc, + dict, flags, xdata); + return 0; } int @@ -3361,15 +3370,18 @@ quota_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (this, err); VALIDATE_OR_GOTO (fd, err); - if (0 <= frame->root->pid) - GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, + if (0 <= frame->root->pid) { + GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", + dict, op_errno, err); + GF_IF_INTERNAL_XATTR_GOTO ("trusted.pgfid*", dict, op_errno, err); + } quota_get_limits (this, dict, &hard_lim, &soft_lim); @@ -3382,15 +3394,19 @@ quota_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, local->limit.soft_lim_percent = soft_lim; } -wind: - STACK_WIND (frame, priv->is_quota_on? - quota_fsetxattr_cbk: default_fsetxattr_cbk, + STACK_WIND (frame, quota_fsetxattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); return 0; - err: +err: QUOTA_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetxattr, fd, + dict, flags, xdata); + return 0; } @@ -3411,28 +3427,37 @@ quota_removexattr (call_frame_t *frame, xlator_t *this, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); VALIDATE_OR_GOTO (this, err); /* all quota xattrs can be cleaned up by doing setxattr on special key. * Hence its ok that we don't allow removexattr on quota keys here. */ - GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*", - name, op_errno, err); + if (frame->root->pid >= 0) { + GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.quota*", + name, op_errno, err); + GF_IF_NATIVE_XATTR_GOTO ("trusted.pgfid*", name, + op_errno, err); + } VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (loc, err); -wind: - STACK_WIND (frame, priv->is_quota_on? - quota_removexattr_cbk: default_removexattr_cbk, + STACK_WIND (frame, quota_removexattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); return 0; + err: QUOTA_STACK_UNWIND (removexattr, frame, -1, op_errno, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, + loc, name, xdata); + return 0; } @@ -3454,24 +3479,31 @@ quota_fremovexattr (call_frame_t *frame, xlator_t *this, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (this, err); VALIDATE_OR_GOTO (fd, err); - GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*", - name, op_errno, err); - -wind: - STACK_WIND (frame, priv->is_quota_on? - quota_fremovexattr_cbk: default_fremovexattr_cbk, + if (frame->root->pid >= 0) { + GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.quota*", + name, op_errno, err); + GF_IF_NATIVE_XATTR_GOTO ("trusted.pgfid*", name, + op_errno, err); + } + STACK_WIND (frame, quota_fremovexattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata); return 0; - err: +err: QUOTA_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, NULL); return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fremovexattr, + fd, name, xdata); + return 0; } @@ -3640,9 +3672,7 @@ quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, UNLOCK (&ctx->lock); resume: - --local->link_count; - - quota_resume_fop_if_validation_done (local); + quota_link_count_decrement (local); return 0; } @@ -3657,7 +3687,7 @@ quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); if (priv->consider_statfs && loc->inode) { local = quota_local_new (); @@ -3667,44 +3697,46 @@ quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) } frame->local = local; - local->inode = inode_ref (loc->inode); - local->link_count = 1; - stub = fop_statfs_stub (frame, quota_statfs_helper, loc, xdata); if (!stub) { op_errno = ENOMEM; goto err; } - local->stub = stub; + LOCK (&local->lock); + { + local->inode = inode_ref (loc->inode); + local->link_count = 1; + local->stub = stub; + } + UNLOCK (&local->lock); ret = quota_validate (frame, local->inode, this, quota_statfs_validate_cbk); if (0 > ret) { - op_errno = -ret; - --local->link_count; + quota_handle_validate_error (local, -1, -ret); } - quota_resume_fop_if_validation_done (local); - } - else { - /* - * We have to make sure that we never get to quota_statfs_cbk - * with a cookie that points to something other than an inode, - * which is exactly what would happen with STACK_UNWIND using - * that as a callback. Therefore, use default_statfs_cbk in - * this case instead. - * - * Also if the option deem-statfs is not set to "on" don't - * bother calculating quota limit on / in statfs_cbk. - */ - if (priv->consider_statfs) - gf_log(this->name,GF_LOG_WARNING, - "missing inode, cannot adjust for quota"); -wind: - STACK_WIND (frame, default_statfs_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->statfs, loc, xdata); + return 0; } + + /* + * We have to make sure that we never get to quota_statfs_cbk + * with a cookie that points to something other than an inode, + * which is exactly what would happen with STACK_UNWIND using + * that as a callback. Therefore, use default_statfs_cbk in + * this case instead. + * + * Also if the option deem-statfs is not set to "on" don't + * bother calculating quota limit on / in statfs_cbk. + */ + if (priv->consider_statfs) + gf_log (this->name,GF_LOG_WARNING, + "missing inode, cannot adjust for quota"); + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->statfs, loc, xdata); return 0; err: @@ -3763,7 +3795,7 @@ quota_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, priv = this->private; - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); local = quota_local_new (); @@ -3789,9 +3821,7 @@ quota_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, } } -wind: - STACK_WIND (frame, - priv->is_quota_on? quota_readdirp_cbk: default_readdirp_cbk, + STACK_WIND (frame, quota_readdirp_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict); @@ -3808,6 +3838,12 @@ err: } return 0; + +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdirp, fd, + size, offset, dict); + return 0; } int32_t @@ -3819,8 +3855,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, uint64_t ctx_int = 0; quota_inode_ctx_t *ctx = NULL; quota_local_t *local = NULL; - quota_dentry_t *dentry = NULL; - int64_t delta = 0; local = frame->local; @@ -3850,12 +3884,6 @@ quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, } UNLOCK (&ctx->lock); - list_for_each_entry (dentry, &ctx->parents, next) { - delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; - quota_update_size (this, local->loc.inode, - dentry->name, dentry->par, delta); - } - out: QUOTA_STACK_UNWIND (fallocate, frame, op_ret, op_errno, prebuf, postbuf, xdata); @@ -3885,8 +3913,7 @@ quota_fallocate_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, goto unwind; } - STACK_WIND (frame, priv->is_quota_on? - quota_fallocate_cbk: default_fallocate_cbk, + STACK_WIND (frame, quota_fallocate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len, xdata); @@ -3913,7 +3940,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, priv = this->private; GF_VALIDATE_OR_GOTO (this->name, priv, unwind); - WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + WIND_IF_QUOTAOFF (priv->is_quota_on, off); GF_ASSERT (frame); GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -3981,12 +4008,10 @@ unwind: QUOTA_STACK_UNWIND (fallocate, frame, -1, op_errno, NULL, NULL, NULL); return 0; -wind: - STACK_WIND (frame, priv->is_quota_on? - quota_fallocate_cbk: default_fallocate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len, - xdata); +off: + STACK_WIND_TAIL (frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, + len, xdata); return 0; } @@ -4152,14 +4177,15 @@ err: int reconfigure (xlator_t *this, dict_t *options) { - int32_t ret = -1; - quota_priv_t *priv = NULL; + int32_t ret = -1; + quota_priv_t *priv = NULL; + gf_boolean_t quota_on = _gf_false; priv = this->private; GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, out); - GF_OPTION_RECONF ("server-quota", priv->is_quota_on, options, bool, + GF_OPTION_RECONF ("server-quota", quota_on, options, bool, out); GF_OPTION_RECONF ("default-soft-limit", priv->default_soft_lim, options, percent, out); @@ -4170,7 +4196,7 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("hard-timeout", priv->hard_timeout, options, time, out); - if (priv->is_quota_on) { + if (quota_on) { priv->rpc_clnt = quota_enforcer_init (this, this->options); if (priv->rpc_clnt == NULL) { @@ -4191,6 +4217,8 @@ reconfigure (xlator_t *this, dict_t *options) } } + priv->is_quota_on = quota_on; + ret = 0; out: return ret; diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 02bc0d8b6..84c3257fe 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -160,28 +160,34 @@ struct quota_limit { } __attribute__ ((packed)); typedef struct quota_limit quota_limit_t; +typedef void +(*quota_ancestry_built_t) (struct list_head *parents, inode_t *inode, + int32_t op_ret, int32_t op_errno, void *data); + struct quota_local { - gf_lock_t lock; - uint32_t validate_count; - uint32_t link_count; - loc_t loc; - loc_t oldloc; - loc_t newloc; - loc_t validate_loc; - int64_t delta; - int32_t op_ret; - int32_t op_errno; - int64_t size; - gf_boolean_t skip_check; - char just_validated; - fop_lookup_cbk_t validate_cbk; - inode_t *inode; - call_stub_t *stub; - struct iobref *iobref; - quota_limit_t limit; - int64_t space_available; + gf_lock_t lock; + uint32_t validate_count; + uint32_t link_count; + loc_t loc; + loc_t oldloc; + loc_t newloc; + loc_t validate_loc; + int64_t delta; + int32_t op_ret; + int32_t op_errno; + int64_t size; + gf_boolean_t skip_check; + char just_validated; + fop_lookup_cbk_t validate_cbk; + inode_t *inode; + call_stub_t *stub; + struct iobref *iobref; + quota_limit_t limit; + int64_t space_available; + quota_ancestry_built_t ancestry_cbk; + void *ancestry_data; }; -typedef struct quota_local quota_local_t; +typedef struct quota_local quota_local_t; struct quota_priv { uint32_t soft_timeout; |