diff options
Diffstat (limited to 'xlators/cluster/afr/src/afr-common.c')
-rw-r--r-- | xlators/cluster/afr/src/afr-common.c | 777 |
1 files changed, 482 insertions, 295 deletions
diff --git a/xlators/cluster/afr/src/afr-common.c b/xlators/cluster/afr/src/afr-common.c index 3ddfa6dff..e5046cb69 100644 --- a/xlators/cluster/afr/src/afr-common.c +++ b/xlators/cluster/afr/src/afr-common.c @@ -329,6 +329,8 @@ afr_local_sh_cleanup (afr_local_t *local, xlator_t *this) if (sh->linkname) GF_FREE ((char *)sh->linkname); + if (sh->child_success) + GF_FREE (sh->child_success); loc_wipe (&sh->parent_loc); } @@ -418,6 +420,18 @@ afr_local_cleanup (afr_local_t *local, xlator_t *this) if (local->cont.lookup.inode) { inode_unref (local->cont.lookup.inode); } + + if (local->cont.lookup.postparents) + GF_FREE (local->cont.lookup.postparents); + + if (local->cont.lookup.bufs) + GF_FREE (local->cont.lookup.bufs); + + if (local->cont.lookup.child_success) + GF_FREE (local->cont.lookup.child_success); + + if (local->cont.lookup.sources) + GF_FREE (local->cont.lookup.sources); } { /* getxattr */ @@ -510,6 +524,22 @@ afr_up_children_count (int child_count, unsigned char *child_up) return ret; } +gf_boolean_t +afr_is_fresh_lookup (loc_t *loc, xlator_t *this) +{ + uint64_t ctx = 0; + int32_t ret = 0; + + GF_ASSERT (loc); + GF_ASSERT (this); + GF_ASSERT (loc->inode); + + ret = inode_ctx_get (loc->inode, this, &ctx); + if (0 == ret) + return _gf_false; + return _gf_true; +} + void afr_update_loc_gfids (loc_t *loc, struct iatt *buf, struct iatt *postparent) { @@ -534,68 +564,96 @@ afr_self_heal_lookup_unwind (call_frame_t *frame, xlator_t *this) } AFR_STACK_UNWIND (lookup, frame, local->op_ret, local->op_errno, - local->cont.lookup.inode, - &local->cont.lookup.buf, + local->cont.lookup.inode, &local->cont.lookup.buf, local->cont.lookup.xattr, &local->cont.lookup.postparent); return 0; } +void +afr_lookup_build_response_params (afr_local_t *local, xlator_t *this) +{ + int32_t read_child = -1; + struct iatt *buf = NULL; + struct iatt *postparent = NULL; + dict_t **xattr = NULL; -static void -afr_lookup_collect_xattr (afr_local_t *local, xlator_t *this, - int child_index, dict_t *xattr) + GF_ASSERT (local); + GF_ASSERT (local->cont.lookup.read_child >= 0); + + buf = &local->cont.lookup.buf; + postparent = &local->cont.lookup.postparent; + xattr = &local->cont.lookup.xattr; + + read_child = local->cont.lookup.read_child; + *xattr = dict_ref (local->cont.lookup.xattrs[read_child]); + *buf = local->cont.lookup.bufs[read_child]; + *postparent = local->cont.lookup.postparents[read_child]; + + if (IA_INVAL == local->cont.lookup.inode->ia_type) { + /* fix for RT #602 */ + local->cont.lookup.inode->ia_type = buf->ia_type; + } +} + + + static void +afr_lookup_update_lk_counts (afr_local_t *local, xlator_t *this, + int child_index, dict_t *xattr) { uint32_t inodelk_count = 0; uint32_t entrylk_count = 0; - int ret = 0; + int ret = -1; + + GF_ASSERT (local); + GF_ASSERT (this); + GF_ASSERT (xattr); + GF_ASSERT (child_index >= 0); + + ret = dict_get_uint32 (xattr, GLUSTERFS_INODELK_COUNT, + &inodelk_count); + if (ret == 0) + local->inodelk_count += inodelk_count; - if (afr_sh_has_metadata_pending (xattr, child_index, this)) { + ret = dict_get_uint32 (xattr, GLUSTERFS_ENTRYLK_COUNT, + &entrylk_count); + if (ret == 0) + local->entrylk_count += entrylk_count; +} + +static void +afr_lookup_detect_self_heal_by_xattr (afr_local_t *local, xlator_t *this, + dict_t *xattr) +{ + GF_ASSERT (local); + GF_ASSERT (this); + GF_ASSERT (xattr); + + if (afr_sh_has_metadata_pending (xattr, this)) { local->self_heal.need_metadata_self_heal = _gf_true; gf_log(this->name, GF_LOG_DEBUG, "metadata self-heal is pending for %s.", local->loc.path); } - if (afr_sh_has_entry_pending (xattr, child_index, this)) { + if (afr_sh_has_entry_pending (xattr, this)) { local->self_heal.need_entry_self_heal = _gf_true; gf_log(this->name, GF_LOG_DEBUG, "entry self-heal is pending for %s.", local->loc.path); } - if (afr_sh_has_data_pending (xattr, child_index, this)) { + if (afr_sh_has_data_pending (xattr, this)) { local->self_heal.need_data_self_heal = _gf_true; gf_log(this->name, GF_LOG_DEBUG, "data self-heal is pending for %s.", local->loc.path); } - - ret = dict_get_uint32 (xattr, GLUSTERFS_INODELK_COUNT, - &inodelk_count); - if (ret == 0) - local->inodelk_count += inodelk_count; - - ret = dict_get_uint32 (xattr, GLUSTERFS_ENTRYLK_COUNT, - &entrylk_count); - if (ret == 0) - local->entrylk_count += entrylk_count; } - static void -afr_lookup_self_heal_check (xlator_t *this, afr_local_t *local, +afr_detect_self_heal_by_iatt (afr_local_t *local, xlator_t *this, struct iatt *buf, struct iatt *lookup_buf) { - if (FILETYPE_DIFFERS (buf, lookup_buf)) { - /* mismatching filetypes with same name - */ - - gf_log (this->name, GF_LOG_INFO, - "filetype differs for %s ", local->loc.path); - - local->govinda_gOvinda = 1; - } - if (PERMISSION_DIFFERS (buf, lookup_buf)) { /* mismatching permissions */ gf_log (this->name, GF_LOG_INFO, @@ -624,105 +682,298 @@ afr_lookup_self_heal_check (xlator_t *this, afr_local_t *local, } } - static void -afr_lookup_done (call_frame_t *frame, xlator_t *this, struct iatt *lookup_buf) +afr_detect_self_heal_by_lookup_status (afr_local_t *local, xlator_t *this) { - int unwind = 1; - int source = -1; - int up_count = 0; - char sh_type_str[256] = {0,}; - afr_private_t *priv = NULL; - afr_local_t *local = NULL; - - priv = this->private; - local = frame->local; - - up_count = afr_up_children_count (priv->child_count, priv->child_up); - if (up_count == 1) { - gf_log (this->name, GF_LOG_DEBUG, - "Only 1 child up - do not attempt to detect self heal"); - - goto unwind; - } + GF_ASSERT (local); + GF_ASSERT (this); - if (local->success_count && local->enoent_count) { + if ((local->success_count > 0) && (local->enoent_count > 0)) { local->self_heal.need_metadata_self_heal = _gf_true; local->self_heal.need_data_self_heal = _gf_true; local->self_heal.need_entry_self_heal = _gf_true; gf_log(this->name, GF_LOG_INFO, "entries are missing in lookup of %s.", local->loc.path); + //If all self-heals are needed no need to check for other rules + goto out; } - if (local->success_count) { - /* check for split-brain case in previous lookup */ - if (afr_is_split_brain (this, local->cont.lookup.inode)) { + if (local->success_count > 0) { + if (afr_is_split_brain (this, local->cont.lookup.inode) && + IA_ISREG (local->cont.lookup.inode->ia_type)) { local->self_heal.need_data_self_heal = _gf_true; - gf_log(this->name, GF_LOG_WARNING, - "split brain detected during lookup of %s.", - local->loc.path); + gf_log (this->name, GF_LOG_WARNING, + "split brain detected during lookup of %s.", + local->loc.path); } } - if ((local->self_heal.need_metadata_self_heal - || local->self_heal.need_data_self_heal - || local->self_heal.need_entry_self_heal) - && ((!local->cont.lookup.is_revalidate) - || (local->op_ret != -1))) { +out: + return; +} - if (local->inodelk_count || local->entrylk_count) { +gf_boolean_t +afr_can_self_heal_proceed (afr_self_heal_t *sh, afr_private_t *priv) +{ + GF_ASSERT (sh); + GF_ASSERT (priv); - /* Someone else is doing self-heal on this file. - So just make a best effort to set the read-subvolume - and return */ + return ((priv->data_self_heal && sh->need_data_self_heal) + || (priv->metadata_self_heal && sh->need_metadata_self_heal) + || (priv->entry_self_heal && sh->need_entry_self_heal)); +} - if (IA_ISREG (local->cont.lookup.inode->ia_type)) { - source = afr_self_heal_get_source (this, local, local->cont.lookup.xattrs); +gf_boolean_t +afr_is_self_heal_enabled (afr_private_t *priv) +{ + GF_ASSERT (priv); - if (source >= 0) { - afr_set_read_child (this, - local->cont.lookup.inode, - source); - } - } - goto unwind; + return (priv->data_self_heal || priv->metadata_self_heal + || priv->entry_self_heal); +} + +int +afr_lookup_select_read_child (afr_local_t *local, xlator_t *this, + int32_t *read_child) +{ + int32_t source = -1; + ia_type_t ia_type = 0; + int ret = -1; + afr_transaction_type type = AFR_METADATA_TRANSACTION; + dict_t **xattrs = NULL; + int32_t *child_success = NULL; + struct iatt *bufs = NULL; + + GF_ASSERT (local); + GF_ASSERT (this); + + bufs = local->cont.lookup.bufs; + child_success = local->cont.lookup.child_success; + ia_type = local->cont.lookup.bufs[child_success[0]].ia_type; + if (IA_ISDIR (ia_type)) { + type = AFR_ENTRY_TRANSACTION; + } else if (IA_ISREG (ia_type)) { + type = AFR_DATA_TRANSACTION; + } + xattrs = local->cont.lookup.xattrs; + source = afr_lookup_select_read_child_by_txn_type (this, local, xattrs, + type); + if (source < 0) + goto out; + + *read_child = source; + ret = 0; +out: + return ret; +} + +static inline gf_boolean_t +afr_is_self_heal_running (afr_local_t *local) +{ + GF_ASSERT (local); + return ((local->inodelk_count > 0) || (local->entrylk_count > 0)); +} + +static void +afr_launch_self_heal (call_frame_t *frame, xlator_t *this, + gf_boolean_t is_background, ia_type_t ia_type, + int (*unwind) (call_frame_t *frame, xlator_t *this)) +{ + afr_local_t *local = NULL; + char sh_type_str[256] = {0,}; + + GF_ASSERT (frame); + GF_ASSERT (this); + + local = frame->local; + local->self_heal.background = is_background; + local->self_heal.type = ia_type; + local->self_heal.unwind = unwind; + + afr_self_heal_type_str_get (&local->self_heal, + sh_type_str, + sizeof (sh_type_str)); + + gf_log (this->name, GF_LOG_INFO, + "background %s self-heal triggered. path: %s", + sh_type_str, local->loc.path); + + afr_self_heal (frame, this); +} + +static void +afr_lookup_detect_self_heal (afr_local_t *local, xlator_t *this) +{ + int i = 0; + struct iatt *bufs = NULL; + dict_t **xattr = NULL; + afr_private_t *priv = NULL; + int32_t child1 = -1; + int32_t child2 = -1; + + afr_detect_self_heal_by_lookup_status (local, this); + + bufs = local->cont.lookup.bufs; + for (i = 1; i < local->success_count; i++) { + child1 = local->cont.lookup.child_success[i-1]; + child2 = local->cont.lookup.child_success[i];; + afr_detect_self_heal_by_iatt (local, this, + &bufs[child1], &bufs[child2]); + } + + xattr = local->cont.lookup.xattrs; + priv = this->private; + for (i = 0; i < local->success_count; i++) { + child1 = local->cont.lookup.child_success[i];; + afr_lookup_detect_self_heal_by_xattr (local, this, + xattr[child1]); + } +} + +static void +afr_lookup_perform_self_heal_if_needed (call_frame_t *frame, xlator_t *this, + gf_boolean_t *sh_launched) +{ + size_t up_count = 0; + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + + GF_ASSERT (sh_launched); + *sh_launched = _gf_false; + priv = this->private; + local = frame->local; + + up_count = afr_up_children_count (priv->child_count, local->child_up); + if (up_count == 1) { + gf_log (this->name, GF_LOG_DEBUG, + "Only 1 child up - do not attempt to detect self heal"); + goto out; + } + + if (_gf_false == afr_is_self_heal_enabled (priv)) { + gf_log (this->name, GF_LOG_DEBUG, + "Self heal is not enabled"); + goto out; + } + + afr_lookup_detect_self_heal (local, this); + if (afr_can_self_heal_proceed (&local->self_heal, priv)) { + if (afr_is_self_heal_running (local)) { + goto out; } - if (!local->cont.lookup.inode->ia_type) { - /* fix for RT #602 */ - local->cont.lookup.inode->ia_type = - lookup_buf->ia_type; + afr_launch_self_heal (frame, this, _gf_true, + local->cont.lookup.buf.ia_type, + afr_self_heal_lookup_unwind); + *sh_launched = _gf_true; + } +out: + return; +} + +static gf_boolean_t +afr_lookup_split_brain (afr_local_t *local, xlator_t *this) +{ + int i = 0; + gf_boolean_t symptom = _gf_false; + struct iatt *bufs = NULL; + int32_t *child_success = NULL; + struct iatt *child1 = NULL; + struct iatt *child2 = NULL; + const char *path = NULL; + + bufs = local->cont.lookup.bufs; + child_success = local->cont.lookup.child_success; + for (i = 1; i < local->success_count; i++) { + child1 = &bufs[child_success[i-1]]; + child2 = &bufs[child_success[i]]; + /* + * TODO: gfid self-heal + * if (uuid_compare (child1->ia_gfid, child2->ia_gfid)) { + * gf_log (this->name, GF_LOG_WARNING, "%s: gfid differs" + * " on subvolumes (%d, %d)", local->loc.path, + * child_success[i-1], child_success[i]); + * symptom = _gf_true; + * } + */ + + if (FILETYPE_DIFFERS (child1, child2)) { + path = local->loc.path; + gf_log (this->name, GF_LOG_WARNING, "%s: filetype " + "differs on subvolumes (%d, %d)", path, + child_success[i-1], child_success[i]); + symptom = _gf_true; + local->govinda_gOvinda = 1; } + if (symptom) + break; + } + return symptom; +} - local->self_heal.background = _gf_true; - local->self_heal.type = local->cont.lookup.buf.ia_type; - local->self_heal.unwind = afr_self_heal_lookup_unwind; +static int +afr_lookup_set_read_child (afr_local_t *local, xlator_t *this, int32_t read_child) +{ + GF_ASSERT (read_child >= 0); - unwind = 0; + afr_set_read_child (this, local->cont.lookup.inode, read_child); + local->cont.lookup.read_child = read_child; - afr_self_heal_type_str_get(&local->self_heal, - sh_type_str, - sizeof(sh_type_str)); + return 0; +} - gf_log (this->name, GF_LOG_INFO, - "background %s self-heal triggered. path: %s", - sh_type_str, local->loc.path); +static void +afr_lookup_done (call_frame_t *frame, xlator_t *this) +{ + int unwind = 1; + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + int ret = -1; + gf_boolean_t sh_launched = _gf_false; + int32_t read_child = -1; + + priv = this->private; + local = frame->local; - afr_self_heal (frame, this); + if (local->op_ret < 0) + goto unwind; + + if (_gf_true == afr_lookup_split_brain (local, this)) { + local->op_ret = -1; + local->op_errno = EIO; + goto unwind; } -unwind: - if (unwind) { - AFR_STACK_UNWIND (lookup, frame, local->op_ret, - local->op_errno, - local->cont.lookup.inode, - &local->cont.lookup.buf, - local->cont.lookup.xattr, - &local->cont.lookup.postparent); + ret = afr_lookup_select_read_child (local, this, &read_child); + if (ret) { + local->op_ret = -1; + local->op_errno = EIO; + goto unwind; } -} + ret = afr_lookup_set_read_child (local, this, read_child); + if (ret) + goto unwind; + + afr_lookup_build_response_params (local, this); + if (afr_is_fresh_lookup (&local->loc, this)) { + afr_update_loc_gfids (&local->loc, &local->cont.lookup.buf, + &local->cont.lookup.postparent); + } + + afr_lookup_perform_self_heal_if_needed (frame, this, &sh_launched); + if (sh_launched) + unwind = 0; + unwind: + if (unwind) { + AFR_STACK_UNWIND (lookup, frame, local->op_ret, + local->op_errno, local->cont.lookup.inode, + &local->cont.lookup.buf, + local->cont.lookup.xattr, + &local->cont.lookup.postparent); + } +} /* * During a lookup, some errors are more "important" than @@ -749,236 +1000,170 @@ __error_more_important (int32_t old_errno, int32_t new_errno) return ret; } - -int -afr_fresh_lookup_cbk (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno, - inode_t *inode, struct iatt *buf, dict_t *xattr, - struct iatt *postparent) +static void +afr_lookup_handle_error (afr_local_t *local, int32_t op_ret, int32_t op_errno) { - afr_local_t * local = NULL; - afr_private_t * priv = NULL; - struct iatt * lookup_buf = NULL; - int call_count = -1; - int child_index = -1; - int first_up_child = -1; - - child_index = (long) cookie; - priv = this->private; - - LOCK (&frame->lock); - { - local = frame->local; - - lookup_buf = &local->cont.lookup.buf; - - if (op_ret == -1) { - if (op_errno == ENOENT) - local->enoent_count++; - - if (__error_more_important (local->op_errno, op_errno)) - local->op_errno = op_errno; - - if (local->op_errno == ESTALE) { - local->op_ret = -1; - } - - goto unlock; - } - - afr_lookup_collect_xattr (local, this, child_index, xattr); + GF_ASSERT (local); + if (op_errno == ENOENT) + local->enoent_count++; - first_up_child = afr_first_up_child (priv); - - if (local->success_count == 0) { - if (local->op_errno != ESTALE) - local->op_ret = op_ret; - - local->cont.lookup.inode = inode_ref (inode); - local->cont.lookup.xattr = dict_ref (xattr); - local->cont.lookup.xattrs[child_index] = dict_ref (xattr); - local->cont.lookup.postparent = *postparent; - - if (priv->first_lookup && inode->ino == 1) { - gf_log (this->name, GF_LOG_INFO, - "added root inode"); - priv->root_inode = inode_ref (inode); - priv->first_lookup = 0; - } - - *lookup_buf = *buf; - - uuid_copy (local->loc.gfid, buf->ia_gfid); - uuid_copy (local->loc.pargfid, - postparent->ia_gfid); - - if (priv->read_child >= 0) { - afr_set_read_child (this, - local->cont.lookup.inode, - priv->read_child); - } else { - afr_set_read_child (this, - local->cont.lookup.inode, - child_index); - } + if (__error_more_important (local->op_errno, op_errno)) + local->op_errno = op_errno; - } else { - afr_lookup_self_heal_check (this, local, buf, lookup_buf); + if (local->op_errno == ESTALE) { + local->op_ret = -1; + } +} - if (child_index == local->read_child_index) { - /* - lookup has succeeded on the read child. - So use its inode number - */ - if (local->cont.lookup.xattr) - dict_unref (local->cont.lookup.xattr); +static void +afr_set_root_inode_on_first_lookup (afr_local_t *local, xlator_t *this, + inode_t *inode) +{ + afr_private_t *priv = NULL; + GF_ASSERT (inode); - local->cont.lookup.xattr = dict_ref (xattr); - local->cont.lookup.xattrs[child_index] = dict_ref (xattr); - local->cont.lookup.postparent = *postparent; + if (inode->ino != 1) + goto out; + if (!afr_is_fresh_lookup (&local->loc, this)) + goto out; + priv = this->private; + if ((priv->first_lookup)) { + gf_log (this->name, GF_LOG_INFO, "added root inode"); + priv->root_inode = inode_ref (inode); + priv->first_lookup = 0; + } +out: + return; +} - *lookup_buf = *buf; +static void +afr_lookup_cache_args (afr_local_t *local, int child_index, dict_t *xattr, + struct iatt *buf, struct iatt *postparent) +{ + GF_ASSERT (child_index >= 0); + local->cont.lookup.xattrs[child_index] = dict_ref (xattr); + local->cont.lookup.postparents[child_index] = *postparent; + local->cont.lookup.bufs[child_index] = *buf; +} - uuid_copy (local->loc.gfid, buf->ia_gfid); - uuid_copy (local->loc.pargfid, - postparent->ia_gfid); - } +static void +afr_lookup_handle_first_success (afr_local_t *local, xlator_t *this, + inode_t *inode, struct iatt *buf) +{ + local->cont.lookup.inode = inode_ref (inode); + local->cont.lookup.buf = *buf; + afr_set_root_inode_on_first_lookup (local, this, inode); +} +static void +afr_lookup_handle_success (afr_local_t *local, xlator_t *this, int32_t child_index, + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, dict_t *xattr, + struct iatt *postparent) +{ + if (local->success_count == 0) { + if (local->op_errno != ESTALE) { + local->op_ret = op_ret; + local->op_errno = 0; } - - local->success_count++; - } -unlock: - UNLOCK (&frame->lock); - - call_count = afr_frame_return (frame); - - if (call_count == 0) { - afr_lookup_done (frame, this, lookup_buf); + afr_lookup_handle_first_success (local, this, inode, buf); } + afr_lookup_update_lk_counts (local, this, + child_index, xattr); - return 0; + afr_lookup_cache_args (local, child_index, xattr, + buf, postparent); + local->cont.lookup.child_success[local->success_count] = child_index; + local->success_count++; } - int -afr_revalidate_lookup_cbk (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno, - inode_t *inode, struct iatt *buf, dict_t *xattr, - struct iatt *postparent) +afr_lookup_cbk (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + inode_t *inode, struct iatt *buf, dict_t *xattr, + struct iatt *postparent) { afr_local_t * local = NULL; - afr_private_t * priv = NULL; - struct iatt * lookup_buf = NULL; int call_count = -1; int child_index = -1; - int first_up_child = -1; - child_index = (long) cookie; - priv = this->private; + child_index = (long) cookie; LOCK (&frame->lock); { local = frame->local; - lookup_buf = &local->cont.lookup.buf; - if (op_ret == -1) { - if (op_errno == ENOENT) - local->enoent_count++; - - if (__error_more_important (local->op_errno, op_errno)) - local->op_errno = op_errno; - - if (local->op_errno == ESTALE) { - local->op_ret = -1; - } - + afr_lookup_handle_error (local, op_ret, op_errno); goto unlock; } + afr_lookup_handle_success (local, this, child_index, op_ret, + op_errno, inode, buf, xattr, + postparent); - afr_lookup_collect_xattr (local, this, child_index, xattr); - - first_up_child = afr_first_up_child (priv); - - /* in case of revalidate, we need to send stat of the - * child whose stat was sent during the first lookup. - * (so that time stamp does not vary with revalidate. - * in case it is down, stat of the fist success will - * be replied */ - - /* inode number should be preserved across revalidates */ - - if (local->success_count == 0) { - if (local->op_errno != ESTALE) - local->op_ret = op_ret; - - local->cont.lookup.inode = inode_ref (inode); - local->cont.lookup.xattr = dict_ref (xattr); - local->cont.lookup.xattrs[child_index] = dict_ref (xattr); - local->cont.lookup.postparent = *postparent; - - *lookup_buf = *buf; - - if (priv->read_child >= 0) { - afr_set_read_child (this, - local->cont.lookup.inode, - priv->read_child); - } else { - afr_set_read_child (this, - local->cont.lookup.inode, - child_index); - } - - } else { - afr_lookup_self_heal_check (this, local, buf, lookup_buf); - - if (child_index == local->read_child_index) { + } +unlock: + UNLOCK (&frame->lock); - /* - lookup has succeeded on the read child. - So use its inode number - */ + call_count = afr_frame_return (frame); + if (call_count == 0) { + afr_lookup_done (frame, this); + } - if (local->cont.lookup.xattr) - dict_unref (local->cont.lookup.xattr); + return 0; +} - local->cont.lookup.xattr = dict_ref (xattr); - local->cont.lookup.xattrs[child_index] = dict_ref (xattr); - local->cont.lookup.postparent = *postparent; +int +afr_lookup_cont_init (afr_local_t *local, unsigned int child_count) +{ + int ret = -ENOMEM; + int32_t *child_success = NULL; + struct iatt *iatts = NULL; + int i = 0; - *lookup_buf = *buf; - } + GF_ASSERT (local); + local->cont.lookup.xattrs = GF_CALLOC (child_count, + sizeof (*local->cont.lookup.xattr), + gf_afr_mt_dict_t); + if (NULL == local->cont.lookup.xattrs) + goto out; - } + iatts = GF_CALLOC (child_count, sizeof (*iatts), gf_afr_mt_iatt); + if (NULL == iatts) + goto out; + local->cont.lookup.postparents = iatts; - local->success_count++; - } -unlock: - UNLOCK (&frame->lock); + iatts = GF_CALLOC (child_count, sizeof (*iatts), gf_afr_mt_iatt); + if (NULL == iatts) + goto out; + local->cont.lookup.bufs = iatts; - call_count = afr_frame_return (frame); + child_success = GF_CALLOC (child_count, sizeof (*child_success), + gf_afr_mt_char); + if (NULL == child_success) + goto out; + for (i = 0; i < child_count; i++) + child_success[i] = -1; - if (call_count == 0) { - afr_lookup_done (frame, this, lookup_buf); - } + local->cont.lookup.child_success = child_success; - return 0; + local->cont.lookup.read_child = -1; + ret = 0; +out: + return ret; } - int afr_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req) { - afr_private_t *priv = NULL; - afr_local_t *local = NULL; - int ret = -1; - int i = 0; - fop_lookup_cbk_t callback = NULL; - int call_count = 0; - uint64_t ctx = 0; - int32_t op_errno = 0; + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + int ret = -1; + int i = 0; + int call_count = 0; + uint64_t ctx = 0; + int32_t op_errno = 0; priv = this->private; @@ -999,14 +1184,9 @@ afr_lookup (call_frame_t *frame, xlator_t *this, if (ret == 0) { /* lookup is a revalidate */ - callback = afr_revalidate_lookup_cbk; - - local->cont.lookup.is_revalidate = _gf_true; local->read_child_index = afr_read_child (this, loc->inode); } else { - callback = afr_fresh_lookup_cbk; - LOCK (&priv->read_child_lock); { local->read_child_index = (++priv->read_child_rr) @@ -1019,10 +1199,16 @@ afr_lookup (call_frame_t *frame, xlator_t *this, local->cont.lookup.parent_ino = loc->parent->ino; local->child_up = memdup (priv->child_up, priv->child_count); + if (NULL == local->child_up) { + op_errno = ENOMEM; + goto out; + } - local->cont.lookup.xattrs = GF_CALLOC (priv->child_count, - sizeof (*local->cont.lookup.xattr), - gf_afr_mt_dict_t); + ret = afr_lookup_cont_init (local, priv->child_count); + if (ret < 0) { + op_errno = -ret; + goto out; + } local->call_count = afr_up_children_count (priv->child_count, local->child_up); @@ -1068,7 +1254,8 @@ afr_lookup (call_frame_t *frame, xlator_t *this, for (i = 0; i < priv->child_count; i++) { if (local->child_up[i]) { - STACK_WIND_COOKIE (frame, callback, (void *) (long) i, + STACK_WIND_COOKIE (frame, afr_lookup_cbk, + (void *) (long) i, priv->children[i], priv->children[i]->fops->lookup, loc, local->xattr_req); |