diff options
Diffstat (limited to 'xlators/cluster/dht')
-rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 3 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-helper.c | 3 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-mem-types.h | 1 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-rename.c | 61 |
4 files changed, 61 insertions, 7 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index be9d2ce5e4e..6030af7c41d 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -303,6 +303,9 @@ struct dht_local { call_stub_t *stub; int32_t parent_disk_layout[4]; + + /* rename rollback */ + int *ret_cache ; }; typedef struct dht_local dht_local_t; diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index ad031b6216c..81d1dffa0af 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -638,6 +638,9 @@ dht_local_wipe (xlator_t *this, dht_local_t *local) local->stub = NULL; } + if (local->ret_cache) + GF_FREE (local->ret_cache); + mem_put (local); } diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h index 5de5d1838ad..3554f3f9c2d 100644 --- a/xlators/cluster/dht/src/dht-mem-types.h +++ b/xlators/cluster/dht/src/dht-mem-types.h @@ -38,6 +38,7 @@ enum gf_dht_mem_types_ { gf_tier_mt_ipc_ctr_params_t, gf_dht_mt_fd_ctx_t, gf_tier_mt_qfile_array_t, + gf_dht_ret_cache_t, gf_dht_mt_end }; #endif diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c index 7e7e7151af7..53c61f8a714 100644 --- a/xlators/cluster/dht/src/dht-rename.c +++ b/xlators/cluster/dht/src/dht-rename.c @@ -25,17 +25,22 @@ dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, struct iatt *prenewparent, struct iatt *postnewparent, dict_t *xdata) { - dht_local_t *local = NULL; - int this_call_cnt = 0; - xlator_t *prev = NULL; - char gfid[GF_UUID_BUF_SIZE] = {0}; + dht_conf_t *conf = NULL; + dht_local_t *local = NULL; + int this_call_cnt = 0; + xlator_t *prev = NULL; + int i = 0; + char gfid[GF_UUID_BUF_SIZE] = {0}; + int subvol_cnt = -1; + conf = this->private; local = frame->local; prev = cookie; + subvol_cnt = dht_subvol_cnt (this, prev); + local->ret_cache[subvol_cnt] = op_ret; if (op_ret == -1) { - /* TODO: undo the damage */ gf_uuid_unparse(local->loc.inode->gfid, gfid); gf_msg (this->name, GF_LOG_INFO, op_errno, @@ -63,6 +68,42 @@ dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, unwind: this_call_cnt = dht_frame_return (frame); if (is_last_call (this_call_cnt)) { + /* We get here with local->call_cnt == 0. Which means + * we are the only one executing this code, there is + * no contention. Therefore it's safe to manipulate or + * deref local->call_cnt directly (without locking). + */ + if (local->ret_cache[conf->subvolume_cnt] == 0) { + /* count errant subvols in last field of ret_cache */ + for (i = 0; i < conf->subvolume_cnt; i++) { + if (local->ret_cache[i] != 0) + ++local->ret_cache[conf->subvolume_cnt]; + } + if (local->ret_cache[conf->subvolume_cnt]) { + /* undoing the damage: + * for all subvolumes, where rename + * succeeded, we perform the reverse operation + */ + for (i = 0; i < conf->subvolume_cnt; i++) { + if (local->ret_cache[i] == 0) + ++local->call_cnt; + } + for (i = 0; i < conf->subvolume_cnt; i++) { + if (local->ret_cache[i]) + continue; + + STACK_WIND (frame, + dht_rename_dir_cbk, + conf->subvolumes[i], + conf->subvolumes[i]->fops->rename, + &local->loc2, &local->loc, + NULL); + } + + return 0; + } + } + WIPE (&local->preoldparent); WIPE (&local->postoldparent); WIPE (&local->preparent); @@ -96,8 +137,6 @@ dht_rename_hashed_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if (op_ret == -1) { - /* TODO: undo the damage */ - gf_uuid_unparse(local->loc.inode->gfid, gfid); gf_msg (this->name, GF_LOG_INFO, op_errno, @@ -324,6 +363,14 @@ dht_rename_dir (call_frame_t *frame, xlator_t *this) conf = frame->this->private; local = frame->local; + local->ret_cache = GF_CALLOC (conf->subvolume_cnt + 1, sizeof (int), + gf_dht_ret_cache_t); + + if (local->ret_cache == NULL) { + op_errno = ENOMEM; + goto err; + } + /* We must take a lock on all the subvols with src gfid. * Along with this if dst exists we must take lock on * any one subvol with dst gfid. |