summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaghavendra Bhat <raghavendrabhat@gluster.com>2011-08-21 18:53:04 +0530
committerVijay Bellur <vijay@gluster.com>2011-08-21 06:29:21 -0700
commitb6e3e9c480be4226925b51c5e9ee0c368aa94a6d (patch)
tree32bb0f3df77436fdbca48b83b0030e1369420fc6
parent08e8c966869b091fb4df8bfc8cadc37cb40719a5 (diff)
features/marker: changes in marker to avoid race conditions and corruptionsv3.3.0qa6
Change-Id: I38ddfab200d59dd4c8e9f9dd964a98f3d7aa7ab7 BUG: 3389 Reviewed-on: http://review.gluster.com/289 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vijay@gluster.com>
-rw-r--r--xlators/features/marker/src/marker-quota-helper.c15
-rw-r--r--xlators/features/marker/src/marker-quota.c259
-rw-r--r--xlators/features/marker/src/marker-quota.h1
-rw-r--r--xlators/features/marker/src/marker.c3
4 files changed, 217 insertions, 61 deletions
diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c
index 2d5b234d1a8..d701cb5a346 100644
--- a/xlators/features/marker/src/marker-quota-helper.c
+++ b/xlators/features/marker/src/marker-quota-helper.c
@@ -32,8 +32,12 @@ quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)
{
int ret = -1;
- if (!loc)
- return ret;
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", path, out);
+ /* Not checking for parent because while filling
+ * loc of root, parent will be NULL
+ */
if (inode) {
loc->inode = inode_ref (inode);
@@ -59,7 +63,7 @@ quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)
loc_wipe:
if (ret < 0)
loc_wipe (loc);
-
+out:
return ret;
}
@@ -180,6 +184,7 @@ __add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc)
uuid_copy (contribution->gfid, loc->parent->gfid);
LOCK_INIT (&contribution->lock);
+ INIT_LIST_HEAD (&contribution->contri_list);
list_add_tail (&contribution->contri_list, &ctx->contribution_head);
@@ -363,7 +368,7 @@ quota_local_unref (xlator_t *this, quota_local_t *local)
QUOTA_SAFE_DECREMENT (&local->lock, local->ref, ref);
- if (ref > 0)
+ if (ref != 0)
goto out;
if (local->fd != NULL)
@@ -374,6 +379,8 @@ quota_local_unref (xlator_t *this, quota_local_t *local)
loc_wipe (&local->parent_loc);
LOCK_DESTROY (&local->lock);
+
+ GF_FREE (local);
out:
return 0;
}
diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c
index cb50d71a7de..5cb6dc9ad81 100644
--- a/xlators/features/marker/src/marker-quota.c
+++ b/xlators/features/marker/src/marker-quota.c
@@ -30,6 +30,46 @@
#include "marker-quota.h"
#include "marker-quota-helper.h"
+int
+mq_loc_copy (loc_t *dst, loc_t *src)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", dst, out);
+ GF_VALIDATE_OR_GOTO ("marker", src, out);
+
+ if (src->inode == NULL ||
+ src->path == NULL) {
+ gf_log ("marker", GF_LOG_WARNING,
+ "src loc is not valid");
+ goto out;
+ }
+
+ ret = loc_copy (dst, src);
+out:
+ return ret;
+}
+
+int32_t
+mq_get_local_err (quota_local_t *local,
+ int32_t *val)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", local, out);
+ GF_VALIDATE_OR_GOTO ("marker", val, out);
+
+ LOCK (&local->lock);
+ {
+ *val = local->err;
+ }
+ UNLOCK (&local->lock);
+
+ ret = 0;
+out:
+ return ret;
+}
+
int32_t
mq_get_ctx_updation_status (quota_inode_ctx_t *ctx,
gf_boolean_t *status)
@@ -182,6 +222,8 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this,
{
struct gf_flock lock = {0, };
quota_local_t *local = NULL;
+ loc_t loc = {0, };
+ int ret = -1;
local = frame->local;
@@ -202,11 +244,31 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this,
lock.l_len = 0;
lock.l_pid = 0;
+ ret = loc_copy (&loc, &local->loc);
+ if (ret == -1) {
+ local->err = -1;
+ frame->local = NULL;
+ dirty_inode_updation_done (frame, NULL, this, 0, 0);
+ return 0;
+ }
+
+ if (local->loc.inode == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Inode is NULL, so can't stackwind.");
+ goto out;
+ }
+
STACK_WIND (frame,
dirty_inode_updation_done,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->inodelk,
- this->name, &local->loc, F_SETLKW, &lock);
+ this->name, &loc, F_SETLKW, &lock);
+
+ loc_wipe (&loc);
+
+ return 0;
+out:
+ dirty_inode_updation_done (frame, NULL, this, -1, 0);
return 0;
}
@@ -339,6 +401,29 @@ err:
}
int32_t
+mq_test_and_set_local_err(quota_local_t *local,
+ int32_t *val)
+{
+ int tmp = 0;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", local, out);
+ GF_VALIDATE_OR_GOTO ("marker", val, out);
+
+ LOCK (&local->lock);
+ {
+ tmp = local->err;
+ local->err = *val;
+ *val = tmp;
+ }
+ UNLOCK (&local->lock);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+int32_t
get_dirty_inode_size (call_frame_t *frame, xlator_t *this)
{
int32_t ret = -1;
@@ -401,17 +486,19 @@ get_child_contribution (call_frame_t *frame,
QUOTA_STACK_DESTROY (frame, this);
if (op_ret == -1) {
- gf_log (this->name, GF_LOG_ERROR, "%s", strerror (op_errno));
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ strerror (op_errno));
+ val = -2;
+ if (!mq_test_and_set_local_err (local, &val) &&
+ val != -2)
+ release_lock_on_dirty_inode (local->frame, NULL, this, 0, 0);
- local->err = -2;
-
- release_lock_on_dirty_inode (local->frame, NULL, this, 0, 0);
-
- goto out;
+ goto exit;
}
- if (local->err)
- goto out;
+ ret = mq_get_local_err (local, &val);
+ if (!ret && val == -2)
+ goto exit;
GET_CONTRI_KEY (contri_key, local->loc.inode->gfid, ret);
if (ret < 0)
@@ -430,17 +517,16 @@ out:
}
UNLOCK (&local->lock);
- if (val== 0) {
- if (local->err) {
- QUOTA_SAFE_DECREMENT (&local->lock, local->ref, val);
-
- quota_local_unref (this, local);
- } else
- quota_dirty_inode_readdir (local->frame, NULL, this,
+ if (val == 0) {
+ quota_dirty_inode_readdir (local->frame, NULL, this,
0, 0, NULL);
}
+ quota_local_unref (this, local);
return 0;
+exit:
+ quota_local_unref (this, local);
+ return 0;
}
int32_t
@@ -453,6 +539,7 @@ quota_readdir_cbk (call_frame_t *frame,
{
char contri_key [512] = {0, };
int32_t ret = 0;
+ int32_t val = 0;
off_t offset = 0;
int32_t count = 0;
dict_t *dict = NULL;
@@ -493,16 +580,19 @@ quota_readdir_cbk (call_frame_t *frame,
count++;
}
+ if (count == 0) {
+ get_dirty_inode_size (frame, this);
+ return 0;
+ }
+
local->frame = frame;
- if (count > 0) {
- LOCK (&local->lock);
- {
- local->dentry_child_count = count;
- local->d_off = offset;
- }
- UNLOCK (&local->lock);
+ LOCK (&local->lock);
+ {
+ local->dentry_child_count = count;
+ local->d_off = offset;
}
+ UNLOCK (&local->lock);
list_for_each_entry (entry, (&entries->list), list) {
@@ -526,7 +616,7 @@ quota_readdir_cbk (call_frame_t *frame,
goto out;
}
- newframe->local = local;
+ newframe->local = quota_local_ref (local);
dict = dict_new ();
if (!dict) {
@@ -559,18 +649,12 @@ quota_readdir_cbk (call_frame_t *frame,
}
if (ret) {
- LOCK (&local->lock);
- {
- if (local->dentry_child_count == 0)
- local->err = -1;
- else
- local->err = -2;
- }
- UNLOCK (&local->lock);
+ val = -2;
+ mq_test_and_set_local_err (local, &val);
if (newframe) {
newframe->local = NULL;
-
+ quota_local_unref(this, local);
QUOTA_STACK_DESTROY (newframe, this);
}
@@ -578,10 +662,8 @@ quota_readdir_cbk (call_frame_t *frame,
}
}
- if (ret) {
+ if (ret && val != -2) {
release_lock_on_dirty_inode (frame, NULL, this, 0, 0);
- } else if (count == 0 ) {
- get_dirty_inode_size (frame, this);
}
return 0;
@@ -767,8 +849,7 @@ update_dirty_inode (xlator_t *this,
goto fr_destroy;
frame->local = local;
-
- ret = loc_copy (&local->loc, loc);
+ ret = mq_loc_copy (&local->loc, loc);
if (ret < 0)
goto fr_destroy;
@@ -781,6 +862,13 @@ update_dirty_inode (xlator_t *this,
lock.l_start = 0;
lock.l_len = 0;
+ if (local->loc.inode == NULL) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_WARNING,
+ "Inode is NULL, so can't stackwind.");
+ goto fr_destroy;
+ }
+
STACK_WIND (frame,
get_dirty_xattr,
FIRST_CHILD(this),
@@ -1113,26 +1201,53 @@ err:
int32_t
get_parent_inode_local (xlator_t *this, quota_local_t *local)
{
- int32_t ret;
+ int32_t ret = -1;
quota_inode_ctx_t *ctx = NULL;
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", local, out);
+
loc_wipe (&local->loc);
- loc_copy (&local->loc, &local->parent_loc);
+ ret = mq_loc_copy (&local->loc, &local->parent_loc);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "loc copy failed");
+ goto out;
+ }
loc_wipe (&local->parent_loc);
- quota_inode_loc_fill (NULL, local->loc.parent, &local->parent_loc);
+ ret = quota_inode_loc_fill (NULL, local->loc.parent,
+ &local->parent_loc);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "failed to build parent loc of %s",
+ local->loc.path);
+ goto out;
+ }
ret = quota_inode_ctx_get (local->loc.inode, this, &ctx);
- if (ret < 0)
- return -1;
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "inode ctx get failed");
+ goto out;
+ }
local->ctx = ctx;
+ if (list_empty (&ctx->contribution_head)) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "contribution node list is empty which "
+ "is an error");
+ goto out;
+ }
+
local->contri = (inode_contribution_t *) ctx->contribution_head.next;
- return 0;
+ ret = 0;
+out:
+ return ret;
}
@@ -1228,6 +1343,12 @@ quota_release_parent_lock (call_frame_t *frame, void *cookie,
}
UNLOCK (&ctx->lock);
+ if (local->parent_loc.inode == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Invalid parent inode.");
+ goto err;
+ }
+
wind:
lock.l_type = F_UNLCK;
lock.l_whence = SEEK_SET;
@@ -1243,6 +1364,10 @@ wind:
F_SETLKW, &lock);
return 0;
+err:
+ xattr_updation_done (frame, NULL, this,
+ 0, 0 , NULL);
+ return 0;
}
@@ -1591,6 +1716,11 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
if (local->loc.inode->ia_type == IA_IFDIR) {
ret = dict_set_int64 (newdict, QUOTA_SIZE_KEY, 0);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dict_set failed.");
+ goto err;
+ }
}
GET_CONTRI_KEY (contri_key, local->contri->gfid, ret);
@@ -1600,6 +1730,11 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
}
ret = dict_set_int64 (newdict, contri_key, 0);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dict_set failed.");
+ goto err;
+ }
mq_set_ctx_updation_status (local->ctx, _gf_false);
@@ -1609,7 +1744,7 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
ret = 0;
err:
- if ((op_ret == -1) || (ret == -1)) {
+ if ((op_ret == -1) || (ret < 0)) {
local->err = op_errno;
mq_set_ctx_updation_status (local->ctx, _gf_false);
@@ -1697,6 +1832,13 @@ get_lock_on_parent (call_frame_t *frame, xlator_t *this)
gf_log (this->name, GF_LOG_DEBUG, "taking lock on %s",
local->parent_loc.path);
+ if (local->parent_loc.inode == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "parent inode is not valid, aborting "
+ "transaction.");
+ goto fr_destroy;
+ }
+
lock.l_len = 0;
lock.l_start = 0;
lock.l_type = F_WRLCK;
@@ -1738,7 +1880,7 @@ start_quota_txn (xlator_t *this, loc_t *loc,
frame->local = local;
- ret = loc_copy (&local->loc, loc);
+ ret = mq_loc_copy (&local->loc, loc);
if (ret < 0)
goto fr_destroy;
@@ -1773,7 +1915,9 @@ initiate_quota_txn (xlator_t *this, loc_t *loc)
quota_inode_ctx_t *ctx = NULL;
inode_contribution_t *contribution = NULL;
- VALIDATE_OR_GOTO (loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
ret = quota_inode_ctx_get (loc->inode, this, &ctx);
if (ret == -1) {
@@ -2073,14 +2217,14 @@ quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
}
if (strcmp (local->parent_loc.path, "/") != 0) {
- get_parent_inode_local (this, local);
+ ret = get_parent_inode_local (this, local);
+ if (ret < 0)
+ goto out;
start_quota_txn (this, &local->loc, local->ctx, local->contri);
}
-
- /* TODO: free local in quota_local_unref only*/
+out:
quota_local_unref (this, local);
- GF_FREE (local);
return 0;
}
@@ -2237,7 +2381,7 @@ reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
goto out;
}
- ret = loc_copy (&local->loc, loc);
+ ret = mq_loc_copy (&local->loc, loc);
if (ret < 0)
goto out;
@@ -2263,6 +2407,13 @@ reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
+ if (local->parent_loc.inode == NULL) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_WARNING,
+ "Inode is NULL, so can't stackwind.");
+ goto out;
+ }
+
STACK_WIND (frame,
mq_reduce_parent_size_xattr,
FIRST_CHILD(this),
@@ -2272,10 +2423,8 @@ reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
ret = 0;
out:
- if (local != NULL) {
+ if (local != NULL)
quota_local_unref (this, local);
- GF_FREE (local);
- }
return ret;
}
diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h
index 70524ab07f5..9633533b3eb 100644
--- a/xlators/features/marker/src/marker-quota.h
+++ b/xlators/features/marker/src/marker-quota.h
@@ -43,7 +43,6 @@
_frame->local = NULL; \
STACK_DESTROY (_frame->root); \
quota_local_unref (_this, _local); \
- GF_FREE (_local); \
} while (0)
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index 33e0a477d1e..d0f01465ffa 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -772,6 +772,7 @@ marker_unlink_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local = frame->local;
if (local == NULL) {
+ op_errno = EINVAL;
goto err;
}
@@ -781,7 +782,7 @@ marker_unlink_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
FIRST_CHILD(this)->fops->unlink, &local->loc);
return 0;
err:
- STACK_UNWIND_STRICT (unlink, frame, -1, ENOMEM, NULL, NULL);
+ STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL);
return 0;
}