summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorvmallika <vmallika@redhat.com>2015-07-12 08:49:49 +0530
committerRaghavendra G <rgowdapp@redhat.com>2015-08-19 10:43:54 -0700
commit4efc8ea17f8452cf5a5f3a504419c1269d332d79 (patch)
tree05a385130b54d282766baf5db74be8178b1dcc4c /xlators
parentd290dc62e474cfe4a305f503dff815c73d3f2578 (diff)
quota/marker: fix inode quota with rename
There are three problems with marker-rename which is fixed in this patch Problem 1) 1) mq_reduce_parent_size is not handling inode-quota contribution 2) When dest files exists and IO is happening Now renaming will overwrite existing file mq_reduce_parent_size called on dest file with saved contribution, this can be a problem is IO is still happening contribution might have changed Problem 2) There is a small race between rename and in-progress write Consider below scenario 1) rename FOP invoked on file 'x' 2) write is still in progress for file 'x' 3) rename takes a lock on old-parent 4) write-update txn blocked on old-parent to acquire lock 5) in rename_cbk, contri xattrs are removed and contribution is deleted and lock is released 6) now write-update txn gets the lock and updates the wrong parent as it was holding lock on old parent so validate parent once the lock is acquired Problem 3) when a rename operation is performed, a lock is held on old parent. This lock is release before unwinding the rename operation. This can be a problem if there are in-progress writes happening during rename, where update txn can take a lock and update the old parent as inode table is not updated with new parent Change-Id: Ic3316097c001c33533f98592e8fcf234b1ee2aa2 BUG: 1240991 Signed-off-by: vmallika <vmallika@redhat.com> Reviewed-on: http://review.gluster.org/11578 Tested-by: Gluster Build System <jenkins@build.gluster.com> Tested-by: NetBSD Build System <jenkins@build.gluster.org> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/features/marker/src/marker-quota-helper.c5
-rw-r--r--xlators/features/marker/src/marker-quota.c549
-rw-r--r--xlators/features/marker/src/marker-quota.h5
-rw-r--r--xlators/features/marker/src/marker.c453
-rw-r--r--xlators/features/marker/src/marker.h4
5 files changed, 484 insertions, 532 deletions
diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c
index 6265ffbac6e..5611c1d8bdc 100644
--- a/xlators/features/marker/src/marker-quota-helper.c
+++ b/xlators/features/marker/src/marker-quota-helper.c
@@ -169,6 +169,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx)
LOCK (&ctx->lock);
{
+ if (list_empty (&ctx->contribution_head))
+ goto unlock;
+
list_for_each_entry (temp, &ctx->contribution_head,
contri_list) {
if (gf_uuid_compare (temp->gfid, inode->gfid) == 0) {
@@ -178,7 +181,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx)
}
}
}
+unlock:
UNLOCK (&ctx->lock);
+
out:
return contri;
}
diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c
index ef3203131ee..2aa62a78150 100644
--- a/xlators/features/marker/src/marker-quota.c
+++ b/xlators/features/marker/src/marker-quota.c
@@ -2591,15 +2591,11 @@ out:
int32_t
mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
- inode_contribution_t *contri, quota_meta_t *delta,
- gf_boolean_t remove_xattr)
+ inode_contribution_t *contri, quota_meta_t *delta)
{
int32_t ret = -1;
char contri_key[CONTRI_KEY_MAX] = {0, };
- if (remove_xattr == _gf_false)
- goto done;
-
GET_CONTRI_KEY (contri_key, contri->gfid, ret);
if (ret < 0) {
gf_log (this->name, GF_LOG_ERROR, "get contri_key "
@@ -2626,7 +2622,6 @@ mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
}
}
-done:
LOCK (&contri->lock);
{
contri->contribution += delta->size;
@@ -2780,8 +2775,8 @@ mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)
}
int
-mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
- int64_t contri)
+mq_synctask1 (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn,
+ loc_t *loc, quota_meta_t *contri)
{
int32_t ret = -1;
quota_synctask_t *args = NULL;
@@ -2797,7 +2792,14 @@ mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
args->this = this;
loc_copy (&args->loc, loc);
- args->contri = contri;
+
+ if (contri) {
+ args->contri = *contri;
+ } else {
+ args->contri.size = -1;
+ args->contri.file_count = -1;
+ args->contri.dir_count = -1;
+ }
if (spawn) {
ret = synctask_new1 (this->ctx->env, 1024 * 16, task,
@@ -2816,6 +2818,12 @@ out:
return ret;
}
+int
+mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc)
+{
+ return mq_synctask1 (this, task, spawn, loc, NULL);
+}
+
int32_t
mq_prevalidate_txn (xlator_t *this, loc_t *origin_loc, loc_t *loc,
quota_inode_ctx_t **ctx)
@@ -2860,136 +2868,6 @@ out:
}
int
-mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
- inode_contribution_t *contri)
-{
- int32_t ret = -1;
- loc_t child_loc = {0,};
- loc_t parent_loc = {0,};
- gf_boolean_t locked = _gf_false;
- gf_boolean_t dirty = _gf_false;
- gf_boolean_t status = _gf_false;
- quota_meta_t delta = {0, };
-
- GF_VALIDATE_OR_GOTO ("marker", contri, out);
- GF_REF_GET (contri);
-
- GF_VALIDATE_OR_GOTO ("marker", loc, out);
- GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
- GF_VALIDATE_OR_GOTO ("marker", ctx, out);
-
- ret = mq_loc_copy (&child_loc, loc);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_ERROR, "loc copy failed");
- goto out;
- }
-
- while (!__is_root_gfid (child_loc.gfid)) {
- /* To improve performance, abort current transaction
- * if one is already in progress for same inode
- */
- if (status == _gf_true) {
- /* status will alreday set before txn start,
- * so it should not be set in first
- * loop iteration
- */
- ret = mq_test_and_set_ctx_updation_status (ctx,
- &status);
- if (ret < 0 || status == _gf_true)
- goto out;
- }
-
- ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
- goto out;
- }
-
- ret = mq_lock (this, &parent_loc, F_WRLCK);
- if (ret < 0)
- goto out;
- locked = _gf_true;
-
- mq_set_ctx_updation_status (ctx, _gf_false);
- status = _gf_true;
-
- ret = mq_get_delta (this, &child_loc, &delta, ctx, contri);
- if (ret < 0)
- goto out;
-
- if (quota_meta_is_null (&delta))
- goto out;
-
- ret = mq_mark_dirty (this, &parent_loc, 1);
- if (ret < 0)
- goto out;
- dirty = _gf_true;
-
- ret = mq_update_contri (this, &child_loc, contri, &delta);
- if (ret < 0)
- goto out;
-
- ret = mq_update_size (this, &parent_loc, &delta);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_DEBUG, "rollback "
- "contri updation");
- mq_sub_meta (&delta, NULL);
- mq_update_contri (this, &child_loc, contri, &delta);
- goto out;
- }
-
- ret = mq_mark_dirty (this, &parent_loc, 0);
- dirty = _gf_false;
-
- ret = mq_lock (this, &parent_loc, F_UNLCK);
- locked = _gf_false;
-
- if (__is_root_gfid (parent_loc.gfid))
- break;
-
- /* Repeate above steps upwards till the root */
- loc_wipe (&child_loc);
- ret = mq_loc_copy (&child_loc, &parent_loc);
- if (ret < 0)
- goto out;
- loc_wipe (&parent_loc);
-
- ret = mq_inode_ctx_get (child_loc.inode, this, &ctx);
- if (ret < 0)
- goto out;
-
- if (list_empty (&ctx->contribution_head)) {
- gf_log (this->name, GF_LOG_ERROR,
- "contribution node list is empty (%s)",
- uuid_utoa(child_loc.inode->gfid));
- ret = -1;
- goto out;
- }
-
- GF_REF_PUT (contri);
- contri = mq_get_contribution_node (child_loc.parent, ctx);
- GF_ASSERT (contri != NULL);
- }
-
-out:
- if (ret >= 0 && dirty)
- ret = mq_mark_dirty (this, &parent_loc, 0);
-
- if (locked)
- ret = mq_lock (this, &parent_loc, F_UNLCK);
-
- if (ctx && status == _gf_false)
- mq_set_ctx_updation_status (ctx, _gf_false);
-
- loc_wipe (&child_loc);
- loc_wipe (&parent_loc);
- if (contri)
- GF_REF_PUT (contri);
-
- return 0;
-}
-
-int
mq_create_xattrs_task (void *opaque)
{
int32_t ret = -1;
@@ -3066,7 +2944,7 @@ _mq_create_xattrs_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn)
if (ret < 0 || status == _gf_true)
goto out;
- ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc, 0);
+ ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc);
out:
if (ret < 0 && status == _gf_false)
mq_set_ctx_create_status (ctx, _gf_false);
@@ -3108,13 +2986,13 @@ mq_reduce_parent_size_task (void *opaque)
quota_inode_ctx_t *ctx = NULL;
inode_contribution_t *contribution = NULL;
quota_meta_t delta = {0, };
+ quota_meta_t contri = {0, };
loc_t parent_loc = {0,};
gf_boolean_t locked = _gf_false;
gf_boolean_t dirty = _gf_false;
quota_synctask_t *args = NULL;
xlator_t *this = NULL;
loc_t *loc = NULL;
- int64_t contri = 0;
gf_boolean_t remove_xattr = _gf_true;
GF_ASSERT (opaque);
@@ -3132,26 +3010,6 @@ mq_reduce_parent_size_task (void *opaque)
goto out;
}
- contribution = mq_get_contribution_node (loc->parent, ctx);
- if (contribution == NULL) {
- ret = -1;
- gf_log_callingfn (this->name, GF_LOG_WARNING,
- "contribution for the node %s is NULL",
- loc->path);
- goto out;
- }
-
- if (contri >= 0) {
- /* contri paramater is supplied only for rename operation.
- * remove xattr is alreday performed, we need to skip
- * removexattr for rename operation
- */
- remove_xattr = _gf_false;
- delta.size = contri;
- delta.file_count = 1;
- delta.dir_count = 0;
- }
-
ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc);
if (ret < 0) {
gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
@@ -3163,7 +3021,26 @@ mq_reduce_parent_size_task (void *opaque)
goto out;
locked = _gf_true;
- if (contri < 0) {
+ if (contri.size >= 0) {
+ /* contri paramater is supplied only for rename operation.
+ * remove xattr is alreday performed, we need to skip
+ * removexattr for rename operation
+ */
+ remove_xattr = _gf_false;
+ delta.size = contri.size;
+ delta.file_count = contri.file_count;
+ delta.dir_count = contri.dir_count;
+ } else {
+ remove_xattr = _gf_true;
+ contribution = mq_get_contribution_node (loc->parent, ctx);
+ if (contribution == NULL) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "contribution for the node %s is NULL",
+ loc->path);
+ goto out;
+ }
+
LOCK (&contribution->lock);
{
delta.size = contribution->contribution;
@@ -3173,26 +3050,6 @@ mq_reduce_parent_size_task (void *opaque)
UNLOCK (&contribution->lock);
}
- /* TODO: Handle handlinks with better approach
- Iterating dentry_list without a lock is not a good idea
- if (loc->inode->ia_type != IA_IFDIR) {
- list_for_each_entry (dentry, &inode->dentry_list, inode_list) {
- if (loc->parent == dentry->parent) {
- * If the file has another link within the same
- * directory, we should not be reducing the size
- * of parent
- *
- delta = 0;
- idelta = 0;
- break;
- }
- }
- }
- */
-
- if (quota_meta_is_null (&delta))
- goto out;
-
ret = mq_mark_dirty (this, &parent_loc, 1);
if (ret < 0)
goto out;
@@ -3200,9 +3057,13 @@ mq_reduce_parent_size_task (void *opaque)
mq_sub_meta (&delta, NULL);
- ret = mq_remove_contri (this, loc, ctx, contribution, &delta,
- remove_xattr);
- if (ret < 0)
+ if (remove_xattr) {
+ ret = mq_remove_contri (this, loc, ctx, contribution, &delta);
+ if (ret < 0)
+ goto out;
+ }
+
+ if (quota_meta_is_null (&delta))
goto out;
ret = mq_update_size (this, &parent_loc, &delta);
@@ -3228,7 +3089,8 @@ out:
}
int32_t
-mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri)
+mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc,
+ quota_meta_t *contri)
{
int32_t ret = -1;
loc_t loc = {0, };
@@ -3251,8 +3113,8 @@ mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri)
goto out;
}
- ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, &loc,
- contri);
+ ret = mq_synctask1 (this, mq_reduce_parent_size_task, _gf_true, &loc,
+ contri);
out:
loc_wipe (&loc);
return ret;
@@ -3261,74 +3123,204 @@ out:
int
mq_initiate_quota_task (void *opaque)
{
- int32_t ret = -1;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
- quota_synctask_t *args = NULL;
- xlator_t *this = NULL;
- loc_t *loc = NULL;
-
- GF_ASSERT (opaque);
+ int32_t ret = -1;
+ loc_t child_loc = {0,};
+ loc_t parent_loc = {0,};
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t dirty = _gf_false;
+ gf_boolean_t status = _gf_false;
+ quota_meta_t delta = {0, };
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ inode_contribution_t *contri = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_t *tmp_parent = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", opaque, out);
args = (quota_synctask_t *) opaque;
loc = &args->loc;
this = args->this;
+
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
THIS = this;
- ret = mq_inode_ctx_get (loc->inode, this, &ctx);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "inode ctx get failed, aborting quota txn");
+ GF_VALIDATE_OR_GOTO (this->name, loc, out);
+ GF_VALIDATE_OR_GOTO (this->name, loc->inode, out);
+
+ ret = mq_loc_copy (&child_loc, loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc copy failed");
goto out;
}
- /* Create the contribution node if its absent. Is it right to
- assume that if the contribution node is not there, then
- create one and proceed instead of returning?
- Reason for this assumption is for hard links. Suppose
- hard link for a file f1 present in a directory d1 is
- created in the directory d2 (as f2). Now, since d2's
- contribution is not there in f1's inode ctx, d2's
- contribution xattr wont be created and will create problems
- for quota operations.
- */
- contribution = mq_get_contribution_node (loc->parent, ctx);
- if (!contribution) {
- if (!loc_is_root(loc))
- gf_log_callingfn (this->name, GF_LOG_TRACE,
- "contribution node for the "
- "path (%s) with parent (%s) "
- "not found", loc->path,
- loc->parent ?
- uuid_utoa (loc->parent->gfid) :
- NULL);
+ while (!__is_root_gfid (child_loc.gfid)) {
- contribution = mq_add_new_contribution_node (this, ctx, loc);
- if (!contribution) {
- if (!loc_is_root(loc))
- gf_log_callingfn (this->name, GF_LOG_WARNING,
- "could not allocate "
- " contribution node for (%s) "
- "parent: (%s)", loc->path,
- loc->parent ?
- uuid_utoa (loc->parent->gfid) :
- NULL);
- ret = -1;
+ ret = mq_inode_ctx_get (child_loc.inode, this, &ctx);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "inode ctx get failed for %s, "
+ "aborting update txn", child_loc.path);
goto out;
}
- }
- mq_start_quota_txn_v2 (this, loc, ctx, contribution);
+ /* To improve performance, abort current transaction
+ * if one is already in progress for same inode
+ */
+ if (status == _gf_true) {
+ /* status will alreday set before txn start,
+ * so it should not be set in first
+ * loop iteration
+ */
+ ret = mq_test_and_set_ctx_updation_status (ctx,
+ &status);
+ if (ret < 0 || status == _gf_true)
+ goto out;
+ }
+
+ ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
+ goto out;
+ }
+
+ ret = mq_lock (this, &parent_loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ mq_set_ctx_updation_status (ctx, _gf_false);
+ status = _gf_true;
+
+ /* Contribution node can be NULL in below scenarios and
+ create if needed:
+
+ Scenario 1)
+ In this case create a new contribution node
+ Suppose hard link for a file f1 present in a directory d1 is
+ created in the directory d2 (as f2). Now, since d2's
+ contribution is not there in f1's inode ctx, d2's
+ contribution xattr wont be created and will create problems
+ for quota operations.
+
+ Don't create contribution if parent has been changed after
+ taking a lock, this can happen when rename is performed
+ and writes is still in-progress for the same file
+
+ Scenario 2)
+ When a rename operation is performed, contribution node
+ for olp path will be removed.
+
+ Create contribution node only if oldparent is same as
+ newparent.
+ Consider below example
+ 1) rename FOP invoked on file 'x'
+ 2) write is still in progress for file 'x'
+ 3) rename takes a lock on old-parent
+ 4) write-update txn blocked on old-parent to acquire lock
+ 5) in rename_cbk, contri xattrs are removed and contribution
+ is deleted and lock is released
+ 6) now write-update txn gets the lock and updates the
+ wrong parent as it was holding lock on old parent
+ so validate parent once the lock is acquired
+
+ For more information on thsi problem, please see
+ doc for marker_rename in file marker.c
+ */
+ contri = mq_get_contribution_node (child_loc.parent, ctx);
+ if (contri == NULL) {
+ tmp_parent = inode_parent (child_loc.inode, 0, NULL);
+ if (tmp_parent == NULL) {
+ ret = -1;
+ goto out;
+ }
+ if (gf_uuid_compare(tmp_parent->gfid,
+ parent_loc.gfid)) {
+ /* abort txn if parent has changed */
+ ret = 0;
+ goto out;
+ }
+
+ inode_unref (tmp_parent);
+ tmp_parent = NULL;
+
+ contri = mq_add_new_contribution_node (this, ctx,
+ &child_loc);
+ if (contri == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to "
+ "create contribution node for %s, "
+ "abort update txn", child_loc.path);
+ ret = -1;
+ goto out;
+ }
+ }
+
+ ret = mq_get_delta (this, &child_loc, &delta, ctx, contri);
+ if (ret < 0)
+ goto out;
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ ret = mq_mark_dirty (this, &parent_loc, 1);
+ if (ret < 0)
+ goto out;
+ dirty = _gf_true;
+
+ ret = mq_update_contri (this, &child_loc, contri, &delta);
+ if (ret < 0)
+ goto out;
+
+ ret = mq_update_size (this, &parent_loc, &delta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG, "rollback "
+ "contri updation");
+ mq_sub_meta (&delta, NULL);
+ mq_update_contri (this, &child_loc, contri, &delta);
+ goto out;
+ }
+
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+ dirty = _gf_false;
+
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+ locked = _gf_false;
+
+ if (__is_root_gfid (parent_loc.gfid))
+ break;
+
+ /* Repeate above steps upwards till the root */
+ loc_wipe (&child_loc);
+ ret = mq_loc_copy (&child_loc, &parent_loc);
+ if (ret < 0)
+ goto out;
+
+ loc_wipe (&parent_loc);
+ GF_REF_PUT (contri);
+ contri = NULL;
+ }
- ret = 0;
out:
- if (contribution)
- GF_REF_PUT (contribution);
+ if (ret >= 0 && dirty)
+ ret = mq_mark_dirty (this, &parent_loc, 0);
- if (ctx && ret < 0)
+ if (locked)
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+
+ if (ctx && status == _gf_false)
mq_set_ctx_updation_status (ctx, _gf_false);
- return ret;
+ loc_wipe (&child_loc);
+ loc_wipe (&parent_loc);
+
+ if (tmp_parent)
+ inode_unref (tmp_parent);
+
+ if (contri)
+ GF_REF_PUT (contri);
+
+ return 0;
}
int
@@ -3358,7 +3350,7 @@ _mq_initiate_quota_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn)
if (ret < 0 || status == _gf_true)
goto out;
- ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc, 0);
+ ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc);
out:
if (ret < 0 && status == _gf_false)
@@ -3555,14 +3547,14 @@ mq_update_dirty_inode_txn (xlator_t *this, loc_t *loc)
GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
- ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true,
- loc, 0);
+ ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true, loc);
out:
return ret;
}
int32_t
-mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
+mq_inspect_directory_xattr (xlator_t *this, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contribution, loc_t *loc,
dict_t *dict, struct iatt buf)
{
int32_t ret = 0;
@@ -3571,19 +3563,6 @@ mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
quota_meta_t contri = {0, };
quota_meta_t delta = {0, };
char contri_key[CONTRI_KEY_MAX] = {0, };
- inode_contribution_t *contribution = NULL;
-
- if (!loc_is_root(loc)) {
- contribution = mq_add_new_contribution_node (this, ctx, loc);
- if (contribution == NULL) {
- if (!gf_uuid_is_null (loc->inode->gfid))
- gf_log (this->name, GF_LOG_DEBUG,
- "cannot add a new contribution node "
- "(%s)", uuid_utoa (loc->inode->gfid));
- ret = -1;
- goto out;
- }
- }
ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
if (ret < 0) {
@@ -3646,14 +3625,12 @@ create_xattr:
ret = mq_create_xattrs_txn (this, loc);
out:
- if (contribution)
- GF_REF_PUT (contribution);
-
return ret;
}
int32_t
-mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,
+mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contribution, loc_t *loc,
dict_t *dict, struct iatt buf)
{
int32_t ret = -1;
@@ -3661,15 +3638,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,
quota_meta_t contri = {0, };
quota_meta_t delta = {0, };
char contri_key[CONTRI_KEY_MAX] = {0, };
- inode_contribution_t *contribution = NULL;
-
- contribution = mq_add_new_contribution_node (this, ctx, loc);
- if (contribution == NULL) {
- gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate "
- "contribution node (path:%s)", loc->path);
- ret = -1;
- goto out;
- }
LOCK (&ctx->lock);
{
@@ -3707,9 +3675,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,
/* TODO: revist this code when fixing hardlinks */
out:
- if (contribution)
- GF_REF_PUT (contribution);
-
return ret;
}
@@ -3717,22 +3682,48 @@ int32_t
mq_xattr_state (xlator_t *this, loc_t *origin_loc, dict_t *dict,
struct iatt buf)
{
- int32_t ret = -1;
- quota_inode_ctx_t *ctx = NULL;
- loc_t loc = {0, };
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ loc_t loc = {0, };
+ inode_contribution_t *contribution = NULL;
+
+ if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))
+ || (buf.ia_type == IA_IFLNK) || (buf.ia_type == IA_IFDIR)) {
+ /* do healing only for these type of files */
+ } else {
+ ret = 0;
+ goto out;
+ }
ret = mq_prevalidate_txn (this, origin_loc, &loc, &ctx);
if (ret < 0)
goto out;
- if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))
- || (buf.ia_type == IA_IFLNK)) {
- mq_inspect_file_xattr (this, ctx, &loc, dict, buf);
- } else if (buf.ia_type == IA_IFDIR)
- mq_inspect_directory_xattr (this, &loc, ctx, dict, buf);
+ if (!loc_is_root(&loc)) {
+ contribution = mq_add_new_contribution_node (this, ctx, &loc);
+ if (contribution == NULL) {
+ if (!gf_uuid_is_null (loc.inode->gfid))
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot add a new contribution node "
+ "(%s)", uuid_utoa (loc.gfid));
+ ret = -1;
+ goto out;
+ }
+ }
+
+ if (buf.ia_type == IA_IFDIR)
+ mq_inspect_directory_xattr (this, ctx, contribution, &loc, dict,
+ buf);
+ else
+ mq_inspect_file_xattr (this, ctx, contribution, &loc, dict,
+ buf);
out:
loc_wipe (&loc);
+
+ if (contribution)
+ GF_REF_PUT (contribution);
+
return ret;
}
diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h
index e1d5c8519fd..6bae110f014 100644
--- a/xlators/features/marker/src/marker-quota.h
+++ b/xlators/features/marker/src/marker-quota.h
@@ -13,6 +13,7 @@
#include "xlator.h"
#include "marker-mem-types.h"
#include "refcount.h"
+#include "quota-common-utils.h"
#define QUOTA_XATTR_PREFIX "trusted.glusterfs"
#define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty"
@@ -93,7 +94,7 @@ typedef struct quota_inode_ctx quota_inode_ctx_t;
struct quota_synctask {
xlator_t *this;
loc_t loc;
- int64_t contri;
+ quota_meta_t contri;
gf_boolean_t is_static;
};
typedef struct quota_synctask quota_synctask_t;
@@ -141,7 +142,7 @@ int32_t
mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);
int32_t
-mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t);
+mq_reduce_parent_size_txn (xlator_t *, loc_t *, quota_meta_t *);
int32_t
mq_rename_update_newpath (xlator_t *, loc_t *);
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index 8d39bece01b..2ebbde2d75b 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -205,6 +205,11 @@ marker_local_unref (marker_local_t *local)
if (local->xdata)
dict_unref (local->xdata);
+ if (local->lk_frame) {
+ STACK_DESTROY (local->lk_frame->root);
+ local->lk_frame = NULL;
+ }
+
if (local->oplocal) {
marker_local_unref (local->oplocal);
local->oplocal = NULL;
@@ -833,7 +838,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_reduce_parent_size_txn (this, &local->loc, -1);
+ mq_reduce_parent_size_txn (this, &local->loc, NULL);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -902,7 +907,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (priv->feature_enabled & GF_QUOTA) {
if (!local->skip_txn)
- mq_reduce_parent_size_txn (this, &local->loc, -1);
+ mq_reduce_parent_size_txn (this, &local->loc, NULL);
}
if (priv->feature_enabled & GF_XTIME)
@@ -1045,35 +1050,23 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
frame->local = NULL;
if (op_ret < 0) {
- if (local->err == 0) {
- local->err = op_errno ? op_errno : EINVAL;
- }
-
gf_log (this->name, GF_LOG_WARNING,
"inodelk (UNLOCK) failed on path:%s (gfid:%s) (%s)",
- local->parent_loc.path,
- uuid_utoa (local->parent_loc.inode->gfid),
+ oplocal->parent_loc.path,
+ uuid_utoa (oplocal->parent_loc.inode->gfid),
strerror (op_errno));
}
- if (local->stub != NULL) {
- call_resume (local->stub);
- local->stub = NULL;
- } else if (local->err != 0) {
- STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL,
- NULL, NULL, NULL, NULL);
- } else {
- gf_log (this->name, GF_LOG_CRITICAL,
- "continuation stub to unwind the call is absent, hence "
- "call will be hung (call-stack id = %"PRIu64")",
- frame->root->unique);
- }
+ if (local->err != 0)
+ goto err;
- mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution);
+ mq_reduce_parent_size_txn (this, &oplocal->loc, &oplocal->contribution);
if (local->loc.inode != NULL) {
- mq_reduce_parent_size_txn (this, &local->loc,
- local->contribution);
+ /* If destination file exits before rename, it would have
+ * been unlinked while renaming a file
+ */
+ mq_reduce_parent_size_txn (this, &local->loc, NULL);
}
newloc.inode = inode_ref (oplocal->loc.inode);
@@ -1094,39 +1087,26 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
marker_xtime_update_marks (this, local);
}
+err:
marker_local_unref (local);
marker_local_unref (oplocal);
+
return 0;
}
-int32_t
-marker_rename_release_newp_lock (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret,
- int32_t op_errno, dict_t *xdata)
+void
+marker_rename_release_oldp_lock (marker_local_t *local, xlator_t *this)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- struct gf_flock lock = {0, };
+ marker_local_t *oplocal = NULL;
+ call_frame_t *lk_frame = NULL;
+ struct gf_flock lock = {0, };
- local = frame->local;
oplocal = local->oplocal;
+ lk_frame = local->lk_frame;
- if (op_ret < 0) {
- if (local->err == 0) {
- local->err = op_errno ? op_errno : EINVAL;
- }
-
- gf_log (this->name, GF_LOG_WARNING,
- "inodelk (UNLOCK) failed on %s (gfid:%s) (%s)",
- oplocal->parent_loc.path,
- uuid_utoa (oplocal->parent_loc.inode->gfid),
- strerror (op_errno));
- }
-
- if (local->next_lock_on == NULL) {
- marker_rename_done (frame, NULL, this, 0, 0, NULL);
- goto out;
- }
+ if (lk_frame == NULL)
+ goto err;
lock.l_type = F_UNLCK;
lock.l_whence = SEEK_SET;
@@ -1134,47 +1114,75 @@ marker_rename_release_newp_lock (call_frame_t *frame, void *cookie,
lock.l_len = 0;
lock.l_pid = 0;
- STACK_WIND (frame,
+ STACK_WIND (lk_frame,
marker_rename_done,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->inodelk,
- this->name, &local->parent_loc, F_SETLKW, &lock, NULL);
+ this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL);
-out:
- return 0;
+ return;
+
+err:
+ marker_local_unref (local);
+ marker_local_unref (oplocal);
}
int32_t
-marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret,
- int32_t op_errno, dict_t *xdata)
+marker_rename_unwind (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- struct gf_flock lock = {0, };
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contri = NULL;
local = frame->local;
oplocal = local->oplocal;
-
- if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) {
- local->err = op_errno;
- }
+ frame->local = NULL;
//Reset frame uid and gid if set.
if (cookie == (void *) _GF_UID_GID_CHANGED)
MARKER_RESET_UID_GID (frame, frame->root, local);
- lock.l_type = F_UNLCK;
- lock.l_whence = SEEK_SET;
- lock.l_start = 0;
- lock.l_len = 0;
- lock.l_pid = 0;
+ if (op_ret < 0)
+ local->err = op_errno ? op_errno : EINVAL;
+
+ if (local->stub != NULL) {
+ /* Remove contribution node from in-memory even if
+ * remove-xattr has failed as the rename is already performed
+ * if local->stub is set, which means rename was sucessful
+ */
+ mq_inode_ctx_get (oplocal->loc.inode, this, &ctx);
+ if (ctx) {
+ contri = mq_get_contribution_node (oplocal->loc.parent,
+ ctx);
+ if (contri) {
+ QUOTA_FREE_CONTRIBUTION_NODE (ctx, contri);
+ GF_REF_PUT (contri);
+ }
+ }
+
+ call_resume (local->stub);
+ local->stub = NULL;
+ local->err = 0;
+ } else if (local->err != 0) {
+ STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL,
+ NULL, NULL, NULL, NULL);
+ } else {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "continuation stub to unwind the call is absent, hence "
+ "call will be hung (call-stack id = %"PRIu64")",
+ frame->root->unique);
+ }
+
+ /* If there are in-progress writes on old-path when during rename
+ * operation, update txn will update the wrong path if lock
+ * is released before rename unwind.
+ * So release lock only after rename unwind
+ */
+ marker_rename_release_oldp_lock (local, this);
- STACK_WIND (frame,
- marker_rename_release_newp_lock,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->inodelk,
- this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL);
return 0;
}
@@ -1246,7 +1254,7 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
newloc.parent = inode_ref (local->loc.parent);
gf_uuid_copy (newloc.gfid, oplocal->loc.inode->gfid);
- STACK_WIND_COOKIE (frame, marker_rename_release_oldp_lock,
+ STACK_WIND_COOKIE (frame, marker_rename_unwind,
frame->cookie, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->removexattr,
&newloc, contri_key, NULL);
@@ -1280,7 +1288,7 @@ out:
return 0;
quota_err:
- marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
+ marker_rename_unwind (frame, NULL, this, 0, 0, NULL);
return 0;
}
@@ -1293,60 +1301,7 @@ marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,
marker_local_t *oplocal = NULL;
char contri_key[CONTRI_KEY_MAX] = {0, };
int32_t ret = 0;
- int64_t *contribution = 0;
-
- local = frame->local;
- oplocal = local->oplocal;
-
- //Reset frame uid and gid if set.
- if (cookie == (void *) _GF_UID_GID_CHANGED)
- MARKER_RESET_UID_GID (frame, frame->root, local);
-
- if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) {
- local->err = op_errno ? op_errno : EINVAL;
- gf_log (this->name, GF_LOG_WARNING,
- "fetching contribution values from %s (gfid:%s) "
- "failed (%s)", local->loc.path,
- uuid_utoa (local->loc.inode->gfid),
- strerror (op_errno));
- goto err;
- }
-
- if (local->loc.inode != NULL) {
- GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret);
- if (ret < 0) {
- local->err = errno ? errno : ENOMEM;
- goto err;
- }
-
- if (dict_get_bin (dict, contri_key,
- (void **) &contribution) == 0) {
- local->contribution = ntoh64 (*contribution);
- }
- }
-
- STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->rename, &oplocal->loc,
- &local->loc, local->xdata);
-
- return 0;
-
-err:
- marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
- return 0;
-}
-
-
-int32_t
-marker_get_newpath_contribution (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret,
- int32_t op_errno, dict_t *dict, dict_t *xdata)
-{
- marker_local_t *local = NULL;
- marker_local_t *oplocal = NULL;
- char contri_key[CONTRI_KEY_MAX] = {0, };
- int32_t ret = 0;
- int64_t *contribution = 0;
+ quota_meta_t contribution = {0, };
local = frame->local;
oplocal = local->oplocal;
@@ -1370,68 +1325,51 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,
local->err = errno ? errno : ENOMEM;
goto err;
}
+ quota_dict_get_meta (dict, contri_key, &contribution);
+ oplocal->contribution = contribution;
- if (dict_get_bin (dict, contri_key, (void **) &contribution) == 0)
- oplocal->contribution = ntoh64 (*contribution);
-
- if (local->loc.inode != NULL) {
- GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret);
- if (ret < 0) {
- local->err = errno ? errno : ENOMEM;
- goto err;
- }
-
- /* getxattr requires uid and gid to be 0,
- * reset them in the callback.
- */
- MARKER_SET_UID_GID (frame, local, frame->root);
- if (gf_uuid_is_null (local->loc.gfid))
- gf_uuid_copy (local->loc.gfid, local->loc.inode->gfid);
-
- GF_UUID_ASSERT (local->loc.gfid);
-
- STACK_WIND_COOKIE (frame, marker_do_rename,
- frame->cookie, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->getxattr,
- &local->loc, contri_key, NULL);
- } else {
- marker_do_rename (frame, NULL, this, 0, 0, NULL, NULL);
- }
+ STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rename, &oplocal->loc,
+ &local->loc, local->xdata);
return 0;
+
err:
- marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
+ marker_rename_unwind (frame, NULL, this, 0, 0, NULL);
return 0;
}
-
int32_t
-marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,
+marker_get_oldpath_contribution (call_frame_t *lk_frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *xdata)
{
- marker_local_t *local = NULL;
- marker_local_t *oplocal = NULL;
- char contri_key[CONTRI_KEY_MAX] = {0, };
- int32_t ret = 0;
+ call_frame_t *frame = NULL;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
- local = frame->local;
+ local = lk_frame->local;
oplocal = local->oplocal;
+ frame = local->frame;
if (op_ret < 0) {
local->err = op_errno ? op_errno : EINVAL;
gf_log (this->name, GF_LOG_WARNING,
"cannot hold inodelk on %s (gfid:%s) (%s)",
- local->next_lock_on->path,
- uuid_utoa (local->next_lock_on->inode->gfid),
+ oplocal->loc.path, uuid_utoa (oplocal->loc.inode->gfid),
strerror (op_errno));
- goto lock_err;
+ goto err;
+
+ STACK_DESTROY (local->lk_frame->root);
+ local->lk_frame = NULL;
}
GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret);
if (ret < 0) {
local->err = errno ? errno : ENOMEM;
- goto quota_err;
+ goto err;
}
/* getxattr requires uid and gid to be 0,
@@ -1445,79 +1383,102 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,
GF_UUID_ASSERT (oplocal->loc.gfid);
- STACK_WIND_COOKIE (frame, marker_get_newpath_contribution,
+ STACK_WIND_COOKIE (frame, marker_do_rename,
frame->cookie, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->getxattr,
&oplocal->loc, contri_key, NULL);
- return 0;
-
-quota_err:
- marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
- return 0;
-
-lock_err:
- if ((local->next_lock_on == NULL)
- || (local->next_lock_on == &local->parent_loc)) {
- local->next_lock_on = NULL;
- marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
- } else {
- marker_rename_release_newp_lock (frame, NULL, this, 0, 0, NULL);
- }
return 0;
-}
-
-
-int32_t
-marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xdata)
-{
- marker_local_t *local = NULL, *oplocal = NULL;
- loc_t *loc = NULL;
- struct gf_flock lock = {0, };
-
- local = frame->local;
- oplocal = local->oplocal;
-
- if (op_ret < 0) {
- if (local->next_lock_on != &oplocal->parent_loc) {
- loc = &oplocal->parent_loc;
- } else {
- loc = &local->parent_loc;
- }
-
- local->err = op_errno ? op_errno : EINVAL;
- gf_log (this->name, GF_LOG_WARNING,
- "cannot hold inodelk on %s (gfid:%s) (%s)",
- loc->path, uuid_utoa (loc->inode->gfid),
- strerror (op_errno));
- goto err;
- }
-
- if (local->next_lock_on != NULL) {
- lock.l_len = 0;
- lock.l_start = 0;
- lock.l_type = F_WRLCK;
- lock.l_whence = SEEK_SET;
-
- STACK_WIND (frame,
- marker_get_oldpath_contribution,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->inodelk,
- this->name, local->next_lock_on,
- F_SETLKW, &lock, NULL);
- } else {
- marker_get_oldpath_contribution (frame, 0, this, 0, 0, NULL);
- }
-
- return 0;
-
err:
- marker_rename_done (frame, NULL, this, 0, 0, NULL);
- return 0;
-}
-
-
+ marker_rename_unwind (frame, NULL, this, 0, 0, NULL);
+ return 0;
+}
+
+
+/* For a marker_rename FOP, following is the algorithm used for Quota
+ * accounting. The use-case considered is:
+ * 1. rename (src, dst)
+ * 2. both src and dst exist
+ * 3. there are parallel operations on src and dst (lets say through fds
+ * opened on them before rename was initiated).
+ *
+ * PS: We've not thought through whether this algo works in the presence of
+ * hardlinks to src and/or dst.
+ *
+ * Algorithm:
+ * ==========
+ *
+ * 1) set inodelk on src-parent
+ * As part of rename operation, parent can change for the file.
+ * We need to remove contribution (both on disk xattr and in-memory one)
+ * to src-parent (and its ancestors) and add the contribution to dst-parent
+ * (and its ancestors). While we are doing these operations, contribution of
+ * the file/directory shouldn't be changing as we want to be sure that
+ * a) what we subtract from src-parent is exactly what we add to dst-parent
+ * b) we should subtract from src-parent exactly what we contributed to
+ * src-parent
+ * So, We hold a lock on src-parent to block any parallel transcations on
+ * src-inode (since thats the one which survives rename).
+ *
+ * If there are any parallel transactions on dst-inode they keep succeeding
+ * till the association of dst-inode with dst-parent is broken because of an
+ * inode_rename after unwind of rename fop from marker. Only after unwind
+ * (and hence inode_rename), we delete and subtract the contribution of
+ * dst-inode to dst-parent. That way we are making sure we subtract exactly
+ * what dst-inode contributed to dst-parent.
+ *
+ * 2) lookup contribution to src-parent on src-inode.
+ * We need to save the contribution info for use at step-8.
+ *
+ * 3) wind rename
+ * Perform rename on disk
+ *
+ * 4) remove xattr on src-loc
+ * After rename, parent can change, so
+ * need to remove xattrs storing contribution to src-parent.
+ *
+ * 5) remove contribution node corresponding to src-parent from the in-memory
+ * list.
+ * After rename, contri gfid can change and we have
+ * also removed xattr from file.
+ * We need to remove in-memory contribution node to prevent updations to
+ * src-parent even after a successful rename
+ *
+ * 6) unwind rename
+ * This will ensure that rename is done in the server
+ * inode table. An inode_rename disassociates src-inode from src-parent and
+ * associates it with dst-parent. It also disassociates dst-inode from
+ * dst-parent. After inode_rename, inode_parent on src-inode will give
+ * dst-parent and inode_parent on dst-inode will return NULL (assuming
+ * dst-inode doesn't have any hardlinks).
+ *
+ * 7) release inodelk on src-parent
+ * Lock on src-parent should be released only after
+ * rename on disk, remove xattr and rename_unwind (and hence inode_rename)
+ * operations. If lock is released before inode_rename, a parallel
+ * transaction on src-inode can still update src-parent (as inode_parent on
+ * src-inode can still return src-parent). This would make the
+ * contribution from src-inode to src-parent stored in step-2 stale.
+ *
+ * 8) Initiate mq_reduce_parent_size_txn on src-parent to remove contribution
+ * of src-inode to src-parent. We use the contribution stored in step-2.
+ * Since, we had acquired the lock on src-parent all along step-2 through
+ * inode_rename, we can be sure that a parallel transaction wouldn't have
+ * added a delta to src-parent.
+ *
+ * 9) Initiate mq_reduce_parent_size_txn on dst-parent if dst-inode exists.
+ * The size reduced from dst-parent and its ancestors is the
+ * size stored as contribution to dst-parent in dst-inode.
+ * If the destination file had existed, rename will unlink the
+ * destination file as part of its operation.
+ * We need to reduce the size on the dest parent similarly to
+ * unlink. Since, we are initiating reduce-parent-size transaction after
+ * inode_rename, we can be sure that a parallel transaction wouldn't add
+ * delta to dst-parent while we are reducing the contribution of dst-inode
+ * from its ancestors before rename.
+ *
+ * 10) create contribution xattr to dst-parent on src-inode.
+ */
int32_t
marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
loc_t *newloc, dict_t *xdata)
@@ -1527,7 +1488,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
marker_local_t *oplocal = NULL;
marker_conf_t *priv = NULL;
struct gf_flock lock = {0, };
- loc_t *lock_on = NULL;
priv = this->private;
@@ -1566,19 +1526,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
if (ret < 0)
goto err;
- if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent)
- && (gf_uuid_compare (newloc->parent->gfid,
- oldloc->parent->gfid) < 0)) {
- lock_on = &local->parent_loc;
- local->next_lock_on = &oplocal->parent_loc;
- } else {
- lock_on = &oplocal->parent_loc;
- if ((newloc->inode != NULL) && (newloc->parent
- != oldloc->parent)) {
- local->next_lock_on = &local->parent_loc;
- }
- }
-
lock.l_len = 0;
lock.l_start = 0;
lock.l_type = F_WRLCK;
@@ -1586,14 +1533,22 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
local->xdata = dict_ref (xdata);
- if (is_lk_owner_null (&frame->root->lk_owner))
- set_lk_owner_from_ptr (&frame->root->lk_owner, frame->root);
+ local->frame = frame;
+ local->lk_frame = create_frame (this, this->ctx->pool);
+ if (local->lk_frame == NULL)
+ goto err;
+
+ local->lk_frame->root->uid = 0;
+ local->lk_frame->root->gid = 0;
+ local->lk_frame->local = local;
+ set_lk_owner_from_ptr (&local->lk_frame->root->lk_owner,
+ local->lk_frame->root);
- STACK_WIND (frame,
- marker_rename_inodelk_cbk,
+ STACK_WIND (local->lk_frame,
+ marker_get_oldpath_contribution,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->inodelk,
- this->name, lock_on,
+ this->name, &oplocal->parent_loc,
F_SETLKW, &lock, NULL);
return 0;
diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h
index 6e43e74d5a6..228fd6fa5d8 100644
--- a/xlators/features/marker/src/marker.h
+++ b/xlators/features/marker/src/marker.h
@@ -92,7 +92,6 @@ struct marker_local{
pid_t pid;
loc_t loc;
loc_t parent_loc;
- loc_t *next_lock_on;
uid_t uid;
gid_t gid;
int32_t ref;
@@ -101,7 +100,8 @@ struct marker_local{
mode_t mode;
int32_t err;
call_stub_t *stub;
- int64_t contribution;
+ call_frame_t *lk_frame;
+ quota_meta_t contribution;
struct marker_local *oplocal;
/* marker quota specific */