summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-rename.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-rename.c')
-rw-r--r--xlators/cluster/dht/src/dht-rename.c260
1 files changed, 239 insertions, 21 deletions
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
index ce7df8442bf..10040a33958 100644
--- a/xlators/cluster/dht/src/dht-rename.c
+++ b/xlators/cluster/dht/src/dht-rename.c
@@ -355,26 +355,72 @@ err:
}while (0)
int
-dht_rename_done (call_frame_t *frame, xlator_t *this)
+dht_rename_unlock_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ dict_t *xdata)
{
- dht_local_t *local = NULL;
+ dht_local_t *local = NULL;
local = frame->local;
- if (local->linked == _gf_true) {
- local->linked = _gf_false;
- dht_linkfile_attr_heal (frame, this);
- }
DHT_STRIP_PHASE1_FLAGS (&local->stbuf);
DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno,
&local->stbuf, &local->preoldparent,
&local->postoldparent, &local->preparent,
&local->postparent, NULL);
+ return 0;
+}
+
+int
+dht_rename_unlock (call_frame_t *frame, xlator_t *this)
+{
+ dht_local_t *local = NULL;
+ int op_ret = -1;
+ char src_gfid[GF_UUID_BUF_SIZE] = {0};
+ char dst_gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ op_ret = dht_unlock_inodelk (frame, local->lock.locks,
+ local->lock.lk_count,
+ dht_rename_unlock_cbk);
+ if (op_ret < 0) {
+ uuid_utoa_r (local->loc.inode->gfid, src_gfid);
+
+ if (local->loc2.inode)
+ uuid_utoa_r (local->loc2.inode->gfid, dst_gfid);
+
+ gf_log (this->name, GF_LOG_WARNING,
+ "winding unlock inodelk failed "
+ "rename (%s:%s:%s %s:%s:%s), "
+ "stale locks left on bricks",
+ local->loc.path, src_gfid, local->src_cached->name,
+ local->loc2.path, dst_gfid,
+ local->dst_cached ? local->dst_cached->name : NULL);
+
+ dht_rename_unlock_cbk (frame, NULL, this, 0, 0, NULL);
+ }
return 0;
}
int
+dht_rename_done (call_frame_t *frame, xlator_t *this)
+{
+ dht_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local->linked == _gf_true) {
+ local->linked = _gf_false;
+ dht_linkfile_attr_heal (frame, this);
+ }
+
+ dht_rename_unlock (frame, this);
+ return 0;
+}
+
+int
dht_rename_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
@@ -418,14 +464,14 @@ out:
int
dht_rename_cleanup (call_frame_t *frame)
{
- dht_local_t *local = NULL;
- xlator_t *this = NULL;
- xlator_t *src_hashed = NULL;
- xlator_t *src_cached = NULL;
- xlator_t *dst_hashed = NULL;
- xlator_t *dst_cached = NULL;
- int call_cnt = 0;
- dict_t *xattr = NULL;
+ dht_local_t *local = NULL;
+ xlator_t *this = NULL;
+ xlator_t *src_hashed = NULL;
+ xlator_t *src_cached = NULL;
+ xlator_t *dst_hashed = NULL;
+ xlator_t *dst_cached = NULL;
+ int call_cnt = 0;
+ dict_t *xattr = NULL;
char gfid[GF_UUID_BUF_SIZE] = {0};
local = frame->local;
@@ -513,12 +559,7 @@ nolinks:
WIPE (&local->preparent);
WIPE (&local->postparent);
- DHT_STRIP_PHASE1_FLAGS (&local->stbuf);
- DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno,
- &local->stbuf, &local->preoldparent,
- &local->postoldparent, &local->preparent,
- &local->postparent, NULL);
-
+ dht_rename_unlock (frame, this);
return 0;
}
@@ -954,6 +995,181 @@ nolinks:
return 0;
}
+int
+dht_rename_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno,
+ inode_t *inode, struct iatt *stbuf, dict_t *xattr,
+ struct iatt *postparent)
+{
+ dht_local_t *local = NULL;
+ int call_cnt = 0;
+ dht_conf_t *conf = NULL;
+
+ local = frame->local;
+ conf = this->private;
+
+ if (op_ret < 0) {
+ /* The meaning of is_linkfile is overloaded here. For locking
+ * to work properly both rebalance and rename should acquire
+ * lock on datafile. The reason for sending this lookup is to
+ * find out whether we've acquired a lock on data file.
+ * Between the lookup before rename and this rename, the
+ * file could be migrated by a rebalance process and now this
+ * file this might be a linkto file. We verify that by sending
+ * this lookup. However, if this lookup fails we cannot really
+ * say whether we've acquired lock on a datafile or linkto file.
+ * So, we act conservatively and _assume_
+ * that this is a linkfile and fail the rename operation.
+ */
+ local->is_linkfile = _gf_true;
+ } else if (xattr && check_is_linkfile (inode, stbuf, xattr,
+ conf->link_xattr_name)) {
+ local->is_linkfile = _gf_true;
+ }
+
+ call_cnt = dht_frame_return (frame);
+ if (is_last_call (call_cnt)) {
+ if (local->is_linkfile) {
+ local->op_ret = -1;
+ local->op_errno = EBUSY;
+ goto fail;
+ }
+
+ dht_rename_create_links (frame);
+ }
+
+ return 0;
+fail:
+ dht_rename_unlock (frame, this);
+ return 0;
+}
+
+int32_t
+dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ char src_gfid[GF_UUID_BUF_SIZE] = {0};
+ char dst_gfid[GF_UUID_BUF_SIZE] = {0};
+ dict_t *xattr_req = NULL;
+ dht_conf_t *conf = NULL;
+ int i = 0;
+
+ local = frame->local;
+ conf = this->private;
+
+ if (op_ret < 0) {
+ uuid_utoa_r (local->loc.inode->gfid, src_gfid);
+
+ if (local->loc2.inode)
+ uuid_utoa_r (local->loc2.inode->gfid, dst_gfid);
+
+ gf_log (this->name, GF_LOG_WARNING,
+ "acquiring inodelk failed (%s) "
+ "rename (%s:%s:%s %s:%s:%s), returning EBUSY",
+ strerror (op_errno),
+ local->loc.path, src_gfid, local->src_cached->name,
+ local->loc2.path, dst_gfid,
+ local->dst_cached ? local->dst_cached->name : NULL);
+
+ local->op_ret = -1;
+ local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno;
+
+ goto done;
+ }
+
+ xattr_req = dict_new ();
+ if (xattr_req == NULL) {
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
+ goto done;
+ }
+
+ op_ret = dict_set_uint32 (xattr_req,
+ conf->link_xattr_name, 256);
+ if (op_ret < 0) {
+ local->op_ret = -1;
+ local->op_errno = -op_ret;
+ goto done;
+ }
+
+ local->call_cnt = local->lock.lk_count;
+
+ for (i = 0; i < local->lock.lk_count; i++) {
+ STACK_WIND (frame, dht_rename_lookup_cbk,
+ local->lock.locks[i]->xl,
+ local->lock.locks[i]->xl->fops->lookup,
+ &local->lock.locks[i]->loc, xattr_req);
+ }
+
+ dict_unref (xattr_req);
+ return 0;
+
+done:
+ /* Its fine to call unlock even when no locks are acquired, as we check
+ * for lock->locked before winding a unlock call.
+ */
+ dht_rename_unlock (frame, this);
+
+ if (xattr_req)
+ dict_unref (xattr_req);
+
+ return 0;
+}
+
+int
+dht_rename_lock (call_frame_t *frame)
+{
+ dht_local_t *local = NULL;
+ int count = 1, ret = -1;
+ dht_lock_t **lk_array = NULL;
+
+ local = frame->local;
+
+ if (local->dst_cached)
+ count++;
+
+ lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
+ if (lk_array == NULL)
+ goto err;
+
+ lk_array[0] = dht_lock_new (frame->this, local->src_cached, &local->loc,
+ F_WRLCK, DHT_FILE_MIGRATE_DOMAIN);
+ if (lk_array[0] == NULL)
+ goto err;
+
+ if (local->dst_cached) {
+ lk_array[1] = dht_lock_new (frame->this, local->dst_cached,
+ &local->loc2, F_WRLCK,
+ DHT_FILE_MIGRATE_DOMAIN);
+ if (lk_array[1] == NULL)
+ goto err;
+ }
+
+ local->lock.locks = lk_array;
+ local->lock.lk_count = count;
+
+ ret = dht_nonblocking_inodelk (frame, lk_array, count,
+ dht_rename_lock_cbk);
+ if (ret < 0) {
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+ goto err;
+ }
+
+ return 0;
+err:
+ if (lk_array != NULL) {
+ int tmp_count = 0, i = 0;
+
+ for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++);
+
+ dht_lock_array_free (lk_array, tmp_count);
+ GF_FREE (lk_array);
+ }
+
+ return -1;
+}
int
dht_rename (call_frame_t *frame, xlator_t *this,
@@ -1040,7 +1256,9 @@ dht_rename (call_frame_t *frame, xlator_t *this,
dht_rename_dir (frame, this);
} else {
local->op_ret = 0;
- dht_rename_create_links (frame);
+ ret = dht_rename_lock (frame);
+ if (ret < 0)
+ goto err;
}
return 0;