diff options
author | Raghavendra G <raghavendra@gluster.com> | 2011-06-14 23:54:16 +0000 |
---|---|---|
committer | Anand Avati <avati@gluster.com> | 2011-06-16 22:00:51 -0700 |
commit | a87555181d47522e985325c67b7d17c49dbd38de (patch) | |
tree | 16e7848551e64c1aa8b2038fabe2da4c3bd98dd7 /xlators/features | |
parent | 01d67311c83ae272f3ee3632c1e8f13ccebaca81 (diff) |
features/marker-quota: fixes in rename path.
- remove xattrs from newpath after rename is complete.
- hold inodelk on both parents (if they are different) before doing rename and
gather contribution values of oldpath and newpath to their parents while still
holding the locks. Use these contribution values to reduce parent sizes.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand Avati <avati@gluster.com>
BUG: 2697 (Quota: add-brick creates the size go awkward, though it was perfect earlier)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2697
Diffstat (limited to 'xlators/features')
-rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 35 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota-helper.h | 5 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota.c | 33 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota.h | 3 | ||||
-rw-r--r-- | xlators/features/marker/src/marker.c | 403 | ||||
-rw-r--r-- | xlators/features/marker/src/marker.h | 6 |
6 files changed, 414 insertions, 71 deletions
diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index fba2cdd3f..9a7310e8a 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -373,3 +373,38 @@ quota_local_unref (xlator_t *this, quota_local_t *local) out: return 0; } + + +inode_contribution_t * +get_contribution_from_loc (xlator_t *this, loc_t *loc) +{ + int32_t ret = 0; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; + + ret = quota_inode_ctx_get (loc->inode, this, &ctx); + if (ret < 0) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "cannot get marker-quota context from inode " + "(ino: %"PRId64", gfid:%s, path:%s)", + loc->inode->ino, + uuid_utoa (loc->inode->gfid), + loc->path); + goto err; + } + + contribution = get_contribution_node (loc->parent, ctx); + if (contribution == NULL) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "inode (ino:%"PRId64", gfid:%s, path:%s ) has" + " no contribution towards parent (ino:%"PRId64 + ", gfid:%s)", loc->inode->ino, + uuid_utoa (loc->inode->gfid), + loc->path, loc->parent->ino, + uuid_utoa (loc->parent->gfid)); + goto err; + } + +err: + return contribution; +} diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h index 9a24c8c3d..c72cd94e8 100644 --- a/xlators/features/marker/src/marker-quota-helper.h +++ b/xlators/features/marker/src/marker-quota-helper.h @@ -22,6 +22,7 @@ #define _CONFIG_H #include "config.h" #endif + #include "marker-quota.h" #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution) \ @@ -73,4 +74,8 @@ quota_local_unref (xlator_t *, quota_local_t *); inode_contribution_t * get_contribution_node (inode_t *, quota_inode_ctx_t *); + +inode_contribution_t * +get_contribution_from_loc (xlator_t *this, loc_t *loc); + #endif diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index 4662c989d..0a82cb5fe 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -1762,10 +1762,13 @@ mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, local->err = -1; ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); - if (ret == 0) - ctx->size -= local->contri->contribution; - local->contri->contribution = 0; + if (local->contri->contribution == local->size) { + if (ret == 0) + ctx->size -= local->contri->contribution; + + local->contri->contribution = 0; + } lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; @@ -1813,10 +1816,13 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, goto err; } - QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err); + if (local->size < 0) { + local->size = contribution->contribution; + } - *size = hton64 (-contribution->contribution); + QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err); + *size = hton64 (-local->size); ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8); if (ret < 0) @@ -1838,7 +1844,7 @@ err: } int32_t -reduce_parent_size (xlator_t *this, loc_t *loc) +reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri) { int32_t ret = -1; struct gf_flock lock = {0,}; @@ -1867,6 +1873,17 @@ reduce_parent_size (xlator_t *this, loc_t *loc) goto out; } + if (contri >= 0) { + local->size = contri; + } else { + local->size = -1; + } + + if (local->size == 0) { + ret = 0; + goto out; + } + ret = loc_copy (&local->loc, loc); if (ret < 0) goto out; @@ -1898,13 +1915,15 @@ reduce_parent_size (xlator_t *this, loc_t *loc) FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, this->name, &local->parent_loc, F_SETLKW, &lock); + local = NULL; ret = 0; out: - if (ret < 0) { + if (local != NULL) { quota_local_unref (this, local); GF_FREE (local); } + return ret; } diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index ea433a455..5535aa04c 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -117,6 +117,7 @@ struct quota_local { int32_t err; int32_t ref; int64_t sum; + int64_t size; int32_t hl_count; int32_t dentry_child_count; @@ -155,7 +156,7 @@ quota_dirty_inode_readdir (call_frame_t *, void *, xlator_t *, int32_t, int32_t, fd_t *); int32_t -reduce_parent_size (xlator_t *, loc_t *); +reduce_parent_size (xlator_t *, loc_t *, int64_t); int32_t quota_rename_update_newpath (xlator_t *, loc_t *); diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index f3c9e333c..b00d0a3d2 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -27,6 +27,7 @@ #include "marker.h" #include "marker-mem-types.h" #include "marker-quota.h" +#include "marker-quota-helper.h" #include "marker-common.h" void @@ -178,8 +179,8 @@ marker_local_unref (marker_local_t *local) loc_wipe (&local->loc); if (local->oplocal) { - loc_wipe (&local->oplocal->loc); - GF_FREE (local->oplocal); + marker_local_unref (local->oplocal); + local->oplocal = NULL; } GF_FREE (local); out: @@ -679,7 +680,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - reduce_parent_size (this, &local->loc); + reduce_parent_size (this, &local->loc, -1); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -746,7 +747,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if ((priv->feature_enabled & GF_QUOTA) && (local->ia_nlink == 1)) - reduce_parent_size (this, &local->loc); + reduce_parent_size (this, &local->loc, -1); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -889,60 +890,63 @@ err: int32_t -marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *buf, - struct iatt *preoldparent, struct iatt *postoldparent, - struct iatt *prenewparent, struct iatt *postnewparent) +marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) { - marker_conf_t *priv = NULL; - marker_local_t *local = NULL; - marker_local_t *oplocal = NULL; - loc_t newloc = {0, }; + marker_local_t *local = NULL, *oplocal = NULL; + loc_t newloc = {0, }; + marker_conf_t *priv = NULL; - if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " - "renaming a file ", strerror (op_errno)); - } + local = frame->local; + oplocal = local->oplocal; - local = (marker_local_t *) frame->local; + priv = this->private; frame->local = NULL; - STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent, - postoldparent, prenewparent, postnewparent); - - if (op_ret == -1 || local == NULL) - goto out; + if (op_ret < 0) { + if (local->err == 0) { + local->err = op_errno; + } - oplocal = local->oplocal; - local->oplocal = NULL; + gf_log (this->name, GF_LOG_WARNING, + "inodelk (UNLOCK) failed on path:%s, inode (ino:%"PRId64 + ", gfid:%s)(%s)", local->parent_loc.path, + local->parent_loc.inode->ino, + uuid_utoa (local->parent_loc.inode->gfid), + strerror (op_errno)); + } - priv = this->private; + if (local->stub != NULL) { + call_resume (local->stub); + local->stub = NULL; + } else if (local->err != 0) { + STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, + NULL, NULL, NULL); + } - if (priv->feature_enabled & GF_QUOTA) { - reduce_parent_size (this, &oplocal->loc); + reduce_parent_size (this, &oplocal->loc, oplocal->contribution); - if (local->loc.inode != NULL) { - reduce_parent_size (this, &local->loc); - } + if (local->loc.inode != NULL) { + reduce_parent_size (this, &local->loc, local->contribution); + } - newloc.inode = inode_ref (oplocal->loc.inode); - newloc.path = gf_strdup (local->loc.path); - newloc.name = gf_strdup (local->loc.name); - newloc.parent = inode_ref (local->loc.parent); - newloc.ino = oplocal->loc.inode->ino; + newloc.inode = inode_ref (oplocal->loc.inode); + newloc.path = gf_strdup (local->loc.path); + newloc.name = gf_strdup (local->loc.name); + newloc.parent = inode_ref (local->loc.parent); + newloc.ino = oplocal->loc.inode->ino; - quota_rename_update_newpath (this, &newloc); + quota_rename_update_newpath (this, &newloc); - loc_wipe (&newloc); - } + loc_wipe (&newloc); if (priv->feature_enabled & GF_XTIME) { //update marks on oldpath marker_xtime_update_marks (this, oplocal); marker_xtime_update_marks (this, local); } -out: + marker_local_unref (local); marker_local_unref (oplocal); return 0; @@ -950,35 +954,276 @@ out: int32_t -marker_quota_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +marker_rename_release_newp_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno) +{ + marker_local_t *local = NULL, *oplocal = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + if (local->err == 0) { + local->err = op_errno; + } + + gf_log (this->name, GF_LOG_WARNING, + "inodelk (UNLOCK) failed on path:%s, inode (ino:%"PRId64 + ", gfid:%s)(%s)", oplocal->parent_loc.path, + oplocal->parent_loc.inode->ino, + uuid_utoa (oplocal->parent_loc.inode->gfid), + strerror (op_errno)); + } + + if (local->next_lock_on == NULL) { + marker_rename_done (frame, NULL, this, 0, 0); + goto out; + } + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + marker_rename_done, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->parent_loc, F_SETLKW, &lock); + +out: + return 0; +} + + +int32_t +marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno) { - marker_local_t *local = NULL, *oplocal = NULL; + marker_local_t *local = NULL, *oplocal = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; if ((op_ret < 0) && (op_errno != ENOATTR)) { - goto unwind; + local->err = op_errno; + } + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + marker_rename_release_newp_lock, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &oplocal->parent_loc, F_SETLKW, &lock); + return 0; +} + + +int32_t +marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + struct iatt *preoldparent, struct iatt *postoldparent, + struct iatt *prenewparent, struct iatt *postnewparent) +{ + marker_conf_t *priv = NULL; + marker_local_t *local = NULL; + marker_local_t *oplocal = NULL; + call_stub_t *stub = NULL; + int32_t ret = 0; + char contri_key [512] = {0, }; + + local = (marker_local_t *) frame->local; + + if (local != NULL) { + oplocal = local->oplocal; + } + + priv = this->private; + + if (op_ret < 0) { + if (local != NULL) { + local->err = op_errno; + } + + gf_log (this->name, GF_LOG_TRACE, "%s occured while " + "renaming a file ", strerror (op_errno)); + } + + if (priv->feature_enabled & GF_QUOTA) { + if ((op_ret < 0) || (local == NULL)) { + goto quota_err; + } + + stub = fop_rename_cbk_stub (frame, default_rename_cbk, op_ret, + op_errno, buf, preoldparent, + postoldparent, prenewparent, + postnewparent); + if (stub == NULL) { + local->err = ENOMEM; + goto quota_err; + } + + local->stub = stub; + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = ENOMEM; + goto quota_err; + } + + STACK_WIND (frame, marker_rename_release_oldp_lock, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, &local->loc, + contri_key); + } else { + frame->local = NULL; + + STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, + preoldparent, postoldparent, prenewparent, + postnewparent); + + if ((op_ret < 0) || (local == NULL)) { + goto out; + } + + if (priv->feature_enabled & GF_XTIME) { + //update marks on oldpath + marker_xtime_update_marks (this, oplocal); + marker_xtime_update_marks (this, local); + } + } + +out: + if (!(priv->feature_enabled & GF_QUOTA)) { + marker_local_unref (local); + marker_local_unref (oplocal); + } + + return 0; + +quota_err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + return 0; +} + + +int32_t +marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + marker_local_t *local = NULL, *oplocal = NULL; + inode_contribution_t *contribution = NULL; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "cannot hold inodelk on %s (ino:%"PRId64", gfid:%s)" + "(%s)", + local->next_lock_on->path, + local->next_lock_on->inode->ino, + uuid_utoa (local->next_lock_on->inode->gfid), + strerror (op_errno)); + goto lock_err; } local = frame->local; oplocal = local->oplocal; + if (local->loc.inode != NULL) { + contribution = get_contribution_from_loc (this, &local->loc); + if (contribution == NULL) { + local->contribution = 0; + } else { + local->contribution = contribution->contribution; + } + } + + contribution = get_contribution_from_loc (this, &oplocal->loc); + if (contribution == NULL) { + oplocal->contribution = 0; + } else { + oplocal->contribution = contribution->contribution; + } + STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, &oplocal->loc, &local->loc); + return 0; -unwind: - STACK_UNWIND_STRICT (rename, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL, NULL); - if (local) { - local->oplocal = NULL; - marker_local_unref (local); - GF_FREE (local); +lock_err: + if ((local->next_lock_on == NULL) + || (local->next_lock_on == &local->parent_loc)) { + local->next_lock_on = NULL; + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + } else { + marker_rename_release_newp_lock (frame, NULL, this, 0, 0); } - if (oplocal) { - marker_local_unref (oplocal); - GF_FREE (oplocal); + + return 0; +} + + +int32_t +marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + marker_local_t *local = NULL, *oplocal = NULL; + loc_t *loc = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + if (local->next_lock_on != &oplocal->parent_loc) { + loc = &oplocal->parent_loc; + } else { + loc = &local->parent_loc; + } + + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "cannot hold inodelk on %s (ino:%"PRId64", gfid:%s)" + "(%s)", loc->path, loc->inode->ino, + uuid_utoa (loc->inode->gfid), + strerror (op_errno)); + goto err; } + + if (local->next_lock_on != NULL) { + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + + STACK_WIND (frame, + marker_do_rename, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, local->next_lock_on, + F_SETLKW, &lock); + } else { + marker_do_rename (frame, 0, this, 0, 0); + } + + return 0; + +err: + marker_rename_done (frame, NULL, this, 0, 0); return 0; } @@ -991,17 +1236,14 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, marker_local_t *local = NULL; marker_local_t *oplocal = NULL; marker_conf_t *priv = NULL; - char contri_key[512] = {0,}; + struct gf_flock lock = {0, }; + loc_t *lock_on = NULL; priv = this->private; if (priv->feature_enabled == 0) goto rename_wind; - GET_CONTRI_KEY (contri_key, oldloc->parent->gfid, ret); - if (ret < 0) - goto err; - ALLOCATE_OR_GOTO (local, marker_local_t, err); MARKER_INIT_LOCAL (frame, local); @@ -1012,18 +1254,53 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, frame->local = local; - local->oplocal = oplocal; + local->oplocal = marker_local_ref (oplocal); ret = loc_copy (&local->loc, newloc); - if (ret == -1) + if (ret < 0) goto err; ret = loc_copy (&oplocal->loc, oldloc); - if (ret == -1) + if (ret < 0) goto err; - STACK_WIND (frame, marker_quota_removexattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, oldloc, contri_key); + if (!(priv->feature_enabled & GF_QUOTA)) { + goto rename_wind; + } + + ret = quota_inode_loc_fill (NULL, newloc->parent, &local->parent_loc); + if (ret < 0) + goto err; + + ret = quota_inode_loc_fill (NULL, oldloc->parent, &oplocal->parent_loc); + if (ret < 0) + goto err; + + if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent) + && (uuid_compare (newloc->parent->gfid, + oldloc->parent->gfid) < 0)) { + lock_on = &local->parent_loc; + local->next_lock_on = &oplocal->parent_loc; + } else { + lock_on = &oplocal->parent_loc; + if ((newloc->inode != NULL) && (newloc->parent + != oldloc->parent)) { + local->next_lock_on = &local->parent_loc; + } + } + + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + + STACK_WIND (frame, + marker_rename_inodelk_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, lock_on, + F_SETLKW, &lock); + return 0; rename_wind: diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h index 1e62e5cbc..a5a435e3b 100644 --- a/xlators/features/marker/src/marker.h +++ b/xlators/features/marker/src/marker.h @@ -28,6 +28,7 @@ #include "xlator.h" #include "defaults.h" #include "uuid.h" +#include "call-stub.h" #define MARKER_XATTR_PREFIX "trusted.glusterfs" #define XTIME "xtime" @@ -65,10 +66,15 @@ struct marker_local{ uint32_t timebuf[2]; pid_t pid; loc_t loc; + loc_t parent_loc; + loc_t *next_lock_on; int32_t ref; int32_t ia_nlink; gf_lock_t lock; mode_t mode; + int32_t err; + call_stub_t *stub; + int64_t contribution; struct marker_local *oplocal; }; typedef struct marker_local marker_local_t; |