diff options
Diffstat (limited to 'xlators/cluster/dht/src/dht-rename.c')
-rw-r--r-- | xlators/cluster/dht/src/dht-rename.c | 260 |
1 files changed, 239 insertions, 21 deletions
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c index ce7df8442bf..10040a33958 100644 --- a/xlators/cluster/dht/src/dht-rename.c +++ b/xlators/cluster/dht/src/dht-rename.c @@ -355,26 +355,72 @@ err: }while (0) int -dht_rename_done (call_frame_t *frame, xlator_t *this) +dht_rename_unlock_cbk (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + dict_t *xdata) { - dht_local_t *local = NULL; + dht_local_t *local = NULL; local = frame->local; - if (local->linked == _gf_true) { - local->linked = _gf_false; - dht_linkfile_attr_heal (frame, this); - } DHT_STRIP_PHASE1_FLAGS (&local->stbuf); DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno, &local->stbuf, &local->preoldparent, &local->postoldparent, &local->preparent, &local->postparent, NULL); + return 0; +} + +int +dht_rename_unlock (call_frame_t *frame, xlator_t *this) +{ + dht_local_t *local = NULL; + int op_ret = -1; + char src_gfid[GF_UUID_BUF_SIZE] = {0}; + char dst_gfid[GF_UUID_BUF_SIZE] = {0}; + + local = frame->local; + + op_ret = dht_unlock_inodelk (frame, local->lock.locks, + local->lock.lk_count, + dht_rename_unlock_cbk); + if (op_ret < 0) { + uuid_utoa_r (local->loc.inode->gfid, src_gfid); + + if (local->loc2.inode) + uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); + + gf_log (this->name, GF_LOG_WARNING, + "winding unlock inodelk failed " + "rename (%s:%s:%s %s:%s:%s), " + "stale locks left on bricks", + local->loc.path, src_gfid, local->src_cached->name, + local->loc2.path, dst_gfid, + local->dst_cached ? local->dst_cached->name : NULL); + + dht_rename_unlock_cbk (frame, NULL, this, 0, 0, NULL); + } return 0; } int +dht_rename_done (call_frame_t *frame, xlator_t *this) +{ + dht_local_t *local = NULL; + + local = frame->local; + + if (local->linked == _gf_true) { + local->linked = _gf_false; + dht_linkfile_attr_heal (frame, this); + } + + dht_rename_unlock (frame, this); + return 0; +} + +int dht_rename_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) @@ -418,14 +464,14 @@ out: int dht_rename_cleanup (call_frame_t *frame) { - dht_local_t *local = NULL; - xlator_t *this = NULL; - xlator_t *src_hashed = NULL; - xlator_t *src_cached = NULL; - xlator_t *dst_hashed = NULL; - xlator_t *dst_cached = NULL; - int call_cnt = 0; - dict_t *xattr = NULL; + dht_local_t *local = NULL; + xlator_t *this = NULL; + xlator_t *src_hashed = NULL; + xlator_t *src_cached = NULL; + xlator_t *dst_hashed = NULL; + xlator_t *dst_cached = NULL; + int call_cnt = 0; + dict_t *xattr = NULL; char gfid[GF_UUID_BUF_SIZE] = {0}; local = frame->local; @@ -513,12 +559,7 @@ nolinks: WIPE (&local->preparent); WIPE (&local->postparent); - DHT_STRIP_PHASE1_FLAGS (&local->stbuf); - DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno, - &local->stbuf, &local->preoldparent, - &local->postoldparent, &local->preparent, - &local->postparent, NULL); - + dht_rename_unlock (frame, this); return 0; } @@ -954,6 +995,181 @@ nolinks: return 0; } +int +dht_rename_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, + inode_t *inode, struct iatt *stbuf, dict_t *xattr, + struct iatt *postparent) +{ + dht_local_t *local = NULL; + int call_cnt = 0; + dht_conf_t *conf = NULL; + + local = frame->local; + conf = this->private; + + if (op_ret < 0) { + /* The meaning of is_linkfile is overloaded here. For locking + * to work properly both rebalance and rename should acquire + * lock on datafile. The reason for sending this lookup is to + * find out whether we've acquired a lock on data file. + * Between the lookup before rename and this rename, the + * file could be migrated by a rebalance process and now this + * file this might be a linkto file. We verify that by sending + * this lookup. However, if this lookup fails we cannot really + * say whether we've acquired lock on a datafile or linkto file. + * So, we act conservatively and _assume_ + * that this is a linkfile and fail the rename operation. + */ + local->is_linkfile = _gf_true; + } else if (xattr && check_is_linkfile (inode, stbuf, xattr, + conf->link_xattr_name)) { + local->is_linkfile = _gf_true; + } + + call_cnt = dht_frame_return (frame); + if (is_last_call (call_cnt)) { + if (local->is_linkfile) { + local->op_ret = -1; + local->op_errno = EBUSY; + goto fail; + } + + dht_rename_create_links (frame); + } + + return 0; +fail: + dht_rename_unlock (frame, this); + return 0; +} + +int32_t +dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + dht_local_t *local = NULL; + char src_gfid[GF_UUID_BUF_SIZE] = {0}; + char dst_gfid[GF_UUID_BUF_SIZE] = {0}; + dict_t *xattr_req = NULL; + dht_conf_t *conf = NULL; + int i = 0; + + local = frame->local; + conf = this->private; + + if (op_ret < 0) { + uuid_utoa_r (local->loc.inode->gfid, src_gfid); + + if (local->loc2.inode) + uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); + + gf_log (this->name, GF_LOG_WARNING, + "acquiring inodelk failed (%s) " + "rename (%s:%s:%s %s:%s:%s), returning EBUSY", + strerror (op_errno), + local->loc.path, src_gfid, local->src_cached->name, + local->loc2.path, dst_gfid, + local->dst_cached ? local->dst_cached->name : NULL); + + local->op_ret = -1; + local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno; + + goto done; + } + + xattr_req = dict_new (); + if (xattr_req == NULL) { + local->op_ret = -1; + local->op_errno = ENOMEM; + goto done; + } + + op_ret = dict_set_uint32 (xattr_req, + conf->link_xattr_name, 256); + if (op_ret < 0) { + local->op_ret = -1; + local->op_errno = -op_ret; + goto done; + } + + local->call_cnt = local->lock.lk_count; + + for (i = 0; i < local->lock.lk_count; i++) { + STACK_WIND (frame, dht_rename_lookup_cbk, + local->lock.locks[i]->xl, + local->lock.locks[i]->xl->fops->lookup, + &local->lock.locks[i]->loc, xattr_req); + } + + dict_unref (xattr_req); + return 0; + +done: + /* Its fine to call unlock even when no locks are acquired, as we check + * for lock->locked before winding a unlock call. + */ + dht_rename_unlock (frame, this); + + if (xattr_req) + dict_unref (xattr_req); + + return 0; +} + +int +dht_rename_lock (call_frame_t *frame) +{ + dht_local_t *local = NULL; + int count = 1, ret = -1; + dht_lock_t **lk_array = NULL; + + local = frame->local; + + if (local->dst_cached) + count++; + + lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); + if (lk_array == NULL) + goto err; + + lk_array[0] = dht_lock_new (frame->this, local->src_cached, &local->loc, + F_WRLCK, DHT_FILE_MIGRATE_DOMAIN); + if (lk_array[0] == NULL) + goto err; + + if (local->dst_cached) { + lk_array[1] = dht_lock_new (frame->this, local->dst_cached, + &local->loc2, F_WRLCK, + DHT_FILE_MIGRATE_DOMAIN); + if (lk_array[1] == NULL) + goto err; + } + + local->lock.locks = lk_array; + local->lock.lk_count = count; + + ret = dht_nonblocking_inodelk (frame, lk_array, count, + dht_rename_lock_cbk); + if (ret < 0) { + local->lock.locks = NULL; + local->lock.lk_count = 0; + goto err; + } + + return 0; +err: + if (lk_array != NULL) { + int tmp_count = 0, i = 0; + + for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++); + + dht_lock_array_free (lk_array, tmp_count); + GF_FREE (lk_array); + } + + return -1; +} int dht_rename (call_frame_t *frame, xlator_t *this, @@ -1040,7 +1256,9 @@ dht_rename (call_frame_t *frame, xlator_t *this, dht_rename_dir (frame, this); } else { local->op_ret = 0; - dht_rename_create_links (frame); + ret = dht_rename_lock (frame); + if (ret < 0) + goto err; } return 0; |