summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/afr/src/afr-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/afr/src/afr-common.c')
-rw-r--r--xlators/cluster/afr/src/afr-common.c777
1 files changed, 482 insertions, 295 deletions
diff --git a/xlators/cluster/afr/src/afr-common.c b/xlators/cluster/afr/src/afr-common.c
index 3ddfa6dff7b..e5046cb69d8 100644
--- a/xlators/cluster/afr/src/afr-common.c
+++ b/xlators/cluster/afr/src/afr-common.c
@@ -329,6 +329,8 @@ afr_local_sh_cleanup (afr_local_t *local, xlator_t *this)
if (sh->linkname)
GF_FREE ((char *)sh->linkname);
+ if (sh->child_success)
+ GF_FREE (sh->child_success);
loc_wipe (&sh->parent_loc);
}
@@ -418,6 +420,18 @@ afr_local_cleanup (afr_local_t *local, xlator_t *this)
if (local->cont.lookup.inode) {
inode_unref (local->cont.lookup.inode);
}
+
+ if (local->cont.lookup.postparents)
+ GF_FREE (local->cont.lookup.postparents);
+
+ if (local->cont.lookup.bufs)
+ GF_FREE (local->cont.lookup.bufs);
+
+ if (local->cont.lookup.child_success)
+ GF_FREE (local->cont.lookup.child_success);
+
+ if (local->cont.lookup.sources)
+ GF_FREE (local->cont.lookup.sources);
}
{ /* getxattr */
@@ -510,6 +524,22 @@ afr_up_children_count (int child_count, unsigned char *child_up)
return ret;
}
+gf_boolean_t
+afr_is_fresh_lookup (loc_t *loc, xlator_t *this)
+{
+ uint64_t ctx = 0;
+ int32_t ret = 0;
+
+ GF_ASSERT (loc);
+ GF_ASSERT (this);
+ GF_ASSERT (loc->inode);
+
+ ret = inode_ctx_get (loc->inode, this, &ctx);
+ if (0 == ret)
+ return _gf_false;
+ return _gf_true;
+}
+
void
afr_update_loc_gfids (loc_t *loc, struct iatt *buf, struct iatt *postparent)
{
@@ -534,68 +564,96 @@ afr_self_heal_lookup_unwind (call_frame_t *frame, xlator_t *this)
}
AFR_STACK_UNWIND (lookup, frame, local->op_ret, local->op_errno,
- local->cont.lookup.inode,
- &local->cont.lookup.buf,
+ local->cont.lookup.inode, &local->cont.lookup.buf,
local->cont.lookup.xattr,
&local->cont.lookup.postparent);
return 0;
}
+void
+afr_lookup_build_response_params (afr_local_t *local, xlator_t *this)
+{
+ int32_t read_child = -1;
+ struct iatt *buf = NULL;
+ struct iatt *postparent = NULL;
+ dict_t **xattr = NULL;
-static void
-afr_lookup_collect_xattr (afr_local_t *local, xlator_t *this,
- int child_index, dict_t *xattr)
+ GF_ASSERT (local);
+ GF_ASSERT (local->cont.lookup.read_child >= 0);
+
+ buf = &local->cont.lookup.buf;
+ postparent = &local->cont.lookup.postparent;
+ xattr = &local->cont.lookup.xattr;
+
+ read_child = local->cont.lookup.read_child;
+ *xattr = dict_ref (local->cont.lookup.xattrs[read_child]);
+ *buf = local->cont.lookup.bufs[read_child];
+ *postparent = local->cont.lookup.postparents[read_child];
+
+ if (IA_INVAL == local->cont.lookup.inode->ia_type) {
+ /* fix for RT #602 */
+ local->cont.lookup.inode->ia_type = buf->ia_type;
+ }
+}
+
+
+ static void
+afr_lookup_update_lk_counts (afr_local_t *local, xlator_t *this,
+ int child_index, dict_t *xattr)
{
uint32_t inodelk_count = 0;
uint32_t entrylk_count = 0;
- int ret = 0;
+ int ret = -1;
+
+ GF_ASSERT (local);
+ GF_ASSERT (this);
+ GF_ASSERT (xattr);
+ GF_ASSERT (child_index >= 0);
+
+ ret = dict_get_uint32 (xattr, GLUSTERFS_INODELK_COUNT,
+ &inodelk_count);
+ if (ret == 0)
+ local->inodelk_count += inodelk_count;
- if (afr_sh_has_metadata_pending (xattr, child_index, this)) {
+ ret = dict_get_uint32 (xattr, GLUSTERFS_ENTRYLK_COUNT,
+ &entrylk_count);
+ if (ret == 0)
+ local->entrylk_count += entrylk_count;
+}
+
+static void
+afr_lookup_detect_self_heal_by_xattr (afr_local_t *local, xlator_t *this,
+ dict_t *xattr)
+{
+ GF_ASSERT (local);
+ GF_ASSERT (this);
+ GF_ASSERT (xattr);
+
+ if (afr_sh_has_metadata_pending (xattr, this)) {
local->self_heal.need_metadata_self_heal = _gf_true;
gf_log(this->name, GF_LOG_DEBUG,
"metadata self-heal is pending for %s.",
local->loc.path);
}
- if (afr_sh_has_entry_pending (xattr, child_index, this)) {
+ if (afr_sh_has_entry_pending (xattr, this)) {
local->self_heal.need_entry_self_heal = _gf_true;
gf_log(this->name, GF_LOG_DEBUG,
"entry self-heal is pending for %s.", local->loc.path);
}
- if (afr_sh_has_data_pending (xattr, child_index, this)) {
+ if (afr_sh_has_data_pending (xattr, this)) {
local->self_heal.need_data_self_heal = _gf_true;
gf_log(this->name, GF_LOG_DEBUG,
"data self-heal is pending for %s.", local->loc.path);
}
-
- ret = dict_get_uint32 (xattr, GLUSTERFS_INODELK_COUNT,
- &inodelk_count);
- if (ret == 0)
- local->inodelk_count += inodelk_count;
-
- ret = dict_get_uint32 (xattr, GLUSTERFS_ENTRYLK_COUNT,
- &entrylk_count);
- if (ret == 0)
- local->entrylk_count += entrylk_count;
}
-
static void
-afr_lookup_self_heal_check (xlator_t *this, afr_local_t *local,
+afr_detect_self_heal_by_iatt (afr_local_t *local, xlator_t *this,
struct iatt *buf, struct iatt *lookup_buf)
{
- if (FILETYPE_DIFFERS (buf, lookup_buf)) {
- /* mismatching filetypes with same name
- */
-
- gf_log (this->name, GF_LOG_INFO,
- "filetype differs for %s ", local->loc.path);
-
- local->govinda_gOvinda = 1;
- }
-
if (PERMISSION_DIFFERS (buf, lookup_buf)) {
/* mismatching permissions */
gf_log (this->name, GF_LOG_INFO,
@@ -624,105 +682,298 @@ afr_lookup_self_heal_check (xlator_t *this, afr_local_t *local,
}
}
-
static void
-afr_lookup_done (call_frame_t *frame, xlator_t *this, struct iatt *lookup_buf)
+afr_detect_self_heal_by_lookup_status (afr_local_t *local, xlator_t *this)
{
- int unwind = 1;
- int source = -1;
- int up_count = 0;
- char sh_type_str[256] = {0,};
- afr_private_t *priv = NULL;
- afr_local_t *local = NULL;
-
- priv = this->private;
- local = frame->local;
-
- up_count = afr_up_children_count (priv->child_count, priv->child_up);
- if (up_count == 1) {
- gf_log (this->name, GF_LOG_DEBUG,
- "Only 1 child up - do not attempt to detect self heal");
-
- goto unwind;
- }
+ GF_ASSERT (local);
+ GF_ASSERT (this);
- if (local->success_count && local->enoent_count) {
+ if ((local->success_count > 0) && (local->enoent_count > 0)) {
local->self_heal.need_metadata_self_heal = _gf_true;
local->self_heal.need_data_self_heal = _gf_true;
local->self_heal.need_entry_self_heal = _gf_true;
gf_log(this->name, GF_LOG_INFO,
"entries are missing in lookup of %s.",
local->loc.path);
+ //If all self-heals are needed no need to check for other rules
+ goto out;
}
- if (local->success_count) {
- /* check for split-brain case in previous lookup */
- if (afr_is_split_brain (this, local->cont.lookup.inode)) {
+ if (local->success_count > 0) {
+ if (afr_is_split_brain (this, local->cont.lookup.inode) &&
+ IA_ISREG (local->cont.lookup.inode->ia_type)) {
local->self_heal.need_data_self_heal = _gf_true;
- gf_log(this->name, GF_LOG_WARNING,
- "split brain detected during lookup of %s.",
- local->loc.path);
+ gf_log (this->name, GF_LOG_WARNING,
+ "split brain detected during lookup of %s.",
+ local->loc.path);
}
}
- if ((local->self_heal.need_metadata_self_heal
- || local->self_heal.need_data_self_heal
- || local->self_heal.need_entry_self_heal)
- && ((!local->cont.lookup.is_revalidate)
- || (local->op_ret != -1))) {
+out:
+ return;
+}
- if (local->inodelk_count || local->entrylk_count) {
+gf_boolean_t
+afr_can_self_heal_proceed (afr_self_heal_t *sh, afr_private_t *priv)
+{
+ GF_ASSERT (sh);
+ GF_ASSERT (priv);
- /* Someone else is doing self-heal on this file.
- So just make a best effort to set the read-subvolume
- and return */
+ return ((priv->data_self_heal && sh->need_data_self_heal)
+ || (priv->metadata_self_heal && sh->need_metadata_self_heal)
+ || (priv->entry_self_heal && sh->need_entry_self_heal));
+}
- if (IA_ISREG (local->cont.lookup.inode->ia_type)) {
- source = afr_self_heal_get_source (this, local, local->cont.lookup.xattrs);
+gf_boolean_t
+afr_is_self_heal_enabled (afr_private_t *priv)
+{
+ GF_ASSERT (priv);
- if (source >= 0) {
- afr_set_read_child (this,
- local->cont.lookup.inode,
- source);
- }
- }
- goto unwind;
+ return (priv->data_self_heal || priv->metadata_self_heal
+ || priv->entry_self_heal);
+}
+
+int
+afr_lookup_select_read_child (afr_local_t *local, xlator_t *this,
+ int32_t *read_child)
+{
+ int32_t source = -1;
+ ia_type_t ia_type = 0;
+ int ret = -1;
+ afr_transaction_type type = AFR_METADATA_TRANSACTION;
+ dict_t **xattrs = NULL;
+ int32_t *child_success = NULL;
+ struct iatt *bufs = NULL;
+
+ GF_ASSERT (local);
+ GF_ASSERT (this);
+
+ bufs = local->cont.lookup.bufs;
+ child_success = local->cont.lookup.child_success;
+ ia_type = local->cont.lookup.bufs[child_success[0]].ia_type;
+ if (IA_ISDIR (ia_type)) {
+ type = AFR_ENTRY_TRANSACTION;
+ } else if (IA_ISREG (ia_type)) {
+ type = AFR_DATA_TRANSACTION;
+ }
+ xattrs = local->cont.lookup.xattrs;
+ source = afr_lookup_select_read_child_by_txn_type (this, local, xattrs,
+ type);
+ if (source < 0)
+ goto out;
+
+ *read_child = source;
+ ret = 0;
+out:
+ return ret;
+}
+
+static inline gf_boolean_t
+afr_is_self_heal_running (afr_local_t *local)
+{
+ GF_ASSERT (local);
+ return ((local->inodelk_count > 0) || (local->entrylk_count > 0));
+}
+
+static void
+afr_launch_self_heal (call_frame_t *frame, xlator_t *this,
+ gf_boolean_t is_background, ia_type_t ia_type,
+ int (*unwind) (call_frame_t *frame, xlator_t *this))
+{
+ afr_local_t *local = NULL;
+ char sh_type_str[256] = {0,};
+
+ GF_ASSERT (frame);
+ GF_ASSERT (this);
+
+ local = frame->local;
+ local->self_heal.background = is_background;
+ local->self_heal.type = ia_type;
+ local->self_heal.unwind = unwind;
+
+ afr_self_heal_type_str_get (&local->self_heal,
+ sh_type_str,
+ sizeof (sh_type_str));
+
+ gf_log (this->name, GF_LOG_INFO,
+ "background %s self-heal triggered. path: %s",
+ sh_type_str, local->loc.path);
+
+ afr_self_heal (frame, this);
+}
+
+static void
+afr_lookup_detect_self_heal (afr_local_t *local, xlator_t *this)
+{
+ int i = 0;
+ struct iatt *bufs = NULL;
+ dict_t **xattr = NULL;
+ afr_private_t *priv = NULL;
+ int32_t child1 = -1;
+ int32_t child2 = -1;
+
+ afr_detect_self_heal_by_lookup_status (local, this);
+
+ bufs = local->cont.lookup.bufs;
+ for (i = 1; i < local->success_count; i++) {
+ child1 = local->cont.lookup.child_success[i-1];
+ child2 = local->cont.lookup.child_success[i];;
+ afr_detect_self_heal_by_iatt (local, this,
+ &bufs[child1], &bufs[child2]);
+ }
+
+ xattr = local->cont.lookup.xattrs;
+ priv = this->private;
+ for (i = 0; i < local->success_count; i++) {
+ child1 = local->cont.lookup.child_success[i];;
+ afr_lookup_detect_self_heal_by_xattr (local, this,
+ xattr[child1]);
+ }
+}
+
+static void
+afr_lookup_perform_self_heal_if_needed (call_frame_t *frame, xlator_t *this,
+ gf_boolean_t *sh_launched)
+{
+ size_t up_count = 0;
+ afr_private_t *priv = NULL;
+ afr_local_t *local = NULL;
+
+ GF_ASSERT (sh_launched);
+ *sh_launched = _gf_false;
+ priv = this->private;
+ local = frame->local;
+
+ up_count = afr_up_children_count (priv->child_count, local->child_up);
+ if (up_count == 1) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Only 1 child up - do not attempt to detect self heal");
+ goto out;
+ }
+
+ if (_gf_false == afr_is_self_heal_enabled (priv)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Self heal is not enabled");
+ goto out;
+ }
+
+ afr_lookup_detect_self_heal (local, this);
+ if (afr_can_self_heal_proceed (&local->self_heal, priv)) {
+ if (afr_is_self_heal_running (local)) {
+ goto out;
}
- if (!local->cont.lookup.inode->ia_type) {
- /* fix for RT #602 */
- local->cont.lookup.inode->ia_type =
- lookup_buf->ia_type;
+ afr_launch_self_heal (frame, this, _gf_true,
+ local->cont.lookup.buf.ia_type,
+ afr_self_heal_lookup_unwind);
+ *sh_launched = _gf_true;
+ }
+out:
+ return;
+}
+
+static gf_boolean_t
+afr_lookup_split_brain (afr_local_t *local, xlator_t *this)
+{
+ int i = 0;
+ gf_boolean_t symptom = _gf_false;
+ struct iatt *bufs = NULL;
+ int32_t *child_success = NULL;
+ struct iatt *child1 = NULL;
+ struct iatt *child2 = NULL;
+ const char *path = NULL;
+
+ bufs = local->cont.lookup.bufs;
+ child_success = local->cont.lookup.child_success;
+ for (i = 1; i < local->success_count; i++) {
+ child1 = &bufs[child_success[i-1]];
+ child2 = &bufs[child_success[i]];
+ /*
+ * TODO: gfid self-heal
+ * if (uuid_compare (child1->ia_gfid, child2->ia_gfid)) {
+ * gf_log (this->name, GF_LOG_WARNING, "%s: gfid differs"
+ * " on subvolumes (%d, %d)", local->loc.path,
+ * child_success[i-1], child_success[i]);
+ * symptom = _gf_true;
+ * }
+ */
+
+ if (FILETYPE_DIFFERS (child1, child2)) {
+ path = local->loc.path;
+ gf_log (this->name, GF_LOG_WARNING, "%s: filetype "
+ "differs on subvolumes (%d, %d)", path,
+ child_success[i-1], child_success[i]);
+ symptom = _gf_true;
+ local->govinda_gOvinda = 1;
}
+ if (symptom)
+ break;
+ }
+ return symptom;
+}
- local->self_heal.background = _gf_true;
- local->self_heal.type = local->cont.lookup.buf.ia_type;
- local->self_heal.unwind = afr_self_heal_lookup_unwind;
+static int
+afr_lookup_set_read_child (afr_local_t *local, xlator_t *this, int32_t read_child)
+{
+ GF_ASSERT (read_child >= 0);
- unwind = 0;
+ afr_set_read_child (this, local->cont.lookup.inode, read_child);
+ local->cont.lookup.read_child = read_child;
- afr_self_heal_type_str_get(&local->self_heal,
- sh_type_str,
- sizeof(sh_type_str));
+ return 0;
+}
- gf_log (this->name, GF_LOG_INFO,
- "background %s self-heal triggered. path: %s",
- sh_type_str, local->loc.path);
+static void
+afr_lookup_done (call_frame_t *frame, xlator_t *this)
+{
+ int unwind = 1;
+ afr_private_t *priv = NULL;
+ afr_local_t *local = NULL;
+ int ret = -1;
+ gf_boolean_t sh_launched = _gf_false;
+ int32_t read_child = -1;
+
+ priv = this->private;
+ local = frame->local;
- afr_self_heal (frame, this);
+ if (local->op_ret < 0)
+ goto unwind;
+
+ if (_gf_true == afr_lookup_split_brain (local, this)) {
+ local->op_ret = -1;
+ local->op_errno = EIO;
+ goto unwind;
}
-unwind:
- if (unwind) {
- AFR_STACK_UNWIND (lookup, frame, local->op_ret,
- local->op_errno,
- local->cont.lookup.inode,
- &local->cont.lookup.buf,
- local->cont.lookup.xattr,
- &local->cont.lookup.postparent);
+ ret = afr_lookup_select_read_child (local, this, &read_child);
+ if (ret) {
+ local->op_ret = -1;
+ local->op_errno = EIO;
+ goto unwind;
}
-}
+ ret = afr_lookup_set_read_child (local, this, read_child);
+ if (ret)
+ goto unwind;
+
+ afr_lookup_build_response_params (local, this);
+ if (afr_is_fresh_lookup (&local->loc, this)) {
+ afr_update_loc_gfids (&local->loc, &local->cont.lookup.buf,
+ &local->cont.lookup.postparent);
+ }
+
+ afr_lookup_perform_self_heal_if_needed (frame, this, &sh_launched);
+ if (sh_launched)
+ unwind = 0;
+ unwind:
+ if (unwind) {
+ AFR_STACK_UNWIND (lookup, frame, local->op_ret,
+ local->op_errno, local->cont.lookup.inode,
+ &local->cont.lookup.buf,
+ local->cont.lookup.xattr,
+ &local->cont.lookup.postparent);
+ }
+}
/*
* During a lookup, some errors are more "important" than
@@ -749,236 +1000,170 @@ __error_more_important (int32_t old_errno, int32_t new_errno)
return ret;
}
-
-int
-afr_fresh_lookup_cbk (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret, int32_t op_errno,
- inode_t *inode, struct iatt *buf, dict_t *xattr,
- struct iatt *postparent)
+static void
+afr_lookup_handle_error (afr_local_t *local, int32_t op_ret, int32_t op_errno)
{
- afr_local_t * local = NULL;
- afr_private_t * priv = NULL;
- struct iatt * lookup_buf = NULL;
- int call_count = -1;
- int child_index = -1;
- int first_up_child = -1;
-
- child_index = (long) cookie;
- priv = this->private;
-
- LOCK (&frame->lock);
- {
- local = frame->local;
-
- lookup_buf = &local->cont.lookup.buf;
-
- if (op_ret == -1) {
- if (op_errno == ENOENT)
- local->enoent_count++;
-
- if (__error_more_important (local->op_errno, op_errno))
- local->op_errno = op_errno;
-
- if (local->op_errno == ESTALE) {
- local->op_ret = -1;
- }
-
- goto unlock;
- }
-
- afr_lookup_collect_xattr (local, this, child_index, xattr);
+ GF_ASSERT (local);
+ if (op_errno == ENOENT)
+ local->enoent_count++;
- first_up_child = afr_first_up_child (priv);
-
- if (local->success_count == 0) {
- if (local->op_errno != ESTALE)
- local->op_ret = op_ret;
-
- local->cont.lookup.inode = inode_ref (inode);
- local->cont.lookup.xattr = dict_ref (xattr);
- local->cont.lookup.xattrs[child_index] = dict_ref (xattr);
- local->cont.lookup.postparent = *postparent;
-
- if (priv->first_lookup && inode->ino == 1) {
- gf_log (this->name, GF_LOG_INFO,
- "added root inode");
- priv->root_inode = inode_ref (inode);
- priv->first_lookup = 0;
- }
-
- *lookup_buf = *buf;
-
- uuid_copy (local->loc.gfid, buf->ia_gfid);
- uuid_copy (local->loc.pargfid,
- postparent->ia_gfid);
-
- if (priv->read_child >= 0) {
- afr_set_read_child (this,
- local->cont.lookup.inode,
- priv->read_child);
- } else {
- afr_set_read_child (this,
- local->cont.lookup.inode,
- child_index);
- }
+ if (__error_more_important (local->op_errno, op_errno))
+ local->op_errno = op_errno;
- } else {
- afr_lookup_self_heal_check (this, local, buf, lookup_buf);
+ if (local->op_errno == ESTALE) {
+ local->op_ret = -1;
+ }
+}
- if (child_index == local->read_child_index) {
- /*
- lookup has succeeded on the read child.
- So use its inode number
- */
- if (local->cont.lookup.xattr)
- dict_unref (local->cont.lookup.xattr);
+static void
+afr_set_root_inode_on_first_lookup (afr_local_t *local, xlator_t *this,
+ inode_t *inode)
+{
+ afr_private_t *priv = NULL;
+ GF_ASSERT (inode);
- local->cont.lookup.xattr = dict_ref (xattr);
- local->cont.lookup.xattrs[child_index] = dict_ref (xattr);
- local->cont.lookup.postparent = *postparent;
+ if (inode->ino != 1)
+ goto out;
+ if (!afr_is_fresh_lookup (&local->loc, this))
+ goto out;
+ priv = this->private;
+ if ((priv->first_lookup)) {
+ gf_log (this->name, GF_LOG_INFO, "added root inode");
+ priv->root_inode = inode_ref (inode);
+ priv->first_lookup = 0;
+ }
+out:
+ return;
+}
- *lookup_buf = *buf;
+static void
+afr_lookup_cache_args (afr_local_t *local, int child_index, dict_t *xattr,
+ struct iatt *buf, struct iatt *postparent)
+{
+ GF_ASSERT (child_index >= 0);
+ local->cont.lookup.xattrs[child_index] = dict_ref (xattr);
+ local->cont.lookup.postparents[child_index] = *postparent;
+ local->cont.lookup.bufs[child_index] = *buf;
+}
- uuid_copy (local->loc.gfid, buf->ia_gfid);
- uuid_copy (local->loc.pargfid,
- postparent->ia_gfid);
- }
+static void
+afr_lookup_handle_first_success (afr_local_t *local, xlator_t *this,
+ inode_t *inode, struct iatt *buf)
+{
+ local->cont.lookup.inode = inode_ref (inode);
+ local->cont.lookup.buf = *buf;
+ afr_set_root_inode_on_first_lookup (local, this, inode);
+}
+static void
+afr_lookup_handle_success (afr_local_t *local, xlator_t *this, int32_t child_index,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, dict_t *xattr,
+ struct iatt *postparent)
+{
+ if (local->success_count == 0) {
+ if (local->op_errno != ESTALE) {
+ local->op_ret = op_ret;
+ local->op_errno = 0;
}
-
- local->success_count++;
- }
-unlock:
- UNLOCK (&frame->lock);
-
- call_count = afr_frame_return (frame);
-
- if (call_count == 0) {
- afr_lookup_done (frame, this, lookup_buf);
+ afr_lookup_handle_first_success (local, this, inode, buf);
}
+ afr_lookup_update_lk_counts (local, this,
+ child_index, xattr);
- return 0;
+ afr_lookup_cache_args (local, child_index, xattr,
+ buf, postparent);
+ local->cont.lookup.child_success[local->success_count] = child_index;
+ local->success_count++;
}
-
int
-afr_revalidate_lookup_cbk (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret, int32_t op_errno,
- inode_t *inode, struct iatt *buf, dict_t *xattr,
- struct iatt *postparent)
+afr_lookup_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ inode_t *inode, struct iatt *buf, dict_t *xattr,
+ struct iatt *postparent)
{
afr_local_t * local = NULL;
- afr_private_t * priv = NULL;
- struct iatt * lookup_buf = NULL;
int call_count = -1;
int child_index = -1;
- int first_up_child = -1;
- child_index = (long) cookie;
- priv = this->private;
+ child_index = (long) cookie;
LOCK (&frame->lock);
{
local = frame->local;
- lookup_buf = &local->cont.lookup.buf;
-
if (op_ret == -1) {
- if (op_errno == ENOENT)
- local->enoent_count++;
-
- if (__error_more_important (local->op_errno, op_errno))
- local->op_errno = op_errno;
-
- if (local->op_errno == ESTALE) {
- local->op_ret = -1;
- }
-
+ afr_lookup_handle_error (local, op_ret, op_errno);
goto unlock;
}
+ afr_lookup_handle_success (local, this, child_index, op_ret,
+ op_errno, inode, buf, xattr,
+ postparent);
- afr_lookup_collect_xattr (local, this, child_index, xattr);
-
- first_up_child = afr_first_up_child (priv);
-
- /* in case of revalidate, we need to send stat of the
- * child whose stat was sent during the first lookup.
- * (so that time stamp does not vary with revalidate.
- * in case it is down, stat of the fist success will
- * be replied */
-
- /* inode number should be preserved across revalidates */
-
- if (local->success_count == 0) {
- if (local->op_errno != ESTALE)
- local->op_ret = op_ret;
-
- local->cont.lookup.inode = inode_ref (inode);
- local->cont.lookup.xattr = dict_ref (xattr);
- local->cont.lookup.xattrs[child_index] = dict_ref (xattr);
- local->cont.lookup.postparent = *postparent;
-
- *lookup_buf = *buf;
-
- if (priv->read_child >= 0) {
- afr_set_read_child (this,
- local->cont.lookup.inode,
- priv->read_child);
- } else {
- afr_set_read_child (this,
- local->cont.lookup.inode,
- child_index);
- }
-
- } else {
- afr_lookup_self_heal_check (this, local, buf, lookup_buf);
-
- if (child_index == local->read_child_index) {
+ }
+unlock:
+ UNLOCK (&frame->lock);
- /*
- lookup has succeeded on the read child.
- So use its inode number
- */
+ call_count = afr_frame_return (frame);
+ if (call_count == 0) {
+ afr_lookup_done (frame, this);
+ }
- if (local->cont.lookup.xattr)
- dict_unref (local->cont.lookup.xattr);
+ return 0;
+}
- local->cont.lookup.xattr = dict_ref (xattr);
- local->cont.lookup.xattrs[child_index] = dict_ref (xattr);
- local->cont.lookup.postparent = *postparent;
+int
+afr_lookup_cont_init (afr_local_t *local, unsigned int child_count)
+{
+ int ret = -ENOMEM;
+ int32_t *child_success = NULL;
+ struct iatt *iatts = NULL;
+ int i = 0;
- *lookup_buf = *buf;
- }
+ GF_ASSERT (local);
+ local->cont.lookup.xattrs = GF_CALLOC (child_count,
+ sizeof (*local->cont.lookup.xattr),
+ gf_afr_mt_dict_t);
+ if (NULL == local->cont.lookup.xattrs)
+ goto out;
- }
+ iatts = GF_CALLOC (child_count, sizeof (*iatts), gf_afr_mt_iatt);
+ if (NULL == iatts)
+ goto out;
+ local->cont.lookup.postparents = iatts;
- local->success_count++;
- }
-unlock:
- UNLOCK (&frame->lock);
+ iatts = GF_CALLOC (child_count, sizeof (*iatts), gf_afr_mt_iatt);
+ if (NULL == iatts)
+ goto out;
+ local->cont.lookup.bufs = iatts;
- call_count = afr_frame_return (frame);
+ child_success = GF_CALLOC (child_count, sizeof (*child_success),
+ gf_afr_mt_char);
+ if (NULL == child_success)
+ goto out;
+ for (i = 0; i < child_count; i++)
+ child_success[i] = -1;
- if (call_count == 0) {
- afr_lookup_done (frame, this, lookup_buf);
- }
+ local->cont.lookup.child_success = child_success;
- return 0;
+ local->cont.lookup.read_child = -1;
+ ret = 0;
+out:
+ return ret;
}
-
int
afr_lookup (call_frame_t *frame, xlator_t *this,
loc_t *loc, dict_t *xattr_req)
{
- afr_private_t *priv = NULL;
- afr_local_t *local = NULL;
- int ret = -1;
- int i = 0;
- fop_lookup_cbk_t callback = NULL;
- int call_count = 0;
- uint64_t ctx = 0;
- int32_t op_errno = 0;
+ afr_private_t *priv = NULL;
+ afr_local_t *local = NULL;
+ int ret = -1;
+ int i = 0;
+ int call_count = 0;
+ uint64_t ctx = 0;
+ int32_t op_errno = 0;
priv = this->private;
@@ -999,14 +1184,9 @@ afr_lookup (call_frame_t *frame, xlator_t *this,
if (ret == 0) {
/* lookup is a revalidate */
- callback = afr_revalidate_lookup_cbk;
-
- local->cont.lookup.is_revalidate = _gf_true;
local->read_child_index = afr_read_child (this,
loc->inode);
} else {
- callback = afr_fresh_lookup_cbk;
-
LOCK (&priv->read_child_lock);
{
local->read_child_index = (++priv->read_child_rr)
@@ -1019,10 +1199,16 @@ afr_lookup (call_frame_t *frame, xlator_t *this,
local->cont.lookup.parent_ino = loc->parent->ino;
local->child_up = memdup (priv->child_up, priv->child_count);
+ if (NULL == local->child_up) {
+ op_errno = ENOMEM;
+ goto out;
+ }
- local->cont.lookup.xattrs = GF_CALLOC (priv->child_count,
- sizeof (*local->cont.lookup.xattr),
- gf_afr_mt_dict_t);
+ ret = afr_lookup_cont_init (local, priv->child_count);
+ if (ret < 0) {
+ op_errno = -ret;
+ goto out;
+ }
local->call_count = afr_up_children_count (priv->child_count,
local->child_up);
@@ -1068,7 +1254,8 @@ afr_lookup (call_frame_t *frame, xlator_t *this,
for (i = 0; i < priv->child_count; i++) {
if (local->child_up[i]) {
- STACK_WIND_COOKIE (frame, callback, (void *) (long) i,
+ STACK_WIND_COOKIE (frame, afr_lookup_cbk,
+ (void *) (long) i,
priv->children[i],
priv->children[i]->fops->lookup,
loc, local->xattr_req);