summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/ec/src/ec-heal.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/ec/src/ec-heal.c')
-rw-r--r--xlators/cluster/ec/src/ec-heal.c607
1 files changed, 439 insertions, 168 deletions
diff --git a/xlators/cluster/ec/src/ec-heal.c b/xlators/cluster/ec/src/ec-heal.c
index b7b910502f8..315de8765ad 100644
--- a/xlators/cluster/ec/src/ec-heal.c
+++ b/xlators/cluster/ec/src/ec-heal.c
@@ -76,6 +76,11 @@ out:
return _gf_false;
}
+static gf_boolean_t
+ec_sh_key_match (dict_t *dict, char *key, data_t *val, void *mdata)
+{
+ return !ec_ignorable_key_match (dict, key, val, mdata);
+}
/* FOP: heal */
void ec_heal_exclude(ec_heal_t * heal, uintptr_t mask)
@@ -1058,8 +1063,15 @@ ec_heal_writev_cbk (call_frame_t *frame, void *cookie,
struct iatt *prebuf, struct iatt *postbuf,
dict_t *xdata)
{
+ ec_fop_data_t *fop = cookie;
+ ec_heal_t *heal = fop->data;
+
ec_trace("WRITE_CBK", cookie, "ret=%d, errno=%d", op_ret, op_errno);
+ gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: write op_ret %d, op_errno %s"
+ " at %"PRIu64, uuid_utoa (heal->fd->inode->gfid), op_ret,
+ strerror (op_errno), heal->offset);
+
ec_heal_update(cookie, 0);
return 0;
@@ -1080,12 +1092,19 @@ int32_t ec_heal_readv_cbk(call_frame_t * frame, void * cookie, xlator_t * this,
if (op_ret > 0)
{
+ gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: read succeeded, proceeding "
+ "to write at %"PRIu64, uuid_utoa (heal->fd->inode->gfid),
+ heal->offset);
ec_writev(heal->fop->frame, heal->xl, heal->bad, EC_MINIMUM_ONE,
ec_heal_writev_cbk, heal, heal->fd, vector, count,
heal->offset, 0, iobref, NULL);
}
else
{
+ gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: read failed %s, failing "
+ "to heal block at %"PRIu64,
+ uuid_utoa (heal->fd->inode->gfid), strerror (op_errno),
+ heal->offset);
heal->done = 1;
}
@@ -1529,8 +1548,8 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
}
}
-void ec_heal(call_frame_t * frame, xlator_t * this, uintptr_t target,
- int32_t minimum, fop_heal_cbk_t func, void * data, loc_t * loc,
+void ec_heal2(call_frame_t *frame, xlator_t *this, uintptr_t target,
+ int32_t minimum, fop_heal_cbk_t func, void *data, loc_t *loc,
int32_t partial, dict_t *xdata)
{
ec_cbk_t callback = { .heal = func };
@@ -1647,19 +1666,15 @@ ec_char_array_to_mask (unsigned char *array, int numsubvols)
}
int
-ec_heal_find_direction (ec_t *ec, ec_txn_t type, default_args_cbk_t *replies,
+ec_heal_entry_find_direction (ec_t *ec, default_args_cbk_t *replies,
uint64_t *versions, uint64_t *dirty,
unsigned char *sources, unsigned char *healed_sinks)
{
- void *ptr = NULL;
- uint64_t *value = NULL;
+ uint64_t xattr[EC_VERSION_SIZE] = {0};
int source = -1;
uint64_t max_version = 0;
- int32_t len = 0;
int ret = 0;
int i = 0;
- struct iatt source_ia = {0};
- struct iatt child_ia = {0};
for (i = 0; i < ec->nodes; i++) {
if (!replies[i].valid)
@@ -1671,22 +1686,21 @@ ec_heal_find_direction (ec_t *ec, ec_txn_t type, default_args_cbk_t *replies,
if (source == -1)
source = i;
- ret = dict_get_ptr_and_len (replies[i].xdata, EC_XATTR_VERSION,
- &ptr, &len);
+ ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_VERSION,
+ xattr, EC_VERSION_SIZE);
if (ret == 0) {
- value = ptr;
- versions[i] = ntoh64(value[type]);
+ versions[i] = xattr[EC_DATA_TXN];
if (max_version < versions[i]) {
max_version = versions[i];
source = i;
}
}
- ret = dict_get_ptr_and_len (replies[i].xdata, EC_XATTR_DIRTY,
- &ptr, &len);
+ memset (xattr, 0, sizeof(xattr));
+ ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_DIRTY,
+ xattr, EC_VERSION_SIZE);
if (ret == 0) {
- value = ptr;
- dirty[i] = ntoh64(value[type]);
+ dirty[i] = xattr[EC_DATA_TXN];
}
}
@@ -1706,29 +1720,13 @@ ec_heal_find_direction (ec_t *ec, ec_txn_t type, default_args_cbk_t *replies,
healed_sinks[i] = 1;
}
- if (type == EC_METADATA_TXN) {
- source_ia = replies[source].stat;
- for (i = 0; i < ec->nodes; i++) {
- if (!sources[i])
- continue;
- child_ia = replies[i].stat;
- if (!IA_EQUAL(source_ia, child_ia, gfid) ||
- !IA_EQUAL(source_ia, child_ia, type) ||
- !IA_EQUAL(source_ia, child_ia, prot) ||
- !IA_EQUAL(source_ia, child_ia, uid) ||
- !IA_EQUAL(source_ia, child_ia, gid)) {
- sources[i] = 0;
- healed_sinks[i] = 1;
- }
- }
- }
out:
return source;
}
int
-ec_adjust_versions (call_frame_t *frame, ec_t *ec, ec_txn_t type, inode_t *inode, int source,
- unsigned char *sources,
+ec_adjust_versions (call_frame_t *frame, ec_t *ec, ec_txn_t type,
+ inode_t *inode, int source, unsigned char *sources,
unsigned char *healed_sinks, uint64_t *versions,
uint64_t *dirty)
{
@@ -1798,39 +1796,127 @@ out:
}
int
-__ec_heal_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode,
- unsigned char *locked_on, default_args_cbk_t *replies,
- uint64_t *versions, uint64_t *dirty, unsigned char *sources,
- unsigned char *healed_sinks, ec_txn_t type)
-{
- loc_t loc = {0};
- unsigned char *output = NULL;
- dict_t *xdata = NULL;
- int ret = 0;
- int source = 0;
+ec_heal_metadata_find_direction (ec_t *ec, default_args_cbk_t *replies,
+ uint64_t *versions, uint64_t *dirty,
+ unsigned char *sources, unsigned char *healed_sinks)
+{
+ uint64_t xattr[EC_VERSION_SIZE] = {0};
+ int same_count = 0;
+ int max_same_count = 0;
+ int same_source = -1;
+ int ret = 0;
+ int i = 0;
+ int j = 0;
+ int *groups = NULL;
+ struct iatt source_ia = {0};
+ struct iatt child_ia = {0};
- xdata = dict_new ();
- if (!xdata) {
- ret = -ENOMEM;
- goto out;
+ groups = alloca0 (ec->nodes * sizeof(*groups));
+ for (i = 0; i < ec->nodes; i++)
+ groups[i] = -1;
+
+ for (i = 0; i < ec->nodes; i++) {
+ if (!replies[i].valid)
+ continue;
+ ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_VERSION,
+ xattr, EC_VERSION_SIZE);
+ if (ret == 0) {
+ versions[i] = xattr[EC_METADATA_TXN];
+ }
+
+ memset (xattr, 0, sizeof (xattr));
+ ret = ec_dict_del_array (replies[i].xdata, EC_XATTR_DIRTY,
+ xattr, EC_VERSION_SIZE);
+ if (ret == 0) {
+ dirty[i] = xattr[EC_METADATA_TXN];
+ }
+ if (groups[i] >= 0) /*Already part of group*/
+ continue;
+ groups[i] = i;
+ same_count = 1;
+ source_ia = replies[i].stat;
+ for (j = i + 1; j < ec->nodes; j++) {
+ child_ia = replies[j].stat;
+ if (!IA_EQUAL(source_ia, child_ia, gfid) ||
+ !IA_EQUAL(source_ia, child_ia, type) ||
+ !IA_EQUAL(source_ia, child_ia, prot) ||
+ !IA_EQUAL(source_ia, child_ia, uid) ||
+ !IA_EQUAL(source_ia, child_ia, gid))
+ continue;
+ if (!are_dicts_equal(replies[i].xdata, replies[j].xdata,
+ ec_sh_key_match, NULL))
+ continue;
+ groups[j] = i; /*If iatts match put them into a group*/
+ same_count++;
+ }
+
+ if (max_same_count < same_count) {
+ max_same_count = same_count;
+ same_source = i;
+ }
}
- if (dict_set_uint64(xdata, "list-xattr", 0)) {
- ret = -ENOMEM;
+ if (max_same_count < ec->fragments) {
+ ret = -EIO;
goto out;
}
+ for (i = 0; i < ec->nodes; i++) {
+ if (groups[i] == groups[same_source])
+ sources[i] = 1;
+ else if (replies[i].valid)
+ healed_sinks[i] = 1;
+ }
+ ret = same_source;
+out:
+ return ret;
+}
+
+int
+__ec_heal_metadata_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode,
+ unsigned char *locked_on, default_args_cbk_t *replies,
+ uint64_t *versions, uint64_t *dirty, unsigned char *sources,
+ unsigned char *healed_sinks)
+{
+ loc_t loc = {0};
+ unsigned char *output = NULL;
+ unsigned char *lookup_on = NULL;
+ int ret = 0;
+ int source = 0;
+ default_args_cbk_t *greplies = NULL;
+ int i = 0;
+
+ EC_REPLIES_ALLOC (greplies, ec->nodes);
+
loc.inode = inode_ref (inode);
gf_uuid_copy (loc.gfid, inode->gfid);
output = alloca0 (ec->nodes);
+ lookup_on = alloca0 (ec->nodes);
ret = cluster_lookup (ec->xl_list, locked_on, ec->nodes, replies,
- output, frame, ec->xl, &loc, xdata);
+ output, frame, ec->xl, &loc, NULL);
if (ret <= ec->fragments) {
ret = -ENOTCONN;
goto out;
}
- source = ec_heal_find_direction (ec, type, replies, versions,
+ memcpy (lookup_on, output, ec->nodes);
+ /*Use getxattr to get the filtered xattrs which filter internal xattrs*/
+ ret = cluster_getxattr (ec->xl_list, lookup_on, ec->nodes, greplies,
+ output, frame, ec->xl, &loc, NULL, NULL);
+ for (i = 0; i < ec->nodes; i++) {
+ if (lookup_on[i] && !output[i]) {
+ replies[i].valid = 0;
+ continue;
+ }
+ if (replies[i].xdata) {
+ dict_unref (replies[i].xdata);
+ replies[i].xdata = NULL;
+ if (greplies[i].xattr)
+ replies[i].xdata = dict_ref (greplies[i].xattr);
+ }
+ }
+
+ source = ec_heal_metadata_find_direction (ec, replies, versions,
dirty, sources, healed_sinks);
if (source < 0) {
ret = -EIO;
@@ -1838,9 +1924,7 @@ __ec_heal_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode,
}
ret = source;
out:
- if (xdata)
- dict_unref (xdata);
-
+ cluster_replies_wipe (greplies, ec->nodes);
loc_wipe (&loc);
return ret;
}
@@ -1864,14 +1948,14 @@ __ec_removexattr_sinks (call_frame_t *frame, ec_t *ec, inode_t *inode,
continue;
if (!sources[i] && !healed_sinks[i])
continue;
- ret = dict_foreach (replies[i].xattr, ec_heal_xattr_clean,
- replies[source].xattr);
+ ret = dict_foreach (replies[i].xdata, ec_heal_xattr_clean,
+ replies[source].xdata);
if (ret < 0) {
sources[i] = 0;
healed_sinks[i] = 0;
}
- if (replies[i].xattr->count == 0) {
+ if (replies[i].xdata->count == 0) {
continue;
} else if (sources[i]) {
/* This can happen if setxattr/removexattr succeeds on
@@ -1883,7 +1967,7 @@ __ec_removexattr_sinks (call_frame_t *frame, ec_t *ec, inode_t *inode,
}
ret = syncop_removexattr (ec->xl_list[i], &loc, "",
- replies[i].xattr, NULL);
+ replies[i].xdata, NULL);
if (ret < 0)
healed_sinks[i] = 0;
}
@@ -1896,39 +1980,46 @@ __ec_removexattr_sinks (call_frame_t *frame, ec_t *ec, inode_t *inode,
int
__ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode,
- unsigned char *locked_on)
+ unsigned char *locked_on, unsigned char *sources,
+ unsigned char *healed_sinks)
{
loc_t loc = {0};
int ret = 0;
int source = 0;
default_args_cbk_t *replies = NULL;
+ default_args_cbk_t *sreplies = NULL;
uint64_t *versions = NULL;
uint64_t *dirty = NULL;
- unsigned char *sources = NULL;
- unsigned char *healed_sinks = NULL;
unsigned char *output = NULL;
dict_t *source_dict = NULL;
struct iatt source_buf = {0};
EC_REPLIES_ALLOC (replies, ec->nodes);
+ EC_REPLIES_ALLOC (sreplies, ec->nodes);
loc.inode = inode_ref (inode);
gf_uuid_copy (loc.gfid, inode->gfid);
output = alloca0 (ec->nodes);
versions = alloca0 (ec->nodes * sizeof (*versions));
dirty = alloca0 (ec->nodes * sizeof (*dirty));
- sources = alloca0 (ec->nodes);
- healed_sinks = alloca0 (ec->nodes);
- source = __ec_heal_prepare (frame, ec, inode, locked_on, replies,
- versions, dirty, sources, healed_sinks,
- EC_METADATA_TXN);
+ source = __ec_heal_metadata_prepare (frame, ec, inode, locked_on, replies,
+ versions, dirty, sources, healed_sinks);
if (source < 0) {
ret = -EIO;
goto out;
}
+ if (EC_COUNT (sources, ec->nodes) == ec->nodes) {
+ ret = 0;
+ goto erase_dirty;
+ }
+
+ if (EC_COUNT (healed_sinks, ec->nodes) == 0) {
+ ret = -ENOTCONN;
+ goto out;
+ }
source_buf = replies[source].stat;
- ret = cluster_setattr (ec->xl_list, healed_sinks, ec->nodes, replies,
+ ret = cluster_setattr (ec->xl_list, healed_sinks, ec->nodes, sreplies,
output, frame, ec->xl, &loc,
&source_buf, GF_SET_ATTR_MODE |
GF_SET_ATTR_UID | GF_SET_ATTR_GID, NULL);
@@ -1939,22 +2030,12 @@ __ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode,
goto out;
}
- ret = cluster_getxattr (ec->xl_list, locked_on, ec->nodes, replies,
- output, frame, ec->xl, &loc, NULL, NULL);
- EC_INTERSECT (sources, sources, output, ec->nodes);
- EC_INTERSECT (healed_sinks, healed_sinks, output, ec->nodes);
- EC_ADJUST_SOURCE (source, sources, ec->nodes);
- if ((EC_COUNT (healed_sinks, ec->nodes) == 0) || (source < 0)) {
- ret = -ENOTCONN;
- goto out;
- }
-
ret = __ec_removexattr_sinks (frame, ec, inode, source, sources,
healed_sinks, replies);
if (ret < 0)
goto out;
- source_dict = dict_ref (replies[source].xattr);
+ source_dict = dict_ref (replies[source].xdata);
if (dict_foreach_match (source_dict, ec_ignorable_key_match, NULL,
dict_remove_foreach_fn, NULL) == -1) {
ret = -ENOMEM;
@@ -1971,6 +2052,7 @@ __ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode,
goto out;
}
+erase_dirty:
ret = ec_adjust_versions (frame, ec, EC_METADATA_TXN, inode, source,
sources, healed_sinks, versions, dirty);
out:
@@ -1979,29 +2061,21 @@ out:
loc_wipe (&loc);
cluster_replies_wipe (replies, ec->nodes);
+ cluster_replies_wipe (sreplies, ec->nodes);
return ret;
}
int
-ec_heal_metadata (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
+ec_heal_metadata (call_frame_t *frame, ec_t *ec, inode_t *inode,
+ unsigned char *sources, unsigned char *healed_sinks)
{
unsigned char *locked_on = NULL;
unsigned char *up_subvols = NULL;
unsigned char *output = NULL;
int ret = 0;
default_args_cbk_t *replies = NULL;
- call_frame_t *frame = NULL;
EC_REPLIES_ALLOC (replies, ec->nodes);
- frame = copy_frame (req_frame);
- if (!frame) {
- ret = -ENOMEM;
- goto out;
- }
-
- /*Do heal as root*/
- frame->root->uid = 0;
- frame->root->gid = 0;
locked_on = alloca0(ec->nodes);
output = alloca0(ec->nodes);
up_subvols = alloca0(ec->nodes);
@@ -2017,15 +2091,13 @@ ec_heal_metadata (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
ret = -ENOTCONN;
goto unlock;
}
- ret = __ec_heal_metadata (frame, ec, inode, locked_on);
+ ret = __ec_heal_metadata (frame, ec, inode, locked_on, sources,
+ healed_sinks);
}
unlock:
cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
frame, ec->xl, ec->xl->name, inode, 0, 0);
-out:
cluster_replies_wipe (replies, ec->nodes);
- if (frame)
- STACK_DESTROY (frame->root);
return ret;
}
@@ -2036,24 +2108,47 @@ __ec_heal_entry_prepare (call_frame_t *frame, ec_t *ec, inode_t *inode,
uint64_t *dirty, unsigned char *sources,
unsigned char *healed_sinks)
{
- int source = 0;
- default_args_cbk_t *replies = NULL;
loc_t loc = {0};
+ int source = 0;
int ret = 0;
+ default_args_cbk_t *replies = NULL;
+ unsigned char *output = NULL;
+ dict_t *xdata = NULL;
EC_REPLIES_ALLOC (replies, ec->nodes);
loc.inode = inode_ref (inode);
gf_uuid_copy (loc.gfid, inode->gfid);
- source = __ec_heal_prepare (frame, ec, inode, locked_on, replies,
- versions, dirty, sources, healed_sinks,
- EC_DATA_TXN);
+ xdata = dict_new ();
+ if (!xdata) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ if (dict_set_uint64(xdata, EC_XATTR_VERSION, 0) ||
+ dict_set_uint64(xdata, EC_XATTR_DIRTY, 0)) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ output = alloca0 (ec->nodes);
+ ret = cluster_lookup (ec->xl_list, locked_on, ec->nodes, replies,
+ output, frame, ec->xl, &loc, xdata);
+ if (ret <= ec->fragments) {
+ ret = -ENOTCONN;
+ goto out;
+ }
+
+ source = ec_heal_entry_find_direction (ec, replies, versions,
+ dirty, sources, healed_sinks);
if (source < 0) {
ret = -EIO;
goto out;
}
ret = source;
out:
+ if (xdata)
+ dict_unref (xdata);
loc_wipe (&loc);
cluster_replies_wipe (replies, ec->nodes);
return ret;
@@ -2156,6 +2251,11 @@ ec_delete_stale_name (dict_t *gfid_db, char *key, data_t *d, void *data)
/*This will help in making decisions about creating names*/
dict_del (gfid_db, key);
out:
+ if (ret < 0) {
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: heal failed %s",
+ uuid_utoa (name_data->parent->gfid), name_data->name,
+ strerror (-ret));
+ }
cluster_replies_wipe (replies, ec->nodes);
loc_wipe (&loc);
return ret;
@@ -2320,9 +2420,12 @@ ec_create_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
ret = 0;
out:
+ if (ret < 0)
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: heal failed %s",
+ uuid_utoa (parent->gfid), name, strerror (-ret));
+ cluster_replies_wipe (replies, ec->nodes);
loc_wipe (&loc);
loc_wipe (&srcloc);
- EC_REPLIES_ALLOC (replies, ec->nodes);
if (xdata)
dict_unref (xdata);
return ret;
@@ -2345,6 +2448,7 @@ __ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
unsigned char *same = NULL;
unsigned char *gfidless = NULL;
+ EC_REPLIES_ALLOC (replies, ec->nodes);
loc.parent = inode_ref (parent);
loc.inode = inode_new (parent->table);
gf_uuid_copy (loc.pargfid, parent->gfid);
@@ -2365,7 +2469,6 @@ __ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
output = alloca0 (ec->nodes);
gfidless = alloca0 (ec->nodes);
enoent = alloca0 (ec->nodes);
- EC_REPLIES_ALLOC (replies, ec->nodes);
ret = cluster_lookup (ec->xl_list, participants, ec->nodes, replies,
output, frame, ec->xl, &loc, NULL);
for (i = 0; i < ec->nodes; i++) {
@@ -2464,9 +2567,10 @@ ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
NULL);
{
if (ret <= ec->fragments) {
- gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal "
- "as only %d number of subvolumes could "
- "be locked", uuid_utoa (parent->gfid), ret);
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: Skipping "
+ "heal as only %d number of subvolumes could "
+ "be locked", uuid_utoa (parent->gfid), name,
+ ret);
ret = -ENOTCONN;
goto unlock;
}
@@ -2534,19 +2638,19 @@ ec_heal_names (call_frame_t *frame, ec_t *ec, inode_t *inode,
if (EC_COUNT (participants, ec->nodes) <= ec->fragments)
return -ENOTCONN;
}
+ loc_wipe (&loc);
return 0;
}
int
__ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
- unsigned char *heal_on)
+ unsigned char *heal_on, unsigned char *sources,
+ unsigned char *healed_sinks)
{
unsigned char *locked_on = NULL;
unsigned char *output = NULL;
uint64_t *versions = NULL;
uint64_t *dirty = NULL;
- unsigned char *sources = NULL;
- unsigned char *healed_sinks = NULL;
unsigned char *participants = NULL;
default_args_cbk_t *replies = NULL;
int ret = 0;
@@ -2557,8 +2661,6 @@ __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
output = alloca0(ec->nodes);
versions = alloca0 (ec->nodes * sizeof (*versions));
dirty = alloca0 (ec->nodes * sizeof (*dirty));
- sources = alloca0 (ec->nodes);
- healed_sinks = alloca0 (ec->nodes);
EC_REPLIES_ALLOC (replies, ec->nodes);
ret = cluster_entrylk (ec->xl_list, heal_on, ec->nodes, replies,
@@ -2608,7 +2710,8 @@ out:
}
int
-ec_heal_entry (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
+ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
+ unsigned char *sources, unsigned char *healed_sinks)
{
unsigned char *locked_on = NULL;
unsigned char *up_subvols = NULL;
@@ -2616,21 +2719,12 @@ ec_heal_entry (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
char selfheal_domain[1024] = {0};
int ret = 0;
default_args_cbk_t *replies = NULL;
- call_frame_t *frame = NULL;
EC_REPLIES_ALLOC (replies, ec->nodes);
locked_on = alloca0(ec->nodes);
output = alloca0(ec->nodes);
up_subvols = alloca0(ec->nodes);
- frame = copy_frame (req_frame);
- if (!frame) {
- ret = -ENOMEM;
- goto out;
- }
- /*Do heal as root*/
- frame->root->uid = 0;
- frame->root->gid = 0;
sprintf (selfheal_domain, "%s:self-heal", ec->xl->name);
ec_mask_to_char_array (ec->xl_up, up_subvols, ec->nodes);
/*If other processes are already doing the heal, don't block*/
@@ -2645,15 +2739,13 @@ ec_heal_entry (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
ret = -ENOTCONN;
goto unlock;
}
- ret = __ec_heal_entry (frame, ec, inode, locked_on);
+ ret = __ec_heal_entry (frame, ec, inode, locked_on,
+ sources, healed_sinks);
}
unlock:
cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
frame, ec->xl, selfheal_domain, inode, NULL);
-out:
cluster_replies_wipe (replies, ec->nodes);
- if (frame)
- STACK_DESTROY (frame->root);
return ret;
}
@@ -2664,12 +2756,10 @@ ec_heal_data_find_direction (ec_t *ec, default_args_cbk_t *replies,
uint64_t *size, unsigned char *sources,
unsigned char *healed_sinks)
{
+ uint64_t xattr[EC_VERSION_SIZE] = {0};
char version_size[64] = {0};
- uint64_t *value = NULL;
dict_t *version_size_db = NULL;
unsigned char *same = NULL;
- void *ptr = NULL;
- int len = 0;
int max_same_count = 0;
int source = 0;
int i = 0;
@@ -2686,25 +2776,20 @@ ec_heal_data_find_direction (ec_t *ec, default_args_cbk_t *replies,
continue;
if (replies[i].op_ret < 0)
continue;
- ret = dict_get_ptr_and_len (replies[i].xattr, EC_XATTR_VERSION,
- &ptr, &len);
+ ret = ec_dict_del_array (replies[i].xattr, EC_XATTR_VERSION,
+ xattr, EC_VERSION_SIZE);
if (ret == 0) {
- value = ptr;
- versions[i] = ntoh64(value[EC_DATA_TXN]);
+ versions[i] = xattr[EC_DATA_TXN];
}
- ret = dict_get_ptr_and_len (replies[i].xattr, EC_XATTR_DIRTY,
- &ptr, &len);
- if (ret == 0) {
- value = ptr;
- dirty[i] = ntoh64(value[EC_DATA_TXN]);
- }
- ret = dict_get_ptr_and_len (replies[i].xattr, EC_XATTR_SIZE,
- &ptr, &len);
+ memset (xattr, 0, sizeof (xattr));
+ ret = ec_dict_del_array (replies[i].xattr, EC_XATTR_DIRTY,
+ xattr, EC_VERSION_SIZE);
if (ret == 0) {
- value = ptr;
- size[i] = ntoh64(*value);
+ dirty[i] = xattr[EC_DATA_TXN];
}
+ ret = ec_dict_del_number (replies[i].xattr, EC_XATTR_SIZE,
+ &size[i]);
/*Build a db of same version, size*/
snprintf (version_size, sizeof (version_size),
"%"PRIu64"-%"PRIu64, versions[i], size[i]);
@@ -2749,10 +2834,7 @@ ec_heal_data_find_direction (ec_t *ec, default_args_cbk_t *replies,
healed_sinks[i] = 1;
}
}
- if (EC_COUNT (healed_sinks, ec->nodes) == 0) {
- ret = -ENOTCONN;
- goto out;
- }
+
ret = source;
out:
if (version_size_db)
@@ -2812,8 +2894,7 @@ __ec_heal_data_prepare (call_frame_t *frame, ec_t *ec, fd_t *fd,
output, frame, ec->xl, fd, NULL);
EC_INTERSECT (sources, sources, output, ec->nodes);
EC_INTERSECT (healed_sinks, healed_sinks, output, ec->nodes);
- if ((EC_COUNT (sources, ec->nodes) < ec->fragments) ||
- (EC_COUNT (healed_sinks, ec->nodes) == 0)) {
+ if (EC_COUNT (sources, ec->nodes) < ec->fragments) {
ret = -ENOTCONN;
goto out;
}
@@ -2826,6 +2907,7 @@ __ec_heal_data_prepare (call_frame_t *frame, ec_t *ec, fd_t *fd,
sources[i] = 0;
healed_sinks[i] = 1;
} else if (stbuf) {
+ source = i;
*stbuf = replies[i].stat;
}
}
@@ -2841,11 +2923,24 @@ __ec_heal_data_prepare (call_frame_t *frame, ec_t *ec, fd_t *fd,
goto out;
}
+ if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
+ ret = -ENOTCONN;
+ goto out;
+ }
ret = source;
out:
if (xattrs)
dict_unref (xattrs);
cluster_replies_wipe (replies, ec->nodes);
+ if (ret < 0) {
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s",
+ uuid_utoa (fd->inode->gfid), strerror (-ret));
+ } else {
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: sources: %d, sinks: "
+ "%d", uuid_utoa (fd->inode->gfid),
+ EC_COUNT (sources, ec->nodes),
+ EC_COUNT (healed_sinks, ec->nodes));
+ }
return ret;
}
@@ -2910,6 +3005,9 @@ out:
cluster_replies_wipe (replies, ec->nodes);
if (xattrs)
dict_unref (xattrs);
+ if (ret < 0)
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s",
+ uuid_utoa (fd->inode->gfid), strerror (-ret));
return ret;
}
@@ -2928,6 +3026,8 @@ ec_manager_heal_block (ec_fop_data_t *fop, int32_t state)
return EC_STATE_HEAL_DATA_COPY;
case EC_STATE_HEAL_DATA_COPY:
+ gf_log (fop->xl->name, GF_LOG_DEBUG, "%s: read/write starting",
+ uuid_utoa (heal->fd->inode->gfid));
ec_heal_data_block (heal);
return EC_STATE_HEAL_DATA_UNLOCK;
@@ -2986,6 +3086,8 @@ ec_heal_block (call_frame_t *frame, xlator_t *this, uintptr_t target,
if (fop == NULL)
goto out;
+ fop->pre_size = fop->post_size = heal->total_size;
+ fop->have_size = 1;
error = 0;
out:
@@ -3039,6 +3141,7 @@ ec_rebuild_data (call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size,
heal->data = &barrier;
syncbarrier_init (heal->data);
pool = ec->xl->ctx->iobuf_pool;
+ heal->total_size = size;
heal->size = iobpool_default_pagesize (pool);
heal->bad = ec_char_array_to_mask (healed_sinks, ec->nodes);
heal->good = ec_char_array_to_mask (sources, ec->nodes);
@@ -3047,6 +3150,12 @@ ec_rebuild_data (call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size,
for (heal->offset = 0; (heal->offset < size) && !heal->done;
heal->offset += heal->size) {
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: sources: %d, sinks: "
+ "%d, offset: %"PRIu64" bsize: %"PRIu64,
+ uuid_utoa (fd->inode->gfid),
+ EC_COUNT (sources, ec->nodes),
+ EC_COUNT (healed_sinks, ec->nodes), heal->offset,
+ heal->size);
ret = ec_sync_heal_block (frame, ec->xl, heal);
if (ret < 0)
break;
@@ -3055,6 +3164,9 @@ ec_rebuild_data (call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size,
fd_unref (heal->fd);
LOCK_DESTROY (&heal->lock);
syncbarrier_destroy (heal->data);
+ if (ret < 0)
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s",
+ uuid_utoa (fd->inode->gfid), strerror (-ret));
return ret;
}
@@ -3089,6 +3201,9 @@ __ec_heal_trim_sinks (call_frame_t *frame, ec_t *ec, fd_t *fd,
out:
cluster_replies_wipe (replies, ec->nodes);
+ if (ret < 0)
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: heal failed %s",
+ uuid_utoa (fd->inode->gfid), strerror (-ret));
return ret;
}
@@ -3281,15 +3396,14 @@ unlock:
}
int
-__ec_heal_data (call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on)
+__ec_heal_data (call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on,
+ unsigned char *sources, unsigned char *healed_sinks)
{
unsigned char *locked_on = NULL;
unsigned char *output = NULL;
uint64_t *versions = NULL;
uint64_t *dirty = NULL;
uint64_t *size = NULL;
- unsigned char *sources = NULL;
- unsigned char *healed_sinks = NULL;
unsigned char *trim = NULL;
default_args_cbk_t *replies = NULL;
int ret = 0;
@@ -3297,8 +3411,6 @@ __ec_heal_data (call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on)
locked_on = alloca0(ec->nodes);
output = alloca0(ec->nodes);
- sources = alloca0 (ec->nodes);
- healed_sinks = alloca0 (ec->nodes);
trim = alloca0 (ec->nodes);
versions = alloca0 (ec->nodes * sizeof (*versions));
dirty = alloca0 (ec->nodes * sizeof (*dirty));
@@ -3337,6 +3449,11 @@ unlock:
if (ret < 0)
goto out;
+ gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: sources: %d, sinks: "
+ "%d", uuid_utoa (fd->inode->gfid),
+ EC_COUNT (sources, ec->nodes),
+ EC_COUNT (healed_sinks, ec->nodes));
+
ret = ec_rebuild_data (frame, ec, fd, size[source], sources,
healed_sinks);
if (ret < 0)
@@ -3351,13 +3468,13 @@ out:
}
int
-ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
+ec_heal_data (call_frame_t *frame, ec_t *ec, gf_boolean_t block, inode_t *inode,
+ unsigned char *sources, unsigned char *healed_sinks)
{
unsigned char *locked_on = NULL;
unsigned char *up_subvols = NULL;
unsigned char *output = NULL;
default_args_cbk_t *replies = NULL;
- call_frame_t *frame = NULL;
fd_t *fd = NULL;
loc_t loc = {0};
char selfheal_domain[1024] = {0};
@@ -3368,7 +3485,7 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
locked_on = alloca0(ec->nodes);
output = alloca0(ec->nodes);
up_subvols = alloca0(ec->nodes);
- loc. inode = inode_ref (inode);
+ loc.inode = inode_ref (inode);
gf_uuid_copy (loc.gfid, inode->gfid);
fd = fd_create (inode, 0);
@@ -3378,14 +3495,6 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
}
ec_mask_to_char_array (ec->xl_up, up_subvols, ec->nodes);
- frame = copy_frame (req_frame);
- if (!frame) {
- ret = -ENOMEM;
- goto out;
- }
- /*Do heal as root*/
- frame->root->uid = 0;
- frame->root->gid = 0;
ret = cluster_open (ec->xl_list, up_subvols, ec->nodes, replies, output,
frame, ec->xl, &loc, O_RDWR|O_LARGEFILE, fd, NULL);
@@ -3397,9 +3506,15 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
fd_bind (fd);
sprintf (selfheal_domain, "%s:self-heal", ec->xl->name);
/*If other processes are already doing the heal, don't block*/
- ret = cluster_tryinodelk (ec->xl_list, output, ec->nodes, replies,
- locked_on, frame, ec->xl, selfheal_domain, inode,
- 0, 0);
+ if (block) {
+ ret = cluster_inodelk (ec->xl_list, output, ec->nodes, replies,
+ locked_on, frame, ec->xl,
+ selfheal_domain, inode, 0, 0);
+ } else {
+ ret = cluster_tryinodelk (ec->xl_list, output, ec->nodes,
+ replies, locked_on, frame, ec->xl,
+ selfheal_domain, inode, 0, 0);
+ }
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal "
@@ -3408,7 +3523,8 @@ ec_heal_data2 (call_frame_t *req_frame, ec_t *ec, inode_t *inode)
ret = -ENOTCONN;
goto unlock;
}
- ret = __ec_heal_data (frame, ec, fd, locked_on);
+ ret = __ec_heal_data (frame, ec, fd, locked_on, sources,
+ healed_sinks);
}
unlock:
cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
@@ -3418,7 +3534,162 @@ out:
fd_unref (fd);
loc_wipe (&loc);
cluster_replies_wipe (replies, ec->nodes);
- if (frame)
- STACK_DESTROY (frame->root);
return ret;
}
+
+void
+ec_heal_do (xlator_t *this, void *data, loc_t *loc, int32_t partial)
+{
+ call_frame_t *frame = NULL;
+ unsigned char *participants = NULL;
+ unsigned char *msources = NULL;
+ unsigned char *mhealed_sinks = NULL;
+ unsigned char *sources = NULL;
+ unsigned char *healed_sinks = NULL;
+ ec_t *ec = NULL;
+ int ret = 0;
+ int op_ret = 0;
+ int op_errno = 0;
+ intptr_t mgood = 0;
+ intptr_t mbad = 0;
+ intptr_t good = 0;
+ intptr_t bad = 0;
+ ec_fop_data_t *fop = data;
+ gf_boolean_t blocking = _gf_false;
+
+ ec = this->private;
+
+ /* If it is heal request from getxattr, complete the heal and then
+ * unwind, if it is ec_heal with NULL as frame then no need to block
+ * the heal as the caller doesn't care about its completion*/
+ if (fop->req_frame)
+ blocking = _gf_true;
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame)
+ return;
+
+ ec_owner_set(frame, frame->root);
+ /*Do heal as root*/
+ frame->root->uid = 0;
+ frame->root->gid = 0;
+ participants = alloca0(ec->nodes);
+ ec_mask_to_char_array (ec->xl_up, participants, ec->nodes);
+ if (loc->name && strlen (loc->name)) {
+ ret = ec_heal_name (frame, ec, loc->parent, (char *)loc->name,
+ participants);
+ if (ret == 0) {
+ gf_log (this->name, GF_LOG_INFO, "%s: name heal "
+ "successful on %lX", loc->path,
+ ec_char_array_to_mask (participants, ec->nodes));
+ } else {
+ gf_log (this->name, GF_LOG_INFO, "%s: name heal "
+ "failed on %s", loc->path, strerror (-ret));
+ }
+ }
+
+ msources = alloca0(ec->nodes);
+ mhealed_sinks = alloca0(ec->nodes);
+ ret = ec_heal_metadata (frame, ec, loc->inode, msources, mhealed_sinks);
+ if (ret == 0) {
+ mgood = ec_char_array_to_mask (msources, ec->nodes);
+ mbad = ec_char_array_to_mask (mhealed_sinks, ec->nodes);
+ } else {
+ op_ret = -1;
+ op_errno = -ret;
+ }
+ sources = alloca0(ec->nodes);
+ healed_sinks = alloca0(ec->nodes);
+ if (IA_ISREG (loc->inode->ia_type)) {
+ ret = ec_heal_data (frame, ec, blocking, loc->inode, sources,
+ healed_sinks);
+ } else if (IA_ISDIR (loc->inode->ia_type) && !partial) {
+ ret = ec_heal_entry (frame, ec, loc->inode, sources,
+ healed_sinks);
+ } else {
+ ret = 0;
+ memcpy (sources, participants, ec->nodes);
+ memcpy (healed_sinks, participants, ec->nodes);
+ }
+
+ if (ret == 0) {
+ good = ec_char_array_to_mask (sources, ec->nodes);
+ bad = ec_char_array_to_mask (healed_sinks, ec->nodes);
+ } else {
+ op_ret = -1;
+ op_errno = -ret;
+ }
+
+
+ if (fop->cbks.heal) {
+ fop->cbks.heal (fop->req_frame, fop, fop->xl, op_ret,
+ op_errno, ec_char_array_to_mask (participants,
+ ec->nodes),
+ mgood & good, mbad & bad, NULL);
+ }
+ STACK_DESTROY (frame->root);
+ return;
+}
+
+int
+ec_synctask_heal_wrap (void *opaque)
+{
+ ec_fop_data_t *fop = opaque;
+ ec_heal_do (fop->xl, fop, &fop->loc[0], fop->int32);
+ return 0;
+}
+
+int
+ec_heal_done (int ret, call_frame_t *heal, void *opaque)
+{
+ if (opaque)
+ ec_fop_data_release (opaque);
+ return 0;
+}
+
+void
+ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,
+ int32_t minimum, fop_heal_cbk_t func, void *data, loc_t *loc,
+ int32_t partial, dict_t *xdata)
+{
+ ec_cbk_t callback = { .heal = func };
+ ec_fop_data_t *fop = NULL;
+ int ret = 0;
+
+ gf_log("ec", GF_LOG_TRACE, "EC(HEAL) %p", frame);
+
+ VALIDATE_OR_GOTO(this, fail);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, fail);
+
+ if (!loc || !loc->inode || gf_uuid_is_null (loc->inode->gfid))
+ goto fail;
+
+ if (frame && frame->local)
+ goto fail;
+ fop = ec_fop_data_allocate (frame, this, EC_FOP_HEAL,
+ EC_FLAG_UPDATE_LOC_INODE, target, minimum,
+ ec_wind_heal, ec_manager_heal, callback, data);
+ if (fop == NULL)
+ goto fail;
+
+ fop->int32 = partial;
+
+ if (loc) {
+ if (loc_copy(&fop->loc[0], loc) != 0)
+ goto fail;
+ }
+
+ if (xdata)
+ fop->xdata = dict_ref(xdata);
+
+ ret = synctask_new (this->ctx->env, ec_synctask_heal_wrap,
+ ec_heal_done, NULL, fop);
+ if (ret < 0)
+ goto fail;
+ return;
+fail:
+ if (fop)
+ ec_fop_data_release (fop);
+ if (func)
+ func (frame, NULL, this, -1, EIO, 0, 0, 0, NULL);
+}