diff options
Diffstat (limited to 'xlators/cluster/ec/src/ec-heal.c')
-rw-r--r-- | xlators/cluster/ec/src/ec-heal.c | 607 |
1 files changed, 439 insertions, 168 deletions
diff --git a/xlators/cluster/ec/src/ec-heal.c b/xlators/cluster/ec/src/ec-heal.c index b7b910502f8..315de8765ad 100644 --- a/xlators/cluster/ec/src/ec-heal.c +++ b/xlators/cluster/ec/src/ec-heal.c @@ -76,6 +76,11 @@ out: return _gf_false; } +static gf_boolean_t +ec_sh_key_match (dict_t *dict, char *key, data_t *val, void *mdata) +{ + return !ec_ignorable_key_match (dict, key, val, mdata); +} /* FOP: heal */ void ec_heal_exclude(ec_heal_t * heal, uintptr_t mask) @@ -1058,8 +1063,15 @@ ec_heal_writev_cbk (call_frame_t *frame, void *cookie, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { + ec_fop_data_t *fop = cookie; + ec_heal_t *heal = fop->data; + ec_trace("WRITE_CBK", cookie, "ret=%d, errno=%d", op_ret, op_errno); + gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: write op_ret %d, op_errno %s" + " at %"PRIu64, uuid_utoa (heal->fd->inode->gfid), op_ret, + strerror (op_errno), heal->offset); + ec_heal_update(cookie, 0); return 0; @@ -1080,12 +1092,19 @@ int32_t ec_heal_readv_cbk(call_frame_t * frame, void * cookie, xlator_t * this, if (op_ret > 0) { + gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: read succeeded, proceeding " + "to write at %"PRIu64, uuid_utoa (heal->fd->inode->gfid), + heal->offset); ec_writev(heal->fop->frame, heal->xl, heal->bad, EC_MINIMUM_ONE, ec_heal_writev_cbk, heal, heal->fd, vector, count, heal->offset, 0, iobref, NULL); } else { + gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: read failed %s, failing " + "to heal block at %"PRIu64, + uuid_utoa (heal->fd->inode->gfid), strerror (op_errno), + heal->offset); heal->done = 1; } @@ -1529,8 +1548,8 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state) } } -void ec_heal(call_frame_t * frame, xlator_t * this, uintptr_t target, - int32_t minimum, fop_heal_cbk_t func, void * data, loc_t * loc, +void ec_heal2(call_frame_t *frame, xlator_t *this, uintptr_t target, + int32_t minimum, fop_heal_cbk_t func, void *data, loc_t *loc, int32_t partial, dict_t *xdata) { ec_cbk_t callback = { .heal = func }; @@ -1647,19 +1666,15 @@ ec_char_array_to_mask (unsigned char *array, int numsubvols) } int -ec_heal_find_direction (ec_t *ec, ec_txn_t type, default_args_cbk_t *replies, +ec_heal_entry_find_direction (ec_t *ec, default_args_cbk_t *replies, uint64_t *versions, uint64_t *dirty, unsigned char *sources, unsigned char *healed_sinks) { - void *ptr = NULL; - uint64_t *value = NULL; + uint64_t xattr[EC_VERSION_SIZE] = {0}; int source = -1; uint64_t max_version = 0; - int32_t len = 0; int ret = 0; int i = 0; - struct iatt source_ia = {0}; - struct iatt child_ia = {0}; for (i = 0; i < ec->nodes; i++) { if (!replies[i].valid) @@ -1671,22 +1686,21 @@ ec_heal_find_direction (ec_t *ec, ec_txn_t type, default_args_cbk_t *replies, if (source == -1) source = i; - ret = dict_get_ptr_and_len (replies[i].xdata, EC_XATTR_VERSION, - &ptr, &len); + ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_VERSION, + xattr, EC_VERSION_SIZE); if (ret == 0) { - value = ptr; - versions[i] = ntoh64(value[type]); + versions[i] = xattr[EC_DATA_TXN]; if (max_version < versions[i]) { max_version = versions[i]; source = i; } } - ret = dict_get_ptr_and_len (replies[i].xdata, EC_XATTR_DIRTY, - &ptr, &len); + memset (xattr, 0, sizeof(xattr)); + ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_DIRTY, + xattr, EC_VERSION_SIZE); if (ret == 0) { - value = ptr; - dirty[i] = ntoh64(value[type]); + dirty[i] = xattr[EC_DATA_TXN]; } } @@ -1706,29 +1720,13 @@ ec_heal_find_direction (ec_t *ec, ec_txn_t type, default_args_cbk_t *replies, healed_sinks[i] = 1; } - if (type == EC_METADATA_TXN) { - source_ia = replies[source].stat; - for (i = 0; i < ec->nodes; i++) { - if (!sources[i]) - continue; - child_ia = replies[i].stat; - if (!IA_EQUAL(source_ia, child_ia, gfid) || - !IA_EQUAL(source_ia, child_ia, type) || - !IA_EQUAL(source_ia, child_ia, prot) || - !IA_EQUAL(source_ia, child_ia, uid) || - !IA_EQUAL(source_ia, child_ia, gid)) { - sources[i] = 0; - healed_sinks[i] = 1; - } - } - } out: return source; } int -ec_adjust_versions (call_frame_t *frame, ec_t *ec, ec_txn_t type, inode_t *inode, int source, - unsigned char *sources, +ec_adjust_versions (call_frame_t *frame, ec_t *ec, ec_txn_t type, + inode_t *inode, int source, unsigned char *sources, unsigned char *healed_sinks, uint64_t *versions, uint64_t *dirty) { @@ -1798,39 +1796,127 @@ out: } int -__ec_heal_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode, - unsigned char *locked_on, default_args_cbk_t *replies, - uint64_t *versions, uint64_t *dirty, unsigned char *sources, - unsigned char *healed_sinks, ec_txn_t type) -{ - loc_t loc = {0}; - unsigned char *output = NULL; - dict_t *xdata = NULL; - int ret = 0; - int source = 0; +ec_heal_metadata_find_direction (ec_t *ec, default_args_cbk_t *replies, + uint64_t *versions, uint64_t *dirty, + unsigned char *sources, unsigned char *healed_sinks) +{ + uint64_t xattr[EC_VERSION_SIZE] = {0}; + int same_count = 0; + int max_same_count = 0; + int same_source = -1; + int ret = 0; + int i = 0; + int j = 0; + int *groups = NULL; + struct iatt source_ia = {0}; + struct iatt child_ia = {0}; - xdata = dict_new (); - if (!xdata) { - ret = -ENOMEM; - goto out; + groups = alloca0 (ec->nodes * sizeof(*groups)); + for (i = 0; i < ec->nodes; i++) + groups[i] = -1; + + for (i = 0; i < ec->nodes; i++) { + if (!replies[i].valid) + continue; + ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_VERSION, + xattr, EC_VERSION_SIZE); + if (ret == 0) { + versions[i] = xattr[EC_METADATA_TXN]; + } + + memset (xattr, 0, sizeof (xattr)); + ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_DIRTY, + xattr, EC_VERSION_SIZE); + if (ret == 0) { + dirty[i] = xattr[EC_METADATA_TXN]; + } + if (groups[i] >= 0) /*Already part of group*/ + continue; + groups[i] = i; + same_count = 1; + source_ia = replies[i].stat; + for (j = i + 1; j < ec->nodes; j++) { + child_ia = replies[j].stat; + if (!IA_EQUAL(source_ia, child_ia, gfid) || + !IA_EQUAL(source_ia, child_ia, type) || + !IA_EQUAL(source_ia, child_ia, prot) || + !IA_EQUAL(source_ia, child_ia, uid) || + !IA_EQUAL(source_ia, child_ia, gid)) + continue; + if (!are_dicts_equal(replies[i].xdata, replies[j].xdata, + ec_sh_key_match, NULL)) + continue; + groups[j] = i; /*If iatts match put them into a group*/ + same_count++; + } + + if (max_same_count < same_count) { + max_same_count = same_count; + same_source = i; + } } - if (dict_set_uint64(xdata, "list-xattr", 0)) { - ret = -ENOMEM; + if (max_same_count < ec->fragments) { + ret = -EIO; goto out; } + for (i = 0; i < ec->nodes; i++) { + if (groups[i] == groups[same_source]) + sources[i] = 1; + else if (replies[i].valid) + healed_sinks[i] = 1; + } + ret = same_source; +out: + return ret; +} + +int +__ec_heal_metadata_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode, + unsigned char *locked_on, default_args_cbk_t *replies, + uint64_t *versions, uint64_t *dirty, unsigned char *sources, + unsigned char *healed_sinks) +{ + loc_t loc = {0}; + unsigned char *output = NULL; + unsigned char *lookup_on = NULL; + int ret = 0; + int source = 0; + default_args_cbk_t *greplies = NULL; + int i = 0; + + EC_REPLIES_ALLOC (greplies, ec->nodes); + loc.inode = inode_ref (inode); gf_uuid_copy (loc.gfid, inode->gfid); output = alloca0 (ec->nodes); + lookup_on = alloca0 (ec->nodes); ret = cluster_lookup (ec->xl_list, locked_on, ec->nodes, replies, - output, frame, ec->xl, &loc, xdata); + output, frame, ec->xl, &loc, NULL); if (ret <= ec->fragments) { ret = -ENOTCONN; goto out; } - source = ec_heal_find_direction (ec, type, replies, versions, + memcpy (lookup_on, output, ec->nodes); + /*Use getxattr to get the filtered xattrs which filter internal xattrs*/ + ret = cluster_getxattr (ec->xl_list, lookup_on, ec->nodes, greplies, + output, frame, ec->xl, &loc, NULL, NULL); + for (i = 0; i < ec->nodes; i++) { + if (lookup_on[i] && !output[i]) { + replies[i].valid = 0; + continue; + } + if (replies[i].xdata) { + dict_unref (replies[i].xdata); + replies[i].xdata = NULL; + if (greplies[i].xattr) + replies[i].xdata = dict_ref (greplies[i].xattr); + } + } + + source = ec_heal_metadata_find_direction (ec, replies, versions, dirty, sources, healed_sinks); if (source < 0) { ret = -EIO; @@ -1838,9 +1924,7 @@ __ec_heal_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode, } ret = source; out: - if (xdata) - dict_unref (xdata); - + cluster_replies_wipe (greplies, ec->nodes); loc_wipe (&loc); return ret; } @@ -1864,14 +1948,14 @@ __ec_removexattr_sinks (call_frame_t *frame, ec_t *ec, inode_t *inode, continue; if (!sources[i] && !healed_sinks[i]) continue; - ret = dict_foreach (replies[i].xattr, ec_heal_xattr_clean, - replies[source].xattr); + ret = dict_foreach (replies[i].xdata, ec_heal_xattr_clean, + replies[source].xdata); if (ret < 0) { sources[i] = 0; healed_sinks[i] = 0; } - if (replies[i].xattr->count == 0) { + if (replies[i].xdata->count == 0) { continue; } else if (sources[i]) { /* This can happen if setxattr/removexattr succeeds on @@ -1883,7 +1967,7 @@ __ec_removexattr_sinks (call_frame_t *frame, ec_t *ec, inode_t *inode, } ret = syncop_removexattr (ec->xl_list[i], &loc, "", - replies[i].xattr, NULL); + replies[i].xdata, NULL); if (ret < 0) healed_sinks[i] = 0; } @@ -1896,39 +1980,46 @@ __ec_removexattr_sinks (call_frame_t *frame, ec_t *ec, inode_t *inode, int __ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode, - unsigned char *locked_on) + unsigned char *locked_on, unsigned char *sources, + unsigned char *healed_sinks) { loc_t loc = {0}; int ret = 0; int source = 0; default_args_cbk_t *replies = NULL; + default_args_cbk_t *sreplies = NULL; uint64_t *versions = NULL; uint64_t *dirty = NULL; - unsigned char *sources = NULL; - unsigned char *healed_sinks = NULL; unsigned char *output = NULL; dict_t *source_dict = NULL; struct iatt source_buf = {0}; EC_REPLIES_ALLOC (replies, ec->nodes); + EC_REPLIES_ALLOC (sreplies, ec->nodes); loc.inode = inode_ref (inode); gf_uuid_copy (loc.gfid, inode->gfid); output = alloca0 (ec->nodes); versions = alloca0 (ec->nodes * sizeof (*versions)); dirty = alloca0 (ec->nodes * sizeof (*dirty)); - sources = alloca0 (ec->nodes); - healed_sinks = alloca0 (ec->nodes); - source = __ec_heal_prepare (frame, ec, inode, locked_on, replies, - versions, dirty, sources, healed_sinks, - EC_METADATA_TXN); + source = __ec_heal_metadata_prepare (frame, ec, inode, locked_on, replies, + versions, dirty, sources, healed_sinks); if (source < 0) { ret = -EIO; goto out; } + if (EC_COUNT (sources, ec->nodes) == ec->nodes) { + ret = 0; + goto erase_dirty; + } + + if (EC_COUNT (healed_sinks, ec->nodes) == 0) { + ret = -ENOTCONN; + goto out; + } source_buf = replies[source].stat; - ret = cluster_setattr (ec->xl_list, healed_sinks, ec->nodes, replies, + ret = cluster_setattr (ec->xl_list, healed_sinks, ec->nodes, sreplies, output, frame, ec->xl, &loc, &source_buf, GF_SET_ATTR_MODE | GF_SET_ATTR_UID | GF_SET_ATTR_GID, NULL); @@ -1939,22 +2030,12 @@ __ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode, goto out; } - ret = cluster_getxattr (ec->xl_list, locked_on, ec->nodes, replies, - output, frame, ec->xl, &loc, NULL, NULL); - EC_INTERSECT (sources, sources, output, ec->nodes); - EC_INTERSECT (healed_sinks, healed_sinks, output, ec->nodes); - EC_ADJUST_SOURCE (source, sources, ec->nodes); - if ((EC_COUNT (healed_sinks, ec->nodes) == 0) || (source < 0)) { - ret = -ENOTCONN; - goto out; - } - ret = __ec_removexattr_sinks (frame, ec, inode, source, sources, healed_sinks, replies); if (ret < 0) goto out; - source_dict = dict_ref (replies[source].xattr); + source_dict = dict_ref (replies[source].xdata); if (dict_foreach_match (source_dict, ec_ignorable_key_match, NULL, dict_remove_foreach_fn, NULL) == -1) { ret = -ENOMEM; @@ -1971,6 +2052,7 @@ __ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode, goto out; } +erase_dirty: ret = ec_adjust_versions (frame, ec, EC_METADATA_TXN, inode, source, sources, healed_sinks, versions, dirty); out: @@ -1979,29 +2061,21 @@ out: loc_wipe (&loc); cluster_replies_wipe (replies, ec->nodes); + cluster_replies_wipe (sreplies, ec->nodes); return ret; } int -ec_heal_metadata (call_frame_t *req_frame, ec_t *ec, inode_t *inode) +ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode, + unsigned char *sources, unsigned char *healed_sinks) { unsigned char *locked_on = NULL; unsigned char *up_subvols = NULL; unsigned char *output = NULL; int ret = 0; default_args_cbk_t *replies = NULL; - call_frame_t *frame = NULL; EC_REPLIES_ALLOC (replies, ec->nodes); - frame = copy_frame (req_frame); - if (!frame) { - ret = -ENOMEM; - goto out; - } - - /*Do heal as root*/ - frame->root->uid = 0; - frame->root->gid = 0; locked_on = alloca0(ec->nodes); output = alloca0(ec->nodes); up_subvols = alloca0(ec->nodes); @@ -2017,15 +2091,13 @@ ec_heal_metadata (call_frame_t *req_frame, ec_t *ec, inode_t *inode) ret = -ENOTCONN; goto unlock; } - ret = __ec_heal_metadata (frame, ec, inode, locked_on); + ret = __ec_heal_metadata (frame, ec, inode, locked_on, sources, + healed_sinks); } unlock: cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output, frame, ec->xl, ec->xl->name, inode, 0, 0); -out: cluster_replies_wipe (replies, ec->nodes); - if (frame) - STACK_DESTROY (frame->root); return ret; } @@ -2036,24 +2108,47 @@ __ec_heal_entry_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode, uint64_t *dirty, unsigned char *sources, unsigned char *healed_sinks) { - int source = 0; - default_args_cbk_t *replies = NULL; loc_t loc = {0}; + int source = 0; int ret = 0; + default_args_cbk_t *replies = NULL; + unsigned char *output = NULL; + dict_t *xdata = NULL; EC_REPLIES_ALLOC (replies, ec->nodes); loc.inode = inode_ref (inode); gf_uuid_copy (loc.gfid, inode->gfid); - source = __ec_heal_prepare (frame, ec, inode, locked_on, replies, - versions, dirty, sources, healed_sinks, - EC_DATA_TXN); + xdata = dict_new (); + if (!xdata) { + ret = -ENOMEM; + goto out; + } + + if (dict_set_uint64(xdata, EC_XATTR_VERSION, 0) || + dict_set_uint64(xdata, EC_XATTR_DIRTY, 0)) { + ret = -ENOMEM; + goto out; + } + + output = alloca0 (ec->nodes); + ret = cluster_lookup (ec->xl_list, locked_on, ec->nodes, replies, + output, frame, ec->xl, &loc, xdata); + if (ret <= ec->fragments) { + ret = -ENOTCONN; + goto out; + } + + source = ec_heal_entry_find_direction (ec, replies, versions, + dirty, sources, healed_sinks); if (source < 0) { ret = -EIO; goto out; } ret = source; out: + if (xdata) + dict_unref (xdata); loc_wipe (&loc); cluster_replies_wipe (replies, ec->nodes); return ret; @@ -2156,6 +2251,11 @@ ec_delete_stale_name (dict_t *gfid_db, char *key, data_t *d, void *data) /*This will help in making decisions about creating names*/ dict_del (gfid_db, key); out: + if (ret < 0) { + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: heal failed %s", + uuid_utoa (name_data->parent->gfid), name_data->name, + strerror (-ret)); + } cluster_replies_wipe (replies, ec->nodes); loc_wipe (&loc); return ret; @@ -2320,9 +2420,12 @@ ec_create_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name, ret = 0; out: + if (ret < 0) + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: heal failed %s", + uuid_utoa (parent->gfid), name, strerror (-ret)); + cluster_replies_wipe (replies, ec->nodes); loc_wipe (&loc); loc_wipe (&srcloc); - EC_REPLIES_ALLOC (replies, ec->nodes); if (xdata) dict_unref (xdata); return ret; @@ -2345,6 +2448,7 @@ __ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name, unsigned char *same = NULL; unsigned char *gfidless = NULL; + EC_REPLIES_ALLOC (replies, ec->nodes); loc.parent = inode_ref (parent); loc.inode = inode_new (parent->table); gf_uuid_copy (loc.pargfid, parent->gfid); @@ -2365,7 +2469,6 @@ __ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name, output = alloca0 (ec->nodes); gfidless = alloca0 (ec->nodes); enoent = alloca0 (ec->nodes); - EC_REPLIES_ALLOC (replies, ec->nodes); ret = cluster_lookup (ec->xl_list, participants, ec->nodes, replies, output, frame, ec->xl, &loc, NULL); for (i = 0; i < ec->nodes; i++) { @@ -2464,9 +2567,10 @@ ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name, NULL); { if (ret <= ec->fragments) { - gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal " - "as only %d number of subvolumes could " - "be locked", uuid_utoa (parent->gfid), ret); + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: Skipping " + "heal as only %d number of subvolumes could " + "be locked", uuid_utoa (parent->gfid), name, + ret); ret = -ENOTCONN; goto unlock; } @@ -2534,19 +2638,19 @@ ec_heal_names (call_frame_t *frame, ec_t *ec, inode_t *inode, if (EC_COUNT (participants, ec->nodes) <= ec->fragments) return -ENOTCONN; } + loc_wipe (&loc); return 0; } int __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode, - unsigned char *heal_on) + unsigned char *heal_on, unsigned char *sources, + unsigned char *healed_sinks) { unsigned char *locked_on = NULL; unsigned char *output = NULL; uint64_t *versions = NULL; uint64_t *dirty = NULL; - unsigned char *sources = NULL; - unsigned char *healed_sinks = NULL; unsigned char *participants = NULL; default_args_cbk_t *replies = NULL; int ret = 0; @@ -2557,8 +2661,6 @@ __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode, output = alloca0(ec->nodes); versions = alloca0 (ec->nodes * sizeof (*versions)); dirty = alloca0 (ec->nodes * sizeof (*dirty)); - sources = alloca0 (ec->nodes); - healed_sinks = alloca0 (ec->nodes); EC_REPLIES_ALLOC (replies, ec->nodes); ret = cluster_entrylk (ec->xl_list, heal_on, ec->nodes, replies, @@ -2608,7 +2710,8 @@ out: } int -ec_heal_entry (call_frame_t *req_frame, ec_t *ec, inode_t *inode) +ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode, + unsigned char *sources, unsigned char *healed_sinks) { unsigned char *locked_on = NULL; unsigned char *up_subvols = NULL; @@ -2616,21 +2719,12 @@ ec_heal_entry (call_frame_t *req_frame, ec_t *ec, inode_t *inode) char selfheal_domain[1024] = {0}; int ret = 0; default_args_cbk_t *replies = NULL; - call_frame_t *frame = NULL; EC_REPLIES_ALLOC (replies, ec->nodes); locked_on = alloca0(ec->nodes); output = alloca0(ec->nodes); up_subvols = alloca0(ec->nodes); - frame = copy_frame (req_frame); - if (!frame) { - ret = -ENOMEM; - goto out; - } - /*Do heal as root*/ - frame->root->uid = 0; - frame->root->gid = 0; sprintf (selfheal_domain, "%s:self-heal", ec->xl->name); ec_mask_to_char_array (ec->xl_up, up_subvols, ec->nodes); /*If other processes are already doing the heal, don't block*/ @@ -2645,15 +2739,13 @@ ec_heal_entry (call_frame_t *req_frame, ec_t *ec, inode_t *inode) ret = -ENOTCONN; goto unlock; } - ret = __ec_heal_entry (frame, ec, inode, locked_on); + ret = __ec_heal_entry (frame, ec, inode, locked_on, + sources, healed_sinks); } unlock: cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output, frame, ec->xl, selfheal_domain, inode, NULL); -out: cluster_replies_wipe (replies, ec->nodes); - if (frame) - STACK_DESTROY (frame->root); return ret; } @@ -2664,12 +2756,10 @@ ec_heal_data_find_direction (ec_t *ec, default_args_cbk_t *replies, uint64_t *size, unsigned char *sources, unsigned char *healed_sinks) { + uint64_t xattr[EC_VERSION_SIZE] = {0}; char version_size[64] = {0}; - uint64_t *value = NULL; dict_t *version_size_db = NULL; unsigned char *same = NULL; - void *ptr = NULL; - int len = 0; int max_same_count = 0; int source = 0; int i = 0; @@ -2686,25 +2776,20 @@ ec_heal_data_find_direction (ec_t *ec, default_args_cbk_t *replies, continue; if (replies[i].op_ret < 0) continue; - ret = dict_get_ptr_and_len (replies[i].xattr, EC_XATTR_VERSION, - &ptr, &len); + ret = ec_dict_del_array (replies[i].xattr, EC_XATTR_VERSION, + xattr, EC_VERSION_SIZE); if (ret == 0) { - value = ptr; - versions[i] = ntoh64(value[EC_DATA_TXN]); + versions[i] = xattr[EC_DATA_TXN]; } - ret = dict_get_ptr_and_len (replies[i].xattr, EC_XATTR_DIRTY, - &ptr, &len); - if (ret == 0) { - value = ptr; - dirty[i] = ntoh64(value[EC_DATA_TXN]); - } - ret = dict_get_ptr_and_len (replies[i].xattr, EC_XATTR_SIZE, - &ptr, &len); + memset (xattr, 0, sizeof (xattr)); + ret = ec_dict_del_array (replies[i].xattr, EC_XATTR_DIRTY, + xattr, EC_VERSION_SIZE); if (ret == 0) { - value = ptr; - size[i] = ntoh64(*value); + dirty[i] = xattr[EC_DATA_TXN]; } + ret = ec_dict_del_number (replies[i].xattr, EC_XATTR_SIZE, + &size[i]); /*Build a db of same version, size*/ snprintf (version_size, sizeof (version_size), "%"PRIu64"-%"PRIu64, versions[i], size[i]); @@ -2749,10 +2834,7 @@ ec_heal_data_find_direction (ec_t *ec, default_args_cbk_t *replies, healed_sinks[i] = 1; } } - if (EC_COUNT (healed_sinks, ec->nodes) == 0) { - ret = -ENOTCONN; - goto out; - } + ret = source; out: if (version_size_db) @@ -2812,8 +2894,7 @@ __ec_heal_data_prepare (call_frame_t *frame, ec_t *ec, fd_t *fd, output, frame, ec->xl, fd, NULL); EC_INTERSECT (sources, sources, output, ec->nodes); EC_INTERSECT (healed_sinks, healed_sinks, output, ec->nodes); - if ((EC_COUNT (sources, ec->nodes) < ec->fragments) || - (EC_COUNT (healed_sinks, ec->nodes) == 0)) { + if (EC_COUNT (sources, ec->nodes) < ec->fragments) { ret = -ENOTCONN; goto out; } @@ -2826,6 +2907,7 @@ __ec_heal_data_prepare (call_frame_t *frame, ec_t *ec, fd_t *fd, sources[i] = 0; healed_sinks[i] = 1; } else if (stbuf) { + source = i; *stbuf = replies[i].stat; } } @@ -2841,11 +2923,24 @@ __ec_heal_data_prepare (call_frame_t *frame, ec_t *ec, fd_t *fd, goto out; } + if (EC_COUNT(healed_sinks, ec->nodes) == 0) { + ret = -ENOTCONN; + goto out; + } ret = source; out: if (xattrs) dict_unref (xattrs); cluster_replies_wipe (replies, ec->nodes); + if (ret < 0) { + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s", + uuid_utoa (fd->inode->gfid), strerror (-ret)); + } else { + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: sources: %d, sinks: " + "%d", uuid_utoa (fd->inode->gfid), + EC_COUNT (sources, ec->nodes), + EC_COUNT (healed_sinks, ec->nodes)); + } return ret; } @@ -2910,6 +3005,9 @@ out: cluster_replies_wipe (replies, ec->nodes); if (xattrs) dict_unref (xattrs); + if (ret < 0) + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s", + uuid_utoa (fd->inode->gfid), strerror (-ret)); return ret; } @@ -2928,6 +3026,8 @@ ec_manager_heal_block (ec_fop_data_t *fop, int32_t state) return EC_STATE_HEAL_DATA_COPY; case EC_STATE_HEAL_DATA_COPY: + gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: read/write starting", + uuid_utoa (heal->fd->inode->gfid)); ec_heal_data_block (heal); return EC_STATE_HEAL_DATA_UNLOCK; @@ -2986,6 +3086,8 @@ ec_heal_block (call_frame_t *frame, xlator_t *this, uintptr_t target, if (fop == NULL) goto out; + fop->pre_size = fop->post_size = heal->total_size; + fop->have_size = 1; error = 0; out: @@ -3039,6 +3141,7 @@ ec_rebuild_data (call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size, heal->data = &barrier; syncbarrier_init (heal->data); pool = ec->xl->ctx->iobuf_pool; + heal->total_size = size; heal->size = iobpool_default_pagesize (pool); heal->bad = ec_char_array_to_mask (healed_sinks, ec->nodes); heal->good = ec_char_array_to_mask (sources, ec->nodes); @@ -3047,6 +3150,12 @@ ec_rebuild_data (call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size, for (heal->offset = 0; (heal->offset < size) && !heal->done; heal->offset += heal->size) { + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: sources: %d, sinks: " + "%d, offset: %"PRIu64" bsize: %"PRIu64, + uuid_utoa (fd->inode->gfid), + EC_COUNT (sources, ec->nodes), + EC_COUNT (healed_sinks, ec->nodes), heal->offset, + heal->size); ret = ec_sync_heal_block (frame, ec->xl, heal); if (ret < 0) break; @@ -3055,6 +3164,9 @@ ec_rebuild_data (call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size, fd_unref (heal->fd); LOCK_DESTROY (&heal->lock); syncbarrier_destroy (heal->data); + if (ret < 0) + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s", + uuid_utoa (fd->inode->gfid), strerror (-ret)); return ret; } @@ -3089,6 +3201,9 @@ __ec_heal_trim_sinks (call_frame_t *frame, ec_t *ec, fd_t *fd, out: cluster_replies_wipe (replies, ec->nodes); + if (ret < 0) + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s", + uuid_utoa (fd->inode->gfid), strerror (-ret)); return ret; } @@ -3281,15 +3396,14 @@ unlock: } int -__ec_heal_data (call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on) +__ec_heal_data (call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on, + unsigned char *sources, unsigned char *healed_sinks) { unsigned char *locked_on = NULL; unsigned char *output = NULL; uint64_t *versions = NULL; uint64_t *dirty = NULL; uint64_t *size = NULL; - unsigned char *sources = NULL; - unsigned char *healed_sinks = NULL; unsigned char *trim = NULL; default_args_cbk_t *replies = NULL; int ret = 0; @@ -3297,8 +3411,6 @@ __ec_heal_data (call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on) locked_on = alloca0(ec->nodes); output = alloca0(ec->nodes); - sources = alloca0 (ec->nodes); - healed_sinks = alloca0 (ec->nodes); trim = alloca0 (ec->nodes); versions = alloca0 (ec->nodes * sizeof (*versions)); dirty = alloca0 (ec->nodes * sizeof (*dirty)); @@ -3337,6 +3449,11 @@ unlock: if (ret < 0) goto out; + gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: sources: %d, sinks: " + "%d", uuid_utoa (fd->inode->gfid), + EC_COUNT (sources, ec->nodes), + EC_COUNT (healed_sinks, ec->nodes)); + ret = ec_rebuild_data (frame, ec, fd, size[source], sources, healed_sinks); if (ret < 0) @@ -3351,13 +3468,13 @@ out: } int -ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode) +ec_heal_data (call_frame_t *frame, ec_t *ec, gf_boolean_t block, inode_t *inode, + unsigned char *sources, unsigned char *healed_sinks) { unsigned char *locked_on = NULL; unsigned char *up_subvols = NULL; unsigned char *output = NULL; default_args_cbk_t *replies = NULL; - call_frame_t *frame = NULL; fd_t *fd = NULL; loc_t loc = {0}; char selfheal_domain[1024] = {0}; @@ -3368,7 +3485,7 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode) locked_on = alloca0(ec->nodes); output = alloca0(ec->nodes); up_subvols = alloca0(ec->nodes); - loc. inode = inode_ref (inode); + loc.inode = inode_ref (inode); gf_uuid_copy (loc.gfid, inode->gfid); fd = fd_create (inode, 0); @@ -3378,14 +3495,6 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode) } ec_mask_to_char_array (ec->xl_up, up_subvols, ec->nodes); - frame = copy_frame (req_frame); - if (!frame) { - ret = -ENOMEM; - goto out; - } - /*Do heal as root*/ - frame->root->uid = 0; - frame->root->gid = 0; ret = cluster_open (ec->xl_list, up_subvols, ec->nodes, replies, output, frame, ec->xl, &loc, O_RDWR|O_LARGEFILE, fd, NULL); @@ -3397,9 +3506,15 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode) fd_bind (fd); sprintf (selfheal_domain, "%s:self-heal", ec->xl->name); /*If other processes are already doing the heal, don't block*/ - ret = cluster_tryinodelk (ec->xl_list, output, ec->nodes, replies, - locked_on, frame, ec->xl, selfheal_domain, inode, - 0, 0); + if (block) { + ret = cluster_inodelk (ec->xl_list, output, ec->nodes, replies, + locked_on, frame, ec->xl, + selfheal_domain, inode, 0, 0); + } else { + ret = cluster_tryinodelk (ec->xl_list, output, ec->nodes, + replies, locked_on, frame, ec->xl, + selfheal_domain, inode, 0, 0); + } { if (ret <= ec->fragments) { gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal " @@ -3408,7 +3523,8 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode) ret = -ENOTCONN; goto unlock; } - ret = __ec_heal_data (frame, ec, fd, locked_on); + ret = __ec_heal_data (frame, ec, fd, locked_on, sources, + healed_sinks); } unlock: cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output, @@ -3418,7 +3534,162 @@ out: fd_unref (fd); loc_wipe (&loc); cluster_replies_wipe (replies, ec->nodes); - if (frame) - STACK_DESTROY (frame->root); return ret; } + +void +ec_heal_do (xlator_t *this, void *data, loc_t *loc, int32_t partial) +{ + call_frame_t *frame = NULL; + unsigned char *participants = NULL; + unsigned char *msources = NULL; + unsigned char *mhealed_sinks = NULL; + unsigned char *sources = NULL; + unsigned char *healed_sinks = NULL; + ec_t *ec = NULL; + int ret = 0; + int op_ret = 0; + int op_errno = 0; + intptr_t mgood = 0; + intptr_t mbad = 0; + intptr_t good = 0; + intptr_t bad = 0; + ec_fop_data_t *fop = data; + gf_boolean_t blocking = _gf_false; + + ec = this->private; + + /* If it is heal request from getxattr, complete the heal and then + * unwind, if it is ec_heal with NULL as frame then no need to block + * the heal as the caller doesn't care about its completion*/ + if (fop->req_frame) + blocking = _gf_true; + + frame = create_frame (this, this->ctx->pool); + if (!frame) + return; + + ec_owner_set(frame, frame->root); + /*Do heal as root*/ + frame->root->uid = 0; + frame->root->gid = 0; + participants = alloca0(ec->nodes); + ec_mask_to_char_array (ec->xl_up, participants, ec->nodes); + if (loc->name && strlen (loc->name)) { + ret = ec_heal_name (frame, ec, loc->parent, (char *)loc->name, + participants); + if (ret == 0) { + gf_log (this->name, GF_LOG_INFO, "%s: name heal " + "successful on %lX", loc->path, + ec_char_array_to_mask (participants, ec->nodes)); + } else { + gf_log (this->name, GF_LOG_INFO, "%s: name heal " + "failed on %s", loc->path, strerror (-ret)); + } + } + + msources = alloca0(ec->nodes); + mhealed_sinks = alloca0(ec->nodes); + ret = ec_heal_metadata (frame, ec, loc->inode, msources, mhealed_sinks); + if (ret == 0) { + mgood = ec_char_array_to_mask (msources, ec->nodes); + mbad = ec_char_array_to_mask (mhealed_sinks, ec->nodes); + } else { + op_ret = -1; + op_errno = -ret; + } + sources = alloca0(ec->nodes); + healed_sinks = alloca0(ec->nodes); + if (IA_ISREG (loc->inode->ia_type)) { + ret = ec_heal_data (frame, ec, blocking, loc->inode, sources, + healed_sinks); + } else if (IA_ISDIR (loc->inode->ia_type) && !partial) { + ret = ec_heal_entry (frame, ec, loc->inode, sources, + healed_sinks); + } else { + ret = 0; + memcpy (sources, participants, ec->nodes); + memcpy (healed_sinks, participants, ec->nodes); + } + + if (ret == 0) { + good = ec_char_array_to_mask (sources, ec->nodes); + bad = ec_char_array_to_mask (healed_sinks, ec->nodes); + } else { + op_ret = -1; + op_errno = -ret; + } + + + if (fop->cbks.heal) { + fop->cbks.heal (fop->req_frame, fop, fop->xl, op_ret, + op_errno, ec_char_array_to_mask (participants, + ec->nodes), + mgood & good, mbad & bad, NULL); + } + STACK_DESTROY (frame->root); + return; +} + +int +ec_synctask_heal_wrap (void *opaque) +{ + ec_fop_data_t *fop = opaque; + ec_heal_do (fop->xl, fop, &fop->loc[0], fop->int32); + return 0; +} + +int +ec_heal_done (int ret, call_frame_t *heal, void *opaque) +{ + if (opaque) + ec_fop_data_release (opaque); + return 0; +} + +void +ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target, + int32_t minimum, fop_heal_cbk_t func, void *data, loc_t *loc, + int32_t partial, dict_t *xdata) +{ + ec_cbk_t callback = { .heal = func }; + ec_fop_data_t *fop = NULL; + int ret = 0; + + gf_log("ec", GF_LOG_TRACE, "EC(HEAL) %p", frame); + + VALIDATE_OR_GOTO(this, fail); + GF_VALIDATE_OR_GOTO(this->name, this->private, fail); + + if (!loc || !loc->inode || gf_uuid_is_null (loc->inode->gfid)) + goto fail; + + if (frame && frame->local) + goto fail; + fop = ec_fop_data_allocate (frame, this, EC_FOP_HEAL, + EC_FLAG_UPDATE_LOC_INODE, target, minimum, + ec_wind_heal, ec_manager_heal, callback, data); + if (fop == NULL) + goto fail; + + fop->int32 = partial; + + if (loc) { + if (loc_copy(&fop->loc[0], loc) != 0) + goto fail; + } + + if (xdata) + fop->xdata = dict_ref(xdata); + + ret = synctask_new (this->ctx->env, ec_synctask_heal_wrap, + ec_heal_done, NULL, fop); + if (ret < 0) + goto fail; + return; +fail: + if (fop) + ec_fop_data_release (fop); + if (func) + func (frame, NULL, this, -1, EIO, 0, 0, 0, NULL); +} |