diff options
-rw-r--r-- | tests/bugs/quota/bug-1235182.t | 17 | ||||
-rw-r--r-- | tests/bugs/quota/bug-1240991.t | 43 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 5 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota.c | 549 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota.h | 5 | ||||
-rw-r--r-- | xlators/features/marker/src/marker.c | 453 | ||||
-rw-r--r-- | xlators/features/marker/src/marker.h | 4 |
7 files changed, 540 insertions, 536 deletions
diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t index e28b557f558..2f963e664c6 100644 --- a/tests/bugs/quota/bug-1235182.t +++ b/tests/bugs/quota/bug-1235182.t @@ -28,13 +28,22 @@ TEST $CLI volume quota $V0 limit-usage / 1GB TEST $CLI volume quota $V0 hard-timeout 0 TEST $CLI volume quota $V0 soft-timeout 0 -$QDD $M0/f1 256 400& +TEST mkdir $M0/1 +$QDD $M0/1/f1 256 400& PID=$! -EXPECT_WITHIN $PROCESS_UP_TIMEOUT "0" STAT $M0/f1 -TESTS_EXPECTED_IN_LOOP=50 +EXPECT_WITHIN $PROCESS_UP_TIMEOUT "0" STAT $M0/1/f1 +TESTS_EXPECTED_IN_LOOP=150 for i in {1..50}; do ii=`expr $i + 1`; - TEST_IN_LOOP mv $M0/f$i $M0/f$ii; + touch $M0/$i/f$ii + echo Hello > $M0/$i/f$ii + + #rename within same dir + TEST_IN_LOOP mv -f $M0/$i/f$i $M0/$i/f$ii; + + #rename to different dir + TEST_IN_LOOP mkdir $M0/$ii + TEST_IN_LOOP mv -f $M0/$i/f$ii $M0/$ii/f$ii; done echo "Wait for process with pid $PID to complete" diff --git a/tests/bugs/quota/bug-1240991.t b/tests/bugs/quota/bug-1240991.t new file mode 100644 index 00000000000..2a1a6d1868e --- /dev/null +++ b/tests/bugs/quota/bug-1240991.t @@ -0,0 +1,43 @@ +#!/bin/bash + +# This regression test tries to ensure renaming a directory with content, and +# no limit set, is accounted properly, when moved into a directory with quota +# limit set. + +. $(dirname $0)/../../include.rc +. $(dirname $0)/../../volume.rc + +cleanup; + +TEST glusterd +TEST pidof glusterd; +TEST $CLI volume info; + +TEST $CLI volume create $V0 $H0:$B0/${V0} +TEST $CLI volume start $V0; + +TEST $CLI volume quota $V0 enable; + +TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0; + +TEST $CLI volume quota $V0 hard-timeout 0 +TEST $CLI volume quota $V0 soft-timeout 0 + +TEST mkdir -p $M0/dir/dir1 +TEST $CLI volume quota $V0 limit-objects /dir 20 + +TEST mkdir $M0/dir/dir1/d{1..5} +TEST touch $M0/dir/dir1/f{1..5} +TEST mv $M0/dir/dir1 $M0/dir/dir2 + +#Number of files under /dir is 5 +EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "5" quota_object_list_field "/dir" 4 + +#Number of directories under /dir is 7 +EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "7" quota_object_list_field "/dir" 5 + +TEST $CLI volume stop $V0 +TEST $CLI volume delete $V0 +EXPECT "1" get_aux + +cleanup; diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index 6265ffbac6e..5611c1d8bdc 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -169,6 +169,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx) LOCK (&ctx->lock); { + if (list_empty (&ctx->contribution_head)) + goto unlock; + list_for_each_entry (temp, &ctx->contribution_head, contri_list) { if (gf_uuid_compare (temp->gfid, inode->gfid) == 0) { @@ -178,7 +181,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx) } } } +unlock: UNLOCK (&ctx->lock); + out: return contri; } diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index ef3203131ee..2aa62a78150 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -2591,15 +2591,11 @@ out: int32_t mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, - inode_contribution_t *contri, quota_meta_t *delta, - gf_boolean_t remove_xattr) + inode_contribution_t *contri, quota_meta_t *delta) { int32_t ret = -1; char contri_key[CONTRI_KEY_MAX] = {0, }; - if (remove_xattr == _gf_false) - goto done; - GET_CONTRI_KEY (contri_key, contri->gfid, ret); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, "get contri_key " @@ -2626,7 +2622,6 @@ mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, } } -done: LOCK (&contri->lock); { contri->contribution += delta->size; @@ -2780,8 +2775,8 @@ mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque) } int -mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc, - int64_t contri) +mq_synctask1 (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, + loc_t *loc, quota_meta_t *contri) { int32_t ret = -1; quota_synctask_t *args = NULL; @@ -2797,7 +2792,14 @@ mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc, args->this = this; loc_copy (&args->loc, loc); - args->contri = contri; + + if (contri) { + args->contri = *contri; + } else { + args->contri.size = -1; + args->contri.file_count = -1; + args->contri.dir_count = -1; + } if (spawn) { ret = synctask_new1 (this->ctx->env, 1024 * 16, task, @@ -2816,6 +2818,12 @@ out: return ret; } +int +mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc) +{ + return mq_synctask1 (this, task, spawn, loc, NULL); +} + int32_t mq_prevalidate_txn (xlator_t *this, loc_t *origin_loc, loc_t *loc, quota_inode_ctx_t **ctx) @@ -2860,136 +2868,6 @@ out: } int -mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, - inode_contribution_t *contri) -{ - int32_t ret = -1; - loc_t child_loc = {0,}; - loc_t parent_loc = {0,}; - gf_boolean_t locked = _gf_false; - gf_boolean_t dirty = _gf_false; - gf_boolean_t status = _gf_false; - quota_meta_t delta = {0, }; - - GF_VALIDATE_OR_GOTO ("marker", contri, out); - GF_REF_GET (contri); - - GF_VALIDATE_OR_GOTO ("marker", loc, out); - GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); - GF_VALIDATE_OR_GOTO ("marker", ctx, out); - - ret = mq_loc_copy (&child_loc, loc); - if (ret < 0) { - gf_log (this->name, GF_LOG_ERROR, "loc copy failed"); - goto out; - } - - while (!__is_root_gfid (child_loc.gfid)) { - /* To improve performance, abort current transaction - * if one is already in progress for same inode - */ - if (status == _gf_true) { - /* status will alreday set before txn start, - * so it should not be set in first - * loop iteration - */ - ret = mq_test_and_set_ctx_updation_status (ctx, - &status); - if (ret < 0 || status == _gf_true) - goto out; - } - - ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc); - if (ret < 0) { - gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); - goto out; - } - - ret = mq_lock (this, &parent_loc, F_WRLCK); - if (ret < 0) - goto out; - locked = _gf_true; - - mq_set_ctx_updation_status (ctx, _gf_false); - status = _gf_true; - - ret = mq_get_delta (this, &child_loc, &delta, ctx, contri); - if (ret < 0) - goto out; - - if (quota_meta_is_null (&delta)) - goto out; - - ret = mq_mark_dirty (this, &parent_loc, 1); - if (ret < 0) - goto out; - dirty = _gf_true; - - ret = mq_update_contri (this, &child_loc, contri, &delta); - if (ret < 0) - goto out; - - ret = mq_update_size (this, &parent_loc, &delta); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, "rollback " - "contri updation"); - mq_sub_meta (&delta, NULL); - mq_update_contri (this, &child_loc, contri, &delta); - goto out; - } - - ret = mq_mark_dirty (this, &parent_loc, 0); - dirty = _gf_false; - - ret = mq_lock (this, &parent_loc, F_UNLCK); - locked = _gf_false; - - if (__is_root_gfid (parent_loc.gfid)) - break; - - /* Repeate above steps upwards till the root */ - loc_wipe (&child_loc); - ret = mq_loc_copy (&child_loc, &parent_loc); - if (ret < 0) - goto out; - loc_wipe (&parent_loc); - - ret = mq_inode_ctx_get (child_loc.inode, this, &ctx); - if (ret < 0) - goto out; - - if (list_empty (&ctx->contribution_head)) { - gf_log (this->name, GF_LOG_ERROR, - "contribution node list is empty (%s)", - uuid_utoa(child_loc.inode->gfid)); - ret = -1; - goto out; - } - - GF_REF_PUT (contri); - contri = mq_get_contribution_node (child_loc.parent, ctx); - GF_ASSERT (contri != NULL); - } - -out: - if (ret >= 0 && dirty) - ret = mq_mark_dirty (this, &parent_loc, 0); - - if (locked) - ret = mq_lock (this, &parent_loc, F_UNLCK); - - if (ctx && status == _gf_false) - mq_set_ctx_updation_status (ctx, _gf_false); - - loc_wipe (&child_loc); - loc_wipe (&parent_loc); - if (contri) - GF_REF_PUT (contri); - - return 0; -} - -int mq_create_xattrs_task (void *opaque) { int32_t ret = -1; @@ -3066,7 +2944,7 @@ _mq_create_xattrs_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn) if (ret < 0 || status == _gf_true) goto out; - ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc, 0); + ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc); out: if (ret < 0 && status == _gf_false) mq_set_ctx_create_status (ctx, _gf_false); @@ -3108,13 +2986,13 @@ mq_reduce_parent_size_task (void *opaque) quota_inode_ctx_t *ctx = NULL; inode_contribution_t *contribution = NULL; quota_meta_t delta = {0, }; + quota_meta_t contri = {0, }; loc_t parent_loc = {0,}; gf_boolean_t locked = _gf_false; gf_boolean_t dirty = _gf_false; quota_synctask_t *args = NULL; xlator_t *this = NULL; loc_t *loc = NULL; - int64_t contri = 0; gf_boolean_t remove_xattr = _gf_true; GF_ASSERT (opaque); @@ -3132,26 +3010,6 @@ mq_reduce_parent_size_task (void *opaque) goto out; } - contribution = mq_get_contribution_node (loc->parent, ctx); - if (contribution == NULL) { - ret = -1; - gf_log_callingfn (this->name, GF_LOG_WARNING, - "contribution for the node %s is NULL", - loc->path); - goto out; - } - - if (contri >= 0) { - /* contri paramater is supplied only for rename operation. - * remove xattr is alreday performed, we need to skip - * removexattr for rename operation - */ - remove_xattr = _gf_false; - delta.size = contri; - delta.file_count = 1; - delta.dir_count = 0; - } - ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); @@ -3163,7 +3021,26 @@ mq_reduce_parent_size_task (void *opaque) goto out; locked = _gf_true; - if (contri < 0) { + if (contri.size >= 0) { + /* contri paramater is supplied only for rename operation. + * remove xattr is alreday performed, we need to skip + * removexattr for rename operation + */ + remove_xattr = _gf_false; + delta.size = contri.size; + delta.file_count = contri.file_count; + delta.dir_count = contri.dir_count; + } else { + remove_xattr = _gf_true; + contribution = mq_get_contribution_node (loc->parent, ctx); + if (contribution == NULL) { + ret = -1; + gf_log (this->name, GF_LOG_DEBUG, + "contribution for the node %s is NULL", + loc->path); + goto out; + } + LOCK (&contribution->lock); { delta.size = contribution->contribution; @@ -3173,26 +3050,6 @@ mq_reduce_parent_size_task (void *opaque) UNLOCK (&contribution->lock); } - /* TODO: Handle handlinks with better approach - Iterating dentry_list without a lock is not a good idea - if (loc->inode->ia_type != IA_IFDIR) { - list_for_each_entry (dentry, &inode->dentry_list, inode_list) { - if (loc->parent == dentry->parent) { - * If the file has another link within the same - * directory, we should not be reducing the size - * of parent - * - delta = 0; - idelta = 0; - break; - } - } - } - */ - - if (quota_meta_is_null (&delta)) - goto out; - ret = mq_mark_dirty (this, &parent_loc, 1); if (ret < 0) goto out; @@ -3200,9 +3057,13 @@ mq_reduce_parent_size_task (void *opaque) mq_sub_meta (&delta, NULL); - ret = mq_remove_contri (this, loc, ctx, contribution, &delta, - remove_xattr); - if (ret < 0) + if (remove_xattr) { + ret = mq_remove_contri (this, loc, ctx, contribution, &delta); + if (ret < 0) + goto out; + } + + if (quota_meta_is_null (&delta)) goto out; ret = mq_update_size (this, &parent_loc, &delta); @@ -3228,7 +3089,8 @@ out: } int32_t -mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri) +mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, + quota_meta_t *contri) { int32_t ret = -1; loc_t loc = {0, }; @@ -3251,8 +3113,8 @@ mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri) goto out; } - ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, &loc, - contri); + ret = mq_synctask1 (this, mq_reduce_parent_size_task, _gf_true, &loc, + contri); out: loc_wipe (&loc); return ret; @@ -3261,74 +3123,204 @@ out: int mq_initiate_quota_task (void *opaque) { - int32_t ret = -1; - quota_inode_ctx_t *ctx = NULL; - inode_contribution_t *contribution = NULL; - quota_synctask_t *args = NULL; - xlator_t *this = NULL; - loc_t *loc = NULL; - - GF_ASSERT (opaque); + int32_t ret = -1; + loc_t child_loc = {0,}; + loc_t parent_loc = {0,}; + gf_boolean_t locked = _gf_false; + gf_boolean_t dirty = _gf_false; + gf_boolean_t status = _gf_false; + quota_meta_t delta = {0, }; + quota_synctask_t *args = NULL; + xlator_t *this = NULL; + loc_t *loc = NULL; + inode_contribution_t *contri = NULL; + quota_inode_ctx_t *ctx = NULL; + inode_t *tmp_parent = NULL; + + GF_VALIDATE_OR_GOTO ("marker", opaque, out); args = (quota_synctask_t *) opaque; loc = &args->loc; this = args->this; + + GF_VALIDATE_OR_GOTO ("marker", this, out); THIS = this; - ret = mq_inode_ctx_get (loc->inode, this, &ctx); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "inode ctx get failed, aborting quota txn"); + GF_VALIDATE_OR_GOTO (this->name, loc, out); + GF_VALIDATE_OR_GOTO (this->name, loc->inode, out); + + ret = mq_loc_copy (&child_loc, loc); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, "loc copy failed"); goto out; } - /* Create the contribution node if its absent. Is it right to - assume that if the contribution node is not there, then - create one and proceed instead of returning? - Reason for this assumption is for hard links. Suppose - hard link for a file f1 present in a directory d1 is - created in the directory d2 (as f2). Now, since d2's - contribution is not there in f1's inode ctx, d2's - contribution xattr wont be created and will create problems - for quota operations. - */ - contribution = mq_get_contribution_node (loc->parent, ctx); - if (!contribution) { - if (!loc_is_root(loc)) - gf_log_callingfn (this->name, GF_LOG_TRACE, - "contribution node for the " - "path (%s) with parent (%s) " - "not found", loc->path, - loc->parent ? - uuid_utoa (loc->parent->gfid) : - NULL); + while (!__is_root_gfid (child_loc.gfid)) { - contribution = mq_add_new_contribution_node (this, ctx, loc); - if (!contribution) { - if (!loc_is_root(loc)) - gf_log_callingfn (this->name, GF_LOG_WARNING, - "could not allocate " - " contribution node for (%s) " - "parent: (%s)", loc->path, - loc->parent ? - uuid_utoa (loc->parent->gfid) : - NULL); - ret = -1; + ret = mq_inode_ctx_get (child_loc.inode, this, &ctx); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "inode ctx get failed for %s, " + "aborting update txn", child_loc.path); goto out; } - } - mq_start_quota_txn_v2 (this, loc, ctx, contribution); + /* To improve performance, abort current transaction + * if one is already in progress for same inode + */ + if (status == _gf_true) { + /* status will alreday set before txn start, + * so it should not be set in first + * loop iteration + */ + ret = mq_test_and_set_ctx_updation_status (ctx, + &status); + if (ret < 0 || status == _gf_true) + goto out; + } + + ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); + goto out; + } + + ret = mq_lock (this, &parent_loc, F_WRLCK); + if (ret < 0) + goto out; + locked = _gf_true; + + mq_set_ctx_updation_status (ctx, _gf_false); + status = _gf_true; + + /* Contribution node can be NULL in below scenarios and + create if needed: + + Scenario 1) + In this case create a new contribution node + Suppose hard link for a file f1 present in a directory d1 is + created in the directory d2 (as f2). Now, since d2's + contribution is not there in f1's inode ctx, d2's + contribution xattr wont be created and will create problems + for quota operations. + + Don't create contribution if parent has been changed after + taking a lock, this can happen when rename is performed + and writes is still in-progress for the same file + + Scenario 2) + When a rename operation is performed, contribution node + for olp path will be removed. + + Create contribution node only if oldparent is same as + newparent. + Consider below example + 1) rename FOP invoked on file 'x' + 2) write is still in progress for file 'x' + 3) rename takes a lock on old-parent + 4) write-update txn blocked on old-parent to acquire lock + 5) in rename_cbk, contri xattrs are removed and contribution + is deleted and lock is released + 6) now write-update txn gets the lock and updates the + wrong parent as it was holding lock on old parent + so validate parent once the lock is acquired + + For more information on thsi problem, please see + doc for marker_rename in file marker.c + */ + contri = mq_get_contribution_node (child_loc.parent, ctx); + if (contri == NULL) { + tmp_parent = inode_parent (child_loc.inode, 0, NULL); + if (tmp_parent == NULL) { + ret = -1; + goto out; + } + if (gf_uuid_compare(tmp_parent->gfid, + parent_loc.gfid)) { + /* abort txn if parent has changed */ + ret = 0; + goto out; + } + + inode_unref (tmp_parent); + tmp_parent = NULL; + + contri = mq_add_new_contribution_node (this, ctx, + &child_loc); + if (contri == NULL) { + gf_log (this->name, GF_LOG_ERROR, "Failed to " + "create contribution node for %s, " + "abort update txn", child_loc.path); + ret = -1; + goto out; + } + } + + ret = mq_get_delta (this, &child_loc, &delta, ctx, contri); + if (ret < 0) + goto out; + + if (quota_meta_is_null (&delta)) + goto out; + + ret = mq_mark_dirty (this, &parent_loc, 1); + if (ret < 0) + goto out; + dirty = _gf_true; + + ret = mq_update_contri (this, &child_loc, contri, &delta); + if (ret < 0) + goto out; + + ret = mq_update_size (this, &parent_loc, &delta); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, "rollback " + "contri updation"); + mq_sub_meta (&delta, NULL); + mq_update_contri (this, &child_loc, contri, &delta); + goto out; + } + + ret = mq_mark_dirty (this, &parent_loc, 0); + dirty = _gf_false; + + ret = mq_lock (this, &parent_loc, F_UNLCK); + locked = _gf_false; + + if (__is_root_gfid (parent_loc.gfid)) + break; + + /* Repeate above steps upwards till the root */ + loc_wipe (&child_loc); + ret = mq_loc_copy (&child_loc, &parent_loc); + if (ret < 0) + goto out; + + loc_wipe (&parent_loc); + GF_REF_PUT (contri); + contri = NULL; + } - ret = 0; out: - if (contribution) - GF_REF_PUT (contribution); + if (ret >= 0 && dirty) + ret = mq_mark_dirty (this, &parent_loc, 0); - if (ctx && ret < 0) + if (locked) + ret = mq_lock (this, &parent_loc, F_UNLCK); + + if (ctx && status == _gf_false) mq_set_ctx_updation_status (ctx, _gf_false); - return ret; + loc_wipe (&child_loc); + loc_wipe (&parent_loc); + + if (tmp_parent) + inode_unref (tmp_parent); + + if (contri) + GF_REF_PUT (contri); + + return 0; } int @@ -3358,7 +3350,7 @@ _mq_initiate_quota_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn) if (ret < 0 || status == _gf_true) goto out; - ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc, 0); + ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc); out: if (ret < 0 && status == _gf_false) @@ -3555,14 +3547,14 @@ mq_update_dirty_inode_txn (xlator_t *this, loc_t *loc) GF_VALIDATE_OR_GOTO ("marker", loc, out); GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); - ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true, - loc, 0); + ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true, loc); out: return ret; } int32_t -mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, +mq_inspect_directory_xattr (xlator_t *this, quota_inode_ctx_t *ctx, + inode_contribution_t *contribution, loc_t *loc, dict_t *dict, struct iatt buf) { int32_t ret = 0; @@ -3571,19 +3563,6 @@ mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, quota_meta_t contri = {0, }; quota_meta_t delta = {0, }; char contri_key[CONTRI_KEY_MAX] = {0, }; - inode_contribution_t *contribution = NULL; - - if (!loc_is_root(loc)) { - contribution = mq_add_new_contribution_node (this, ctx, loc); - if (contribution == NULL) { - if (!gf_uuid_is_null (loc->inode->gfid)) - gf_log (this->name, GF_LOG_DEBUG, - "cannot add a new contribution node " - "(%s)", uuid_utoa (loc->inode->gfid)); - ret = -1; - goto out; - } - } ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); if (ret < 0) { @@ -3646,14 +3625,12 @@ create_xattr: ret = mq_create_xattrs_txn (this, loc); out: - if (contribution) - GF_REF_PUT (contribution); - return ret; } int32_t -mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc, +mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, + inode_contribution_t *contribution, loc_t *loc, dict_t *dict, struct iatt buf) { int32_t ret = -1; @@ -3661,15 +3638,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc, quota_meta_t contri = {0, }; quota_meta_t delta = {0, }; char contri_key[CONTRI_KEY_MAX] = {0, }; - inode_contribution_t *contribution = NULL; - - contribution = mq_add_new_contribution_node (this, ctx, loc); - if (contribution == NULL) { - gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate " - "contribution node (path:%s)", loc->path); - ret = -1; - goto out; - } LOCK (&ctx->lock); { @@ -3707,9 +3675,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc, /* TODO: revist this code when fixing hardlinks */ out: - if (contribution) - GF_REF_PUT (contribution); - return ret; } @@ -3717,22 +3682,48 @@ int32_t mq_xattr_state (xlator_t *this, loc_t *origin_loc, dict_t *dict, struct iatt buf) { - int32_t ret = -1; - quota_inode_ctx_t *ctx = NULL; - loc_t loc = {0, }; + int32_t ret = -1; + quota_inode_ctx_t *ctx = NULL; + loc_t loc = {0, }; + inode_contribution_t *contribution = NULL; + + if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict)) + || (buf.ia_type == IA_IFLNK) || (buf.ia_type == IA_IFDIR)) { + /* do healing only for these type of files */ + } else { + ret = 0; + goto out; + } ret = mq_prevalidate_txn (this, origin_loc, &loc, &ctx); if (ret < 0) goto out; - if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict)) - || (buf.ia_type == IA_IFLNK)) { - mq_inspect_file_xattr (this, ctx, &loc, dict, buf); - } else if (buf.ia_type == IA_IFDIR) - mq_inspect_directory_xattr (this, &loc, ctx, dict, buf); + if (!loc_is_root(&loc)) { + contribution = mq_add_new_contribution_node (this, ctx, &loc); + if (contribution == NULL) { + if (!gf_uuid_is_null (loc.inode->gfid)) + gf_log (this->name, GF_LOG_WARNING, + "cannot add a new contribution node " + "(%s)", uuid_utoa (loc.gfid)); + ret = -1; + goto out; + } + } + + if (buf.ia_type == IA_IFDIR) + mq_inspect_directory_xattr (this, ctx, contribution, &loc, dict, + buf); + else + mq_inspect_file_xattr (this, ctx, contribution, &loc, dict, + buf); out: loc_wipe (&loc); + + if (contribution) + GF_REF_PUT (contribution); + return ret; } diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index e1d5c8519fd..6bae110f014 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -13,6 +13,7 @@ #include "xlator.h" #include "marker-mem-types.h" #include "refcount.h" +#include "quota-common-utils.h" #define QUOTA_XATTR_PREFIX "trusted.glusterfs" #define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty" @@ -93,7 +94,7 @@ typedef struct quota_inode_ctx quota_inode_ctx_t; struct quota_synctask { xlator_t *this; loc_t loc; - int64_t contri; + quota_meta_t contri; gf_boolean_t is_static; }; typedef struct quota_synctask quota_synctask_t; @@ -141,7 +142,7 @@ int32_t mq_reduce_parent_size (xlator_t *, loc_t *, int64_t); int32_t -mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t); +mq_reduce_parent_size_txn (xlator_t *, loc_t *, quota_meta_t *); int32_t mq_rename_update_newpath (xlator_t *, loc_t *); diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index 8d39bece01b..2ebbde2d75b 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -205,6 +205,11 @@ marker_local_unref (marker_local_t *local) if (local->xdata) dict_unref (local->xdata); + if (local->lk_frame) { + STACK_DESTROY (local->lk_frame->root); + local->lk_frame = NULL; + } + if (local->oplocal) { marker_local_unref (local->oplocal); local->oplocal = NULL; @@ -833,7 +838,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - mq_reduce_parent_size_txn (this, &local->loc, -1); + mq_reduce_parent_size_txn (this, &local->loc, NULL); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -902,7 +907,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if (priv->feature_enabled & GF_QUOTA) { if (!local->skip_txn) - mq_reduce_parent_size_txn (this, &local->loc, -1); + mq_reduce_parent_size_txn (this, &local->loc, NULL); } if (priv->feature_enabled & GF_XTIME) @@ -1045,35 +1050,23 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; if (op_ret < 0) { - if (local->err == 0) { - local->err = op_errno ? op_errno : EINVAL; - } - gf_log (this->name, GF_LOG_WARNING, "inodelk (UNLOCK) failed on path:%s (gfid:%s) (%s)", - local->parent_loc.path, - uuid_utoa (local->parent_loc.inode->gfid), + oplocal->parent_loc.path, + uuid_utoa (oplocal->parent_loc.inode->gfid), strerror (op_errno)); } - if (local->stub != NULL) { - call_resume (local->stub); - local->stub = NULL; - } else if (local->err != 0) { - STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, - NULL, NULL, NULL, NULL); - } else { - gf_log (this->name, GF_LOG_CRITICAL, - "continuation stub to unwind the call is absent, hence " - "call will be hung (call-stack id = %"PRIu64")", - frame->root->unique); - } + if (local->err != 0) + goto err; - mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution); + mq_reduce_parent_size_txn (this, &oplocal->loc, &oplocal->contribution); if (local->loc.inode != NULL) { - mq_reduce_parent_size_txn (this, &local->loc, - local->contribution); + /* If destination file exits before rename, it would have + * been unlinked while renaming a file + */ + mq_reduce_parent_size_txn (this, &local->loc, NULL); } newloc.inode = inode_ref (oplocal->loc.inode); @@ -1094,39 +1087,26 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this, marker_xtime_update_marks (this, local); } +err: marker_local_unref (local); marker_local_unref (oplocal); + return 0; } -int32_t -marker_rename_release_newp_lock (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, - int32_t op_errno, dict_t *xdata) +void +marker_rename_release_oldp_lock (marker_local_t *local, xlator_t *this) { - marker_local_t *local = NULL, *oplocal = NULL; - struct gf_flock lock = {0, }; + marker_local_t *oplocal = NULL; + call_frame_t *lk_frame = NULL; + struct gf_flock lock = {0, }; - local = frame->local; oplocal = local->oplocal; + lk_frame = local->lk_frame; - if (op_ret < 0) { - if (local->err == 0) { - local->err = op_errno ? op_errno : EINVAL; - } - - gf_log (this->name, GF_LOG_WARNING, - "inodelk (UNLOCK) failed on %s (gfid:%s) (%s)", - oplocal->parent_loc.path, - uuid_utoa (oplocal->parent_loc.inode->gfid), - strerror (op_errno)); - } - - if (local->next_lock_on == NULL) { - marker_rename_done (frame, NULL, this, 0, 0, NULL); - goto out; - } + if (lk_frame == NULL) + goto err; lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; @@ -1134,47 +1114,75 @@ marker_rename_release_newp_lock (call_frame_t *frame, void *cookie, lock.l_len = 0; lock.l_pid = 0; - STACK_WIND (frame, + STACK_WIND (lk_frame, marker_rename_done, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, - this->name, &local->parent_loc, F_SETLKW, &lock, NULL); + this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL); -out: - return 0; + return; + +err: + marker_local_unref (local); + marker_local_unref (oplocal); } int32_t -marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, - int32_t op_errno, dict_t *xdata) +marker_rename_unwind (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - marker_local_t *local = NULL, *oplocal = NULL; - struct gf_flock lock = {0, }; + marker_local_t *local = NULL; + marker_local_t *oplocal = NULL; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contri = NULL; local = frame->local; oplocal = local->oplocal; - - if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) { - local->err = op_errno; - } + frame->local = NULL; //Reset frame uid and gid if set. if (cookie == (void *) _GF_UID_GID_CHANGED) MARKER_RESET_UID_GID (frame, frame->root, local); - lock.l_type = F_UNLCK; - lock.l_whence = SEEK_SET; - lock.l_start = 0; - lock.l_len = 0; - lock.l_pid = 0; + if (op_ret < 0) + local->err = op_errno ? op_errno : EINVAL; + + if (local->stub != NULL) { + /* Remove contribution node from in-memory even if + * remove-xattr has failed as the rename is already performed + * if local->stub is set, which means rename was sucessful + */ + mq_inode_ctx_get (oplocal->loc.inode, this, &ctx); + if (ctx) { + contri = mq_get_contribution_node (oplocal->loc.parent, + ctx); + if (contri) { + QUOTA_FREE_CONTRIBUTION_NODE (ctx, contri); + GF_REF_PUT (contri); + } + } + + call_resume (local->stub); + local->stub = NULL; + local->err = 0; + } else if (local->err != 0) { + STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, + NULL, NULL, NULL, NULL); + } else { + gf_log (this->name, GF_LOG_CRITICAL, + "continuation stub to unwind the call is absent, hence " + "call will be hung (call-stack id = %"PRIu64")", + frame->root->unique); + } + + /* If there are in-progress writes on old-path when during rename + * operation, update txn will update the wrong path if lock + * is released before rename unwind. + * So release lock only after rename unwind + */ + marker_rename_release_oldp_lock (local, this); - STACK_WIND (frame, - marker_rename_release_newp_lock, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->inodelk, - this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL); return 0; } @@ -1246,7 +1254,7 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, newloc.parent = inode_ref (local->loc.parent); gf_uuid_copy (newloc.gfid, oplocal->loc.inode->gfid); - STACK_WIND_COOKIE (frame, marker_rename_release_oldp_lock, + STACK_WIND_COOKIE (frame, marker_rename_unwind, frame->cookie, FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, &newloc, contri_key, NULL); @@ -1280,7 +1288,7 @@ out: return 0; quota_err: - marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); + marker_rename_unwind (frame, NULL, this, 0, 0, NULL); return 0; } @@ -1293,60 +1301,7 @@ marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this, marker_local_t *oplocal = NULL; char contri_key[CONTRI_KEY_MAX] = {0, }; int32_t ret = 0; - int64_t *contribution = 0; - - local = frame->local; - oplocal = local->oplocal; - - //Reset frame uid and gid if set. - if (cookie == (void *) _GF_UID_GID_CHANGED) - MARKER_RESET_UID_GID (frame, frame->root, local); - - if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) { - local->err = op_errno ? op_errno : EINVAL; - gf_log (this->name, GF_LOG_WARNING, - "fetching contribution values from %s (gfid:%s) " - "failed (%s)", local->loc.path, - uuid_utoa (local->loc.inode->gfid), - strerror (op_errno)); - goto err; - } - - if (local->loc.inode != NULL) { - GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); - if (ret < 0) { - local->err = errno ? errno : ENOMEM; - goto err; - } - - if (dict_get_bin (dict, contri_key, - (void **) &contribution) == 0) { - local->contribution = ntoh64 (*contribution); - } - } - - STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->rename, &oplocal->loc, - &local->loc, local->xdata); - - return 0; - -err: - marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); - return 0; -} - - -int32_t -marker_get_newpath_contribution (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, - int32_t op_errno, dict_t *dict, dict_t *xdata) -{ - marker_local_t *local = NULL; - marker_local_t *oplocal = NULL; - char contri_key[CONTRI_KEY_MAX] = {0, }; - int32_t ret = 0; - int64_t *contribution = 0; + quota_meta_t contribution = {0, }; local = frame->local; oplocal = local->oplocal; @@ -1370,68 +1325,51 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie, local->err = errno ? errno : ENOMEM; goto err; } + quota_dict_get_meta (dict, contri_key, &contribution); + oplocal->contribution = contribution; - if (dict_get_bin (dict, contri_key, (void **) &contribution) == 0) - oplocal->contribution = ntoh64 (*contribution); - - if (local->loc.inode != NULL) { - GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); - if (ret < 0) { - local->err = errno ? errno : ENOMEM; - goto err; - } - - /* getxattr requires uid and gid to be 0, - * reset them in the callback. - */ - MARKER_SET_UID_GID (frame, local, frame->root); - if (gf_uuid_is_null (local->loc.gfid)) - gf_uuid_copy (local->loc.gfid, local->loc.inode->gfid); - - GF_UUID_ASSERT (local->loc.gfid); - - STACK_WIND_COOKIE (frame, marker_do_rename, - frame->cookie, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->getxattr, - &local->loc, contri_key, NULL); - } else { - marker_do_rename (frame, NULL, this, 0, 0, NULL, NULL); - } + STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rename, &oplocal->loc, + &local->loc, local->xdata); return 0; + err: - marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); + marker_rename_unwind (frame, NULL, this, 0, 0, NULL); return 0; } - int32_t -marker_get_oldpath_contribution (call_frame_t *frame, void *cookie, +marker_get_oldpath_contribution (call_frame_t *lk_frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { - marker_local_t *local = NULL; - marker_local_t *oplocal = NULL; - char contri_key[CONTRI_KEY_MAX] = {0, }; - int32_t ret = 0; + call_frame_t *frame = NULL; + marker_local_t *local = NULL; + marker_local_t *oplocal = NULL; + char contri_key[CONTRI_KEY_MAX] = {0, }; + int32_t ret = 0; - local = frame->local; + local = lk_frame->local; oplocal = local->oplocal; + frame = local->frame; if (op_ret < 0) { local->err = op_errno ? op_errno : EINVAL; gf_log (this->name, GF_LOG_WARNING, "cannot hold inodelk on %s (gfid:%s) (%s)", - local->next_lock_on->path, - uuid_utoa (local->next_lock_on->inode->gfid), + oplocal->loc.path, uuid_utoa (oplocal->loc.inode->gfid), strerror (op_errno)); - goto lock_err; + goto err; + + STACK_DESTROY (local->lk_frame->root); + local->lk_frame = NULL; } GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); if (ret < 0) { local->err = errno ? errno : ENOMEM; - goto quota_err; + goto err; } /* getxattr requires uid and gid to be 0, @@ -1445,79 +1383,102 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie, GF_UUID_ASSERT (oplocal->loc.gfid); - STACK_WIND_COOKIE (frame, marker_get_newpath_contribution, + STACK_WIND_COOKIE (frame, marker_do_rename, frame->cookie, FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, &oplocal->loc, contri_key, NULL); - return 0; - -quota_err: - marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); - return 0; - -lock_err: - if ((local->next_lock_on == NULL) - || (local->next_lock_on == &local->parent_loc)) { - local->next_lock_on = NULL; - marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); - } else { - marker_rename_release_newp_lock (frame, NULL, this, 0, 0, NULL); - } return 0; -} - - -int32_t -marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ - marker_local_t *local = NULL, *oplocal = NULL; - loc_t *loc = NULL; - struct gf_flock lock = {0, }; - - local = frame->local; - oplocal = local->oplocal; - - if (op_ret < 0) { - if (local->next_lock_on != &oplocal->parent_loc) { - loc = &oplocal->parent_loc; - } else { - loc = &local->parent_loc; - } - - local->err = op_errno ? op_errno : EINVAL; - gf_log (this->name, GF_LOG_WARNING, - "cannot hold inodelk on %s (gfid:%s) (%s)", - loc->path, uuid_utoa (loc->inode->gfid), - strerror (op_errno)); - goto err; - } - - if (local->next_lock_on != NULL) { - lock.l_len = 0; - lock.l_start = 0; - lock.l_type = F_WRLCK; - lock.l_whence = SEEK_SET; - - STACK_WIND (frame, - marker_get_oldpath_contribution, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->inodelk, - this->name, local->next_lock_on, - F_SETLKW, &lock, NULL); - } else { - marker_get_oldpath_contribution (frame, 0, this, 0, 0, NULL); - } - - return 0; - err: - marker_rename_done (frame, NULL, this, 0, 0, NULL); - return 0; -} - - + marker_rename_unwind (frame, NULL, this, 0, 0, NULL); + return 0; +} + + +/* For a marker_rename FOP, following is the algorithm used for Quota + * accounting. The use-case considered is: + * 1. rename (src, dst) + * 2. both src and dst exist + * 3. there are parallel operations on src and dst (lets say through fds + * opened on them before rename was initiated). + * + * PS: We've not thought through whether this algo works in the presence of + * hardlinks to src and/or dst. + * + * Algorithm: + * ========== + * + * 1) set inodelk on src-parent + * As part of rename operation, parent can change for the file. + * We need to remove contribution (both on disk xattr and in-memory one) + * to src-parent (and its ancestors) and add the contribution to dst-parent + * (and its ancestors). While we are doing these operations, contribution of + * the file/directory shouldn't be changing as we want to be sure that + * a) what we subtract from src-parent is exactly what we add to dst-parent + * b) we should subtract from src-parent exactly what we contributed to + * src-parent + * So, We hold a lock on src-parent to block any parallel transcations on + * src-inode (since thats the one which survives rename). + * + * If there are any parallel transactions on dst-inode they keep succeeding + * till the association of dst-inode with dst-parent is broken because of an + * inode_rename after unwind of rename fop from marker. Only after unwind + * (and hence inode_rename), we delete and subtract the contribution of + * dst-inode to dst-parent. That way we are making sure we subtract exactly + * what dst-inode contributed to dst-parent. + * + * 2) lookup contribution to src-parent on src-inode. + * We need to save the contribution info for use at step-8. + * + * 3) wind rename + * Perform rename on disk + * + * 4) remove xattr on src-loc + * After rename, parent can change, so + * need to remove xattrs storing contribution to src-parent. + * + * 5) remove contribution node corresponding to src-parent from the in-memory + * list. + * After rename, contri gfid can change and we have + * also removed xattr from file. + * We need to remove in-memory contribution node to prevent updations to + * src-parent even after a successful rename + * + * 6) unwind rename + * This will ensure that rename is done in the server + * inode table. An inode_rename disassociates src-inode from src-parent and + * associates it with dst-parent. It also disassociates dst-inode from + * dst-parent. After inode_rename, inode_parent on src-inode will give + * dst-parent and inode_parent on dst-inode will return NULL (assuming + * dst-inode doesn't have any hardlinks). + * + * 7) release inodelk on src-parent + * Lock on src-parent should be released only after + * rename on disk, remove xattr and rename_unwind (and hence inode_rename) + * operations. If lock is released before inode_rename, a parallel + * transaction on src-inode can still update src-parent (as inode_parent on + * src-inode can still return src-parent). This would make the + * contribution from src-inode to src-parent stored in step-2 stale. + * + * 8) Initiate mq_reduce_parent_size_txn on src-parent to remove contribution + * of src-inode to src-parent. We use the contribution stored in step-2. + * Since, we had acquired the lock on src-parent all along step-2 through + * inode_rename, we can be sure that a parallel transaction wouldn't have + * added a delta to src-parent. + * + * 9) Initiate mq_reduce_parent_size_txn on dst-parent if dst-inode exists. + * The size reduced from dst-parent and its ancestors is the + * size stored as contribution to dst-parent in dst-inode. + * If the destination file had existed, rename will unlink the + * destination file as part of its operation. + * We need to reduce the size on the dest parent similarly to + * unlink. Since, we are initiating reduce-parent-size transaction after + * inode_rename, we can be sure that a parallel transaction wouldn't add + * delta to dst-parent while we are reducing the contribution of dst-inode + * from its ancestors before rename. + * + * 10) create contribution xattr to dst-parent on src-inode. + */ int32_t marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) @@ -1527,7 +1488,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, marker_local_t *oplocal = NULL; marker_conf_t *priv = NULL; struct gf_flock lock = {0, }; - loc_t *lock_on = NULL; priv = this->private; @@ -1566,19 +1526,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, if (ret < 0) goto err; - if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent) - && (gf_uuid_compare (newloc->parent->gfid, - oldloc->parent->gfid) < 0)) { - lock_on = &local->parent_loc; - local->next_lock_on = &oplocal->parent_loc; - } else { - lock_on = &oplocal->parent_loc; - if ((newloc->inode != NULL) && (newloc->parent - != oldloc->parent)) { - local->next_lock_on = &local->parent_loc; - } - } - lock.l_len = 0; lock.l_start = 0; lock.l_type = F_WRLCK; @@ -1586,14 +1533,22 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, local->xdata = dict_ref (xdata); - if (is_lk_owner_null (&frame->root->lk_owner)) - set_lk_owner_from_ptr (&frame->root->lk_owner, frame->root); + local->frame = frame; + local->lk_frame = create_frame (this, this->ctx->pool); + if (local->lk_frame == NULL) + goto err; + + local->lk_frame->root->uid = 0; + local->lk_frame->root->gid = 0; + local->lk_frame->local = local; + set_lk_owner_from_ptr (&local->lk_frame->root->lk_owner, + local->lk_frame->root); - STACK_WIND (frame, - marker_rename_inodelk_cbk, + STACK_WIND (local->lk_frame, + marker_get_oldpath_contribution, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, - this->name, lock_on, + this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL); return 0; diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h index 6e43e74d5a6..228fd6fa5d8 100644 --- a/xlators/features/marker/src/marker.h +++ b/xlators/features/marker/src/marker.h @@ -92,7 +92,6 @@ struct marker_local{ pid_t pid; loc_t loc; loc_t parent_loc; - loc_t *next_lock_on; uid_t uid; gid_t gid; int32_t ref; @@ -101,7 +100,8 @@ struct marker_local{ mode_t mode; int32_t err; call_stub_t *stub; - int64_t contribution; + call_frame_t *lk_frame; + quota_meta_t contribution; struct marker_local *oplocal; /* marker quota specific */ |