summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorXavier Hernandez <xhernandez@datalab.es>2015-05-20 15:17:35 +0200
committerPranith Kumar Karampuri <pkarampu@redhat.com>2015-05-27 03:25:47 -0700
commit3b666b40efbed157e8c5991f29b345d93b28c659 (patch)
treea6fb9a20bed31bbcb0e5dd2025e51d7f4ea6e257
parent5513144feb5b062b733d7514adf194429e31666f (diff)
cluster/ec: Forced unlock when lock contention is detected
EC uses an eager lock mechanism to optimize multiple read/write requests on the same entry or inode. This increases performance but can have adverse results when other clients try to access the same entry/inode. To solve this, this patch adds a functionality to detect when this happens and force an earlier release to not block other clients. The method consists on requesting GF_GLUSTERFS_INODELK_COUNT and GF_GLUSTERFS_ENTRYLK_COUNT for all fops that take a lock. When this count is greater than one, the lock is marked to be released. All fops already waiting for this lock will be executed normally before releasing the lock, but new requests that also require it will be blocked and restarted after the lock has been released and reacquired again. Another problem was that some operations did correctly lock the parent of an entry when needed, but got the size and version xattrs from the entry instead of the parent. This patch solves this problem by binding all queries of size and version to each lock and replacing all entrylk calls by inodelk ones to remove concurrent updates on directory metadata. This also allows rename to correctly update source and destination directories. Change-Id: I2df0b22bc6f407d49f3cbf0733b0720015bacfbd BUG: 1165041 Signed-off-by: Xavier Hernandez <xhernandez@datalab.es> Reviewed-on: http://review.gluster.org/10852 Tested-by: NetBSD Build System Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
-rw-r--r--xlators/cluster/ec/src/ec-combine.c35
-rw-r--r--xlators/cluster/ec/src/ec-common.c1471
-rw-r--r--xlators/cluster/ec/src/ec-common.h32
-rw-r--r--xlators/cluster/ec/src/ec-data.c25
-rw-r--r--xlators/cluster/ec/src/ec-data.h53
-rw-r--r--xlators/cluster/ec/src/ec-dir-read.c8
-rw-r--r--xlators/cluster/ec/src/ec-dir-write.c96
-rw-r--r--xlators/cluster/ec/src/ec-generic.c67
-rw-r--r--xlators/cluster/ec/src/ec-heal.c96
-rw-r--r--xlators/cluster/ec/src/ec-helpers.c38
-rw-r--r--xlators/cluster/ec/src/ec-helpers.h7
-rw-r--r--xlators/cluster/ec/src/ec-inode-read.c51
-rw-r--r--xlators/cluster/ec/src/ec-inode-write.c159
13 files changed, 1139 insertions, 999 deletions
diff --git a/xlators/cluster/ec/src/ec-combine.c b/xlators/cluster/ec/src/ec-combine.c
index 9d4a18999f1..4617a0430f1 100644
--- a/xlators/cluster/ec/src/ec-combine.c
+++ b/xlators/cluster/ec/src/ec-combine.c
@@ -171,8 +171,10 @@ void ec_iatt_rebuild(ec_t * ec, struct iatt * iatt, int32_t count,
gf_boolean_t
ec_xattr_match (dict_t *dict, char *key, data_t *value, void *arg)
{
- if (fnmatch(GF_XATTR_STIME_PATTERN, key, 0) == 0)
+ if ((fnmatch(GF_XATTR_STIME_PATTERN, key, 0) == 0) ||
+ (strcmp(key, GLUSTERFS_OPEN_FD_COUNT) == 0)) {
return _gf_false;
+ }
return _gf_true;
}
@@ -185,6 +187,8 @@ ec_value_ignore (char *key)
(strcmp(key, GF_XATTR_USER_PATHINFO_KEY) == 0) ||
(strcmp(key, GF_XATTR_LOCKINFO_KEY) == 0) ||
(strcmp(key, GLUSTERFS_OPEN_FD_COUNT) == 0) ||
+ (strcmp(key, GLUSTERFS_INODELK_COUNT) == 0) ||
+ (strcmp(key, GLUSTERFS_ENTRYLK_COUNT) == 0) ||
(strncmp(key, GF_XATTR_CLRLK_CMD,
strlen (GF_XATTR_CLRLK_CMD)) == 0) ||
(strncmp(key, EC_QUOTA_PREFIX, strlen(EC_QUOTA_PREFIX)) == 0) ||
@@ -225,15 +229,9 @@ int32_t ec_dict_list(data_t ** list, int32_t * count, ec_cbk_data_t * cbk,
dict = (which == EC_COMBINE_XDATA) ? ans->xdata : ans->dict;
list[i] = dict_get(dict, key);
- if (list[i] == NULL)
- {
- gf_log(cbk->fop->xl->name, GF_LOG_ERROR, "Unexpected missing "
- "dictionary entry");
-
- return 0;
+ if (list[i] != NULL) {
+ i++;
}
-
- i++;
}
*count = i;
@@ -471,11 +469,6 @@ int32_t ec_dict_data_max32(ec_cbk_data_t *cbk, int32_t which, char *key)
return -1;
}
- if (num <= 1)
- {
- return 0;
- }
-
max = data_to_uint32(data[0]);
for (i = 1; i < num; i++)
{
@@ -507,10 +500,6 @@ int32_t ec_dict_data_max64(ec_cbk_data_t *cbk, int32_t which, char *key)
return -1;
}
- if (num <= 1) {
- return 0;
- }
-
max = data_to_uint64(data[0]);
for (i = 1; i < num; i++) {
tmp = data_to_uint64(data[i]);
@@ -630,6 +619,10 @@ int32_t ec_dict_data_combine(dict_t * dict, char * key, data_t * value,
{
return ec_dict_data_max32(data->cbk, data->which, key);
}
+ if ((strcmp(key, GLUSTERFS_INODELK_COUNT) == 0) ||
+ (strcmp(key, GLUSTERFS_ENTRYLK_COUNT) == 0)) {
+ return ec_dict_data_max32(data->cbk, data->which, key);
+ }
if (strcmp(key, QUOTA_SIZE_KEY) == 0) {
return ec_dict_data_quota(data->cbk, data->which, key);
@@ -831,6 +824,8 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
LOCK(&fop->lock);
+ fop->received |= newcbk->mask;
+
item = fop->cbk_list.prev;
list_for_each_entry(cbk, &fop->cbk_list, list)
{
@@ -868,7 +863,9 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
}
cbk = list_entry(fop->cbk_list.next, ec_cbk_data_t, list);
- needed = fop->minimum - cbk->count - fop->winds + 1;
+ if ((fop->mask ^ fop->remaining) == fop->received) {
+ needed = fop->minimum - cbk->count;
+ }
UNLOCK(&fop->lock);
diff --git a/xlators/cluster/ec/src/ec-common.c b/xlators/cluster/ec/src/ec-common.c
index 374739ac6e0..1a1c7093d7e 100644
--- a/xlators/cluster/ec/src/ec-common.c
+++ b/xlators/cluster/ec/src/ec-common.c
@@ -368,24 +368,34 @@ int32_t ec_child_select(ec_fop_data_t * fop)
uintptr_t mask = 0;
int32_t first = 0, num = 0;
+ ec_fop_cleanup(fop);
+
fop->mask &= ec->node_mask;
mask = ec->xl_up;
if (fop->parent == NULL)
{
- if (fop->loc[0].inode != NULL) {
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_PARENT) && fop->loc[0].parent)
+ mask &= ec_inode_good(fop->loc[0].parent, fop->xl);
+
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_INODE) && fop->loc[0].inode) {
mask &= ec_inode_good(fop->loc[0].inode, fop->xl);
}
- if (fop->loc[1].inode != NULL) {
+
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_INODE) && fop->loc[1].inode) {
mask &= ec_inode_good(fop->loc[1].inode, fop->xl);
}
- if (fop->fd != NULL) {
- if (fop->fd->inode != NULL) {
+
+ if (fop->fd) {
+ if ((fop->flags & EC_FLAG_UPDATE_FD_INODE) && fop->fd->inode) {
mask &= ec_inode_good(fop->fd->inode, fop->xl);
}
- mask &= ec_fd_good(fop->fd, fop->xl);
+ if (fop->flags & fop->flags & EC_FLAG_UPDATE_FD) {
+ mask &= ec_fd_good(fop->fd, fop->xl);
+ }
}
}
+
if ((fop->mask & ~mask) != 0)
{
gf_log(fop->xl->name, GF_LOG_WARNING, "Executing operation with "
@@ -420,6 +430,7 @@ int32_t ec_child_select(ec_fop_data_t * fop)
/*Unconditionally wind on healing subvolumes*/
fop->mask |= fop->healing;
fop->remaining = fop->mask;
+ fop->received = 0;
ec_trace("SELECT", fop, "");
@@ -585,7 +596,7 @@ void ec_dispatch_min(ec_fop_data_t * fop)
}
}
-ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
+ec_lock_t *ec_lock_allocate(xlator_t *xl, loc_t *loc)
{
ec_t * ec = xl->private;
ec_lock_t * lock;
@@ -602,9 +613,9 @@ ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
lock = mem_get0(ec->lock_pool);
if (lock != NULL)
{
- lock->kind = kind;
lock->good_mask = -1ULL;
INIT_LIST_HEAD(&lock->waiting);
+ INIT_LIST_HEAD(&lock->frozen);
if (ec_loc_from_loc(xl, &lock->loc, loc) != 0)
{
mem_put(lock);
@@ -618,6 +629,9 @@ ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
void ec_lock_destroy(ec_lock_t * lock)
{
loc_wipe(&lock->loc);
+ if (lock->fd != NULL) {
+ fd_unref(lock->fd);
+ }
mem_put(lock);
}
@@ -627,166 +641,96 @@ int32_t ec_lock_compare(ec_lock_t * lock1, ec_lock_t * lock2)
return gf_uuid_compare(lock1->loc.gfid, lock2->loc.gfid);
}
-ec_lock_link_t *ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock,
- int32_t update)
+void ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock, uint32_t flags,
+ loc_t *base)
{
- ec_lock_t *new_lock, *tmp;
- ec_lock_link_t *link = NULL;
- int32_t tmp_update;
+ ec_lock_link_t *link;
- new_lock = lock;
+ /* This check is only prepared for up to 2 locks per fop. If more locks
+ * are needed this must be changed. */
if ((fop->lock_count > 0) &&
- (ec_lock_compare(fop->locks[0].lock, new_lock) > 0))
- {
- tmp = fop->locks[0].lock;
- fop->locks[0].lock = new_lock;
- new_lock = tmp;
-
- tmp_update = fop->locks_update;
- fop->locks_update = update;
- update = tmp_update;
- }
- fop->locks[fop->lock_count].lock = new_lock;
- fop->locks[fop->lock_count].fop = fop;
-
- fop->locks_update |= update << fop->lock_count;
-
- fop->lock_count++;
-
- if (lock->timer != NULL) {
- link = lock->timer->data;
- ec_trace("UNLOCK_CANCELLED", link->fop, "lock=%p", lock);
- gf_timer_call_cancel(fop->xl->ctx, lock->timer);
- lock->timer = NULL;
+ (ec_lock_compare(fop->locks[0].lock, lock) < 0)) {
+ fop->first_lock = fop->lock_count;
} else {
- lock->refs++;
- }
-
- return link;
-}
-
-void ec_lock_prepare_entry(ec_fop_data_t *fop, loc_t *loc, int32_t update)
-{
- ec_lock_t * lock = NULL;
- ec_inode_t * ctx = NULL;
- ec_lock_link_t *link = NULL;
- loc_t tmp;
-
- if ((fop->parent != NULL) || (fop->error != 0))
- {
- return;
- }
-
- /* update is only 0 for 'opendir', which needs to lock the entry pointed
- * by loc instead of its parent.
- */
- if (update)
- {
- if (ec_loc_parent(fop->xl, loc, &tmp) != 0) {
- ec_fop_set_error(fop, EIO);
-
- return;
- }
-
- /* If there's another lock, make sure that it's not the same. Otherwise
- * do not insert it.
- *
- * This can only happen on renames where source and target names are
- * in the same directory. */
- if ((fop->lock_count > 0) &&
- (fop->locks[0].lock->loc.inode == tmp.inode)) {
- goto wipe;
+ /* When the first lock is added to the current fop, request lock
+ * counts from locks xlator to be able to determine if there is
+ * contention and release the lock sooner. */
+ if (fop->xdata == NULL) {
+ fop->xdata = dict_new();
+ if (fop->xdata == NULL) {
+ ec_fop_set_error(fop, ENOMEM);
+ return;
+ }
}
- } else {
- if (ec_loc_from_loc(fop->xl, &tmp, loc) != 0) {
- ec_fop_set_error(fop, EIO);
-
+ if (dict_set_str(fop->xdata, GLUSTERFS_INODELK_DOM_COUNT,
+ fop->xl->name) != 0) {
+ ec_fop_set_error(fop, ENOMEM);
return;
}
}
- LOCK(&tmp.inode->lock);
-
- ctx = __ec_inode_get(tmp.inode, fop->xl);
- if (ctx == NULL)
- {
- __ec_fop_set_error(fop, EIO);
-
- goto unlock;
- }
-
- if (ctx->entry_lock != NULL)
- {
- lock = ctx->entry_lock;
- ec_trace("LOCK_ENTRYLK", fop, "lock=%p, inode=%p, path=%s"
- "Lock already acquired",
- lock, tmp.inode, tmp.path);
-
- goto insert;
- }
-
- lock = ec_lock_allocate(fop->xl, EC_LOCK_ENTRY, &tmp);
- if (lock == NULL)
- {
- __ec_fop_set_error(fop, EIO);
-
- goto unlock;
- }
-
- ec_trace("LOCK_CREATE", fop, "lock=%p", lock);
+ link = &fop->locks[fop->lock_count++];
- lock->type = ENTRYLK_WRLCK;
+ link->lock = lock;
+ link->fop = fop;
+ link->update[EC_DATA_TXN] = (flags & EC_UPDATE_DATA) != 0;
+ link->update[EC_METADATA_TXN] = (flags & EC_UPDATE_META) != 0;
+ link->base = base;
- lock->plock = &ctx->entry_lock;
- ctx->entry_lock = lock;
-
-insert:
- link = ec_lock_insert(fop, lock, update);
-
-unlock:
- UNLOCK(&tmp.inode->lock);
-
-wipe:
- loc_wipe(&tmp);
-
- if (link != NULL) {
- ec_resume(link->fop, 0);
- }
+ lock->refs++;
+ lock->inserted++;
}
-void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update)
+void ec_lock_prepare_inode_internal(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags, loc_t *base)
{
- ec_lock_link_t *link = NULL;
- ec_lock_t * lock;
- ec_inode_t * ctx;
+ ec_lock_t *lock = NULL;
+ ec_inode_t *ctx;
- if ((fop->parent != NULL) || (fop->error != 0) || (loc->inode == NULL))
- {
+ if ((fop->parent != NULL) || (fop->error != 0) || (loc->inode == NULL)) {
return;
}
LOCK(&loc->inode->lock);
ctx = __ec_inode_get(loc->inode, fop->xl);
- if (ctx == NULL)
- {
+ if (ctx == NULL) {
__ec_fop_set_error(fop, EIO);
goto unlock;
}
- if (ctx->inode_lock != NULL)
- {
+ if (ctx->inode_lock != NULL) {
lock = ctx->inode_lock;
+
+ /* If there's another lock, make sure that it's not the same. Otherwise
+ * do not insert it.
+ *
+ * This can only happen on renames where source and target names are
+ * in the same directory. */
+ if ((fop->lock_count > 0) && (fop->locks[0].lock == lock)) {
+ /* Combine data/meta updates */
+ fop->locks[0].update[EC_DATA_TXN] |= (flags & EC_UPDATE_DATA) != 0;
+ fop->locks[0].update[EC_METADATA_TXN] |=
+ (flags & EC_UPDATE_META) != 0;
+
+ /* Only one base inode is allowed per fop, so there shouldn't be
+ * overwrites here. */
+ if (base != NULL) {
+ fop->locks[0].base = base;
+ }
+
+ goto update_query;
+ }
+
ec_trace("LOCK_INODELK", fop, "lock=%p, inode=%p. Lock already "
"acquired", lock, loc->inode);
goto insert;
}
- lock = ec_lock_allocate(fop->xl, EC_LOCK_INODE, loc);
- if (lock == NULL)
- {
+ lock = ec_lock_allocate(fop->xl, loc);
+ if (lock == NULL) {
__ec_fop_set_error(fop, EIO);
goto unlock;
@@ -797,154 +741,78 @@ void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update)
lock->flock.l_type = F_WRLCK;
lock->flock.l_whence = SEEK_SET;
- lock->plock = &ctx->inode_lock;
+ lock->ctx = ctx;
ctx->inode_lock = lock;
insert:
- link = ec_lock_insert(fop, lock, update);
-
+ ec_lock_insert(fop, lock, flags, base);
+update_query:
+ lock->query |= (flags & EC_QUERY_INFO) != 0;
unlock:
UNLOCK(&loc->inode->lock);
+}
- if (link != NULL) {
- ec_resume(link->fop, 0);
- }
+void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, uint32_t flags)
+{
+ ec_lock_prepare_inode_internal(fop, loc, flags, NULL);
}
-void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, int32_t update)
+void ec_lock_prepare_parent_inode(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags)
{
- loc_t loc;
+ loc_t tmp, *base = NULL;
- if ((fop->parent != NULL) || (fop->error != 0))
- {
+ if (fop->error != 0) {
return;
}
- if (ec_loc_from_fd(fop->xl, &loc, fd) == 0)
- {
- ec_lock_prepare_inode(fop, &loc, update);
-
- loc_wipe(&loc);
- }
- else
- {
+ if (ec_loc_parent(fop->xl, loc, &tmp) != 0) {
ec_fop_set_error(fop, EIO);
- }
-}
-
-int32_t ec_locked(call_frame_t * frame, void * cookie, xlator_t * this,
- int32_t op_ret, int32_t op_errno, dict_t * xdata)
-{
- ec_fop_data_t * fop = cookie;
- ec_lock_t * lock = NULL;
-
- if (op_ret >= 0)
- {
- lock = fop->data;
- lock->mask = fop->good;
- lock->acquired = 1;
-
- fop->parent->mask &= fop->good;
- fop->parent->locked++;
- ec_trace("LOCKED", fop->parent, "lock=%p", lock);
-
- ec_lock(fop->parent);
+ return;
}
- else
- {
- gf_log(this->name, GF_LOG_WARNING, "Failed to complete preop lock");
+
+ if ((flags & EC_INODE_SIZE) != 0) {
+ base = loc;
+ flags ^= EC_INODE_SIZE;
}
- return 0;
+ ec_lock_prepare_inode_internal(fop, &tmp, flags, base);
+
+ loc_wipe(&tmp);
}
-void ec_lock(ec_fop_data_t * fop)
+void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, uint32_t flags)
{
- ec_lock_t * lock;
-
- while (fop->locked < fop->lock_count)
- {
- lock = fop->locks[fop->locked].lock;
-
- LOCK(&lock->loc.inode->lock);
-
- if (lock->owner != NULL)
- {
- ec_trace("LOCK_WAIT", fop, "lock=%p", lock);
-
- list_add_tail(&fop->locks[fop->locked].wait_list, &lock->waiting);
-
- ec_sleep(fop);
-
- UNLOCK(&lock->loc.inode->lock);
-
- break;
- }
- lock->owner = fop;
-
- UNLOCK(&lock->loc.inode->lock);
-
- if (!lock->acquired)
- {
- ec_owner_set(fop->frame, lock);
-
- if (lock->kind == EC_LOCK_ENTRY)
- {
- ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p, path=%s",
- lock, lock->loc.inode, lock->loc.path);
-
- ec_entrylk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
- lock, fop->xl->name, &lock->loc, NULL,
- ENTRYLK_LOCK, lock->type, NULL);
- }
- else
- {
- ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p", lock,
- lock->loc.inode);
+ loc_t loc;
- ec_inodelk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
- lock, fop->xl->name, &lock->loc, F_SETLKW,
- &lock->flock, NULL);
- }
+ if (fop->error != 0) {
+ return;
+ }
- break;
- }
+ if (ec_loc_from_fd(fop->xl, &loc, fd) != 0) {
+ ec_fop_set_error(fop, EIO);
- ec_trace("LOCK_REUSE", fop, "lock=%p", lock);
+ return;
+ }
- if (lock->have_size)
- {
- fop->pre_size = fop->post_size = lock->size;
- fop->have_size = 1;
- }
- fop->mask &= lock->good_mask;
+ ec_lock_prepare_inode_internal(fop, &loc, flags, NULL);
- fop->locked++;
- }
+ loc_wipe(&loc);
}
gf_boolean_t
-ec_config_check (ec_fop_data_t *fop, dict_t *xdata)
+ec_config_check (ec_fop_data_t *fop, ec_config_t *config)
{
ec_t *ec;
- if (ec_dict_del_config(xdata, EC_XATTR_CONFIG, &fop->config) < 0) {
- gf_log(fop->xl->name, GF_LOG_ERROR, "Failed to get a valid "
- "config");
-
- ec_fop_set_error(fop, EIO);
-
- return _gf_false;
- }
-
ec = fop->xl->private;
- if ((fop->config.version != EC_CONFIG_VERSION) ||
- (fop->config.algorithm != EC_CONFIG_ALGORITHM) ||
- (fop->config.gf_word_size != EC_GF_BITS) ||
- (fop->config.bricks != ec->nodes) ||
- (fop->config.redundancy != ec->redundancy) ||
- (fop->config.chunk_size != EC_METHOD_CHUNK_SIZE)) {
+ if ((config->version != EC_CONFIG_VERSION) ||
+ (config->algorithm != EC_CONFIG_ALGORITHM) ||
+ (config->gf_word_size != EC_GF_BITS) ||
+ (config->bricks != ec->nodes) ||
+ (config->redundancy != ec->redundancy) ||
+ (config->chunk_size != EC_METHOD_CHUNK_SIZE)) {
uint32_t data_bricks;
/* This combination of version/algorithm requires the following
@@ -957,273 +825,201 @@ ec_config_check (ec_fop_data_t *fop, dict_t *xdata)
chunk_size (in bits) must be a multiple of gf_word_size *
(bricks - redundancy) */
- data_bricks = fop->config.bricks - fop->config.redundancy;
- if ((fop->config.redundancy < 1) ||
- (fop->config.redundancy * 2 >= fop->config.bricks) ||
- !ec_is_power_of_2(fop->config.gf_word_size) ||
- ((fop->config.chunk_size * 8) % (fop->config.gf_word_size *
- data_bricks) != 0)) {
+ data_bricks = config->bricks - config->redundancy;
+ if ((config->redundancy < 1) ||
+ (config->redundancy * 2 >= config->bricks) ||
+ !ec_is_power_of_2(config->gf_word_size) ||
+ ((config->chunk_size * 8) % (config->gf_word_size * data_bricks)
+ != 0)) {
gf_log(fop->xl->name, GF_LOG_ERROR, "Invalid or corrupted config");
} else {
gf_log(fop->xl->name, GF_LOG_ERROR, "Unsupported config "
"(V=%u, A=%u, W=%u, "
"N=%u, R=%u, S=%u)",
- fop->config.version, fop->config.algorithm,
- fop->config.gf_word_size, fop->config.bricks,
- fop->config.redundancy, fop->config.chunk_size);
+ config->version, config->algorithm,
+ config->gf_word_size, config->bricks,
+ config->redundancy, config->chunk_size);
}
- ec_fop_set_error(fop, EIO);
-
return _gf_false;
}
return _gf_true;
}
-int32_t ec_get_size_version_set(call_frame_t * frame, void * cookie,
- xlator_t * this, int32_t op_ret,
- int32_t op_errno, inode_t * inode,
- struct iatt * buf, dict_t * xdata,
- struct iatt * postparent)
+int32_t
+ec_prepare_update_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ dict_t *dict, dict_t *xdata)
{
- ec_fop_data_t * fop = cookie;
- ec_inode_t * ctx;
+ ec_fop_data_t *fop = cookie, *parent;
+ ec_lock_link_t *link = fop->data;
ec_lock_t *lock = NULL;
+ ec_inode_t *ctx;
- if (op_ret >= 0)
- {
- if ((buf->ia_type == IA_IFREG) && !ec_config_check(fop, xdata)) {
- return 0;
- }
+ lock = link->lock;
+ parent = link->fop;
+ ctx = lock->ctx;
- LOCK(&inode->lock);
+ if (op_ret < 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "Failed to get size and version (error %d: %s)", op_errno,
+ strerror (op_errno));
- ctx = __ec_inode_get(inode, this);
- if (ctx != NULL) {
- if (ctx->inode_lock != NULL) {
- lock = ctx->inode_lock;
- lock->version[0] = fop->answer->version[0];
- lock->version[1] = fop->answer->version[1];
+ goto out;
+ }
- if (buf->ia_type == IA_IFREG) {
- lock->have_size = 1;
- lock->size = buf->ia_size;
- }
- }
- if (ctx->entry_lock != NULL) {
- lock = ctx->entry_lock;
- lock->version[0] = fop->answer->version[0];
- lock->version[1] = fop->answer->version[1];
- }
- }
+ op_errno = EIO;
- UNLOCK(&inode->lock);
+ LOCK(&lock->loc.inode->lock);
- if (lock != NULL)
- {
- // Only update parent mask if the lookup has been made with
- // inode locked.
- fop->parent->mask &= fop->good;
- }
+ if (ec_dict_del_array(dict, EC_XATTR_VERSION, ctx->pre_version,
+ EC_VERSION_SIZE) != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get version xattr");
- if (buf->ia_type == IA_IFREG) {
- fop->parent->pre_size = fop->parent->post_size = buf->ia_size;
- fop->parent->have_size = 1;
- }
- }
- else
- {
- gf_log(this->name, GF_LOG_WARNING, "Failed to get size and version "
- "(error %d)", op_errno);
- ec_fop_set_error(fop, op_errno);
+ goto unlock;
}
+ ctx->post_version[0] += ctx->pre_version[0];
+ ctx->post_version[1] += ctx->pre_version[1];
- return 0;
-}
+ ctx->have_version = _gf_true;
-gf_boolean_t
-ec_is_data_fop (glusterfs_fop_t fop)
-{
- switch (fop) {
- case GF_FOP_WRITE:
- case GF_FOP_TRUNCATE:
- case GF_FOP_FTRUNCATE:
- case GF_FOP_FALLOCATE:
- case GF_FOP_DISCARD:
- case GF_FOP_ZEROFILL:
- return _gf_true;
- default:
- return _gf_false;
- }
- return _gf_false;
-}
+ if (ec_dict_del_array(dict, EC_XATTR_DIRTY, ctx->pre_dirty,
+ EC_VERSION_SIZE) == 0) {
+ ctx->post_dirty[0] += ctx->pre_dirty[0];
+ ctx->post_dirty[1] += ctx->pre_dirty[1];
-gf_boolean_t
-ec_is_metadata_fop (glusterfs_fop_t fop)
-{
- switch (fop) {
- case GF_FOP_SETATTR:
- case GF_FOP_FSETATTR:
- case GF_FOP_SETXATTR:
- case GF_FOP_FSETXATTR:
- case GF_FOP_REMOVEXATTR:
- case GF_FOP_FREMOVEXATTR:
- return _gf_true;
- default:
- return _gf_false;
- }
- return _gf_false;
-}
+ ctx->have_dirty = _gf_true;
+ }
-int32_t
-ec_prepare_update_cbk (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret, int32_t op_errno,
- dict_t *dict, dict_t *xdata)
-{
- ec_fop_data_t *fop = cookie, *parent;
- ec_lock_t *lock = NULL;
- uint64_t size = 0;
- uint64_t version[EC_VERSION_SIZE] = {0, 0};
+ if (lock->loc.inode->ia_type == IA_IFREG) {
+ if (ec_dict_del_number(dict, EC_XATTR_SIZE, &ctx->pre_size) != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get size xattr");
- if (op_ret >= 0) {
- parent = fop->parent;
- while ((parent != NULL) && (parent->locks[0].lock == NULL)) {
- parent = parent->parent;
- }
- if (parent == NULL) {
- return 0;
+ goto unlock;
}
+ ctx->post_size = ctx->pre_size;
- lock = parent->locks[0].lock;
- if (ec_is_metadata_fop (fop->parent->id))
- lock->is_dirty[EC_METADATA_TXN] = _gf_true;
- else
- lock->is_dirty[EC_DATA_TXN] = _gf_true;
-
- if (lock->loc.inode->ia_type == IA_IFREG) {
- if (!ec_config_check(fop, dict) ||
- (ec_dict_del_number(dict, EC_XATTR_SIZE, &size) != 0)) {
- ec_fop_set_error(fop, EIO);
- return 0;
- }
- }
+ ctx->have_size = _gf_true;
+
+ if ((ec_dict_del_config(dict, EC_XATTR_CONFIG, &ctx->config) != 0) ||
+ !ec_config_check(parent, &ctx->config)) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get config xattr");
- if (ec_dict_del_array(dict, EC_XATTR_VERSION, version,
- EC_VERSION_SIZE) != 0) {
- ec_fop_set_error(fop, EIO);
- return 0;
+ goto unlock;
}
- LOCK(&lock->loc.inode->lock);
+ ctx->have_config = _gf_true;
+ }
- if (lock->loc.inode->ia_type == IA_IFREG) {
- lock->size = size;
- fop->parent->pre_size = fop->parent->post_size = size;
- fop->parent->have_size = lock->have_size = 1;
- }
- lock->version[0] = version[0];
- lock->version[1] = version[1];
+ ctx->have_info = _gf_true;
- UNLOCK(&lock->loc.inode->lock);
+ op_errno = 0;
+
+unlock:
+ UNLOCK(&lock->loc.inode->lock);
+out:
+ if (op_errno == 0) {
+ parent->mask &= fop->good;
- fop->parent->mask &= fop->good;
/*As of now only data healing marks bricks as healing*/
- if (ec_is_data_fop (fop->parent->id))
- fop->parent->healing |= fop->healing;
+ if (ec_is_data_fop (parent->id)) {
+ parent->healing |= fop->healing;
+ }
} else {
- gf_log(this->name, GF_LOG_WARNING,
- "Failed to get size and version (error %d: %s)", op_errno,
- strerror (op_errno));
- ec_fop_set_error(fop, op_errno);
+ ec_fop_set_error(parent, op_errno);
}
return 0;
}
-void ec_get_size_version(ec_fop_data_t * fop)
+void ec_get_size_version(ec_lock_link_t *link)
{
loc_t loc;
- dict_t * xdata;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
+ ec_fop_data_t *fop;
+ dict_t *dict = NULL;
uid_t uid;
gid_t gid;
int32_t error = ENOMEM;
uint64_t allzero[EC_VERSION_SIZE] = {0, 0};
- if (fop->have_size)
- {
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* If ec metadata has already been retrieved, do not try again. */
+ if (ctx->have_info) {
return;
}
- if ((fop->parent != NULL) && fop->parent->have_size)
- {
- fop->pre_size = fop->parent->pre_size;
- fop->post_size = fop->parent->post_size;
-
- fop->have_size = 1;
-
+ /* Determine if there's something we need to retrieve for the current
+ * operation. */
+ if (!lock->query && (lock->loc.inode->ia_type != IA_IFREG)) {
return;
}
+
+ fop = link->fop;
+
uid = fop->frame->root->uid;
gid = fop->frame->root->gid;
- fop->frame->root->uid = 0;
- fop->frame->root->gid = 0;
-
memset(&loc, 0, sizeof(loc));
- xdata = dict_new();
- if (xdata == NULL)
- {
+ dict = dict_new();
+ if (dict == NULL) {
goto out;
}
- if ((ec_dict_set_array(xdata, EC_XATTR_VERSION,
- allzero, EC_VERSION_SIZE) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_CONFIG, 0) != 0) ||
- (ec_dict_set_array(xdata, EC_XATTR_DIRTY, allzero,
- EC_VERSION_SIZE) != 0))
- {
+
+ /* Once we know that an xattrop will be needed, we try to get all available
+ * information in a single call. */
+ if ((ec_dict_set_array(dict, EC_XATTR_VERSION, allzero,
+ EC_VERSION_SIZE) != 0) ||
+ (ec_dict_set_array(dict, EC_XATTR_DIRTY, allzero,
+ EC_VERSION_SIZE) != 0)) {
goto out;
}
- error = EIO;
+ if (lock->loc.inode->ia_type == IA_IFREG) {
+ if ((ec_dict_set_number(dict, EC_XATTR_SIZE, 0) != 0) ||
+ (ec_dict_set_number(dict, EC_XATTR_CONFIG, 0) != 0)) {
+ goto out;
+ }
+ }
- if (!fop->use_fd)
- {
- if (ec_loc_from_loc(fop->xl, &loc, &fop->loc[0]) != 0)
- {
+ fop->frame->root->uid = 0;
+ fop->frame->root->gid = 0;
+
+ /* For normal fops, ec_[f]xattrop() must succeed on at least
+ * EC_MINIMUM_MIN bricks, however when this is called as part of a
+ * self-heal operation the mask of target bricks (fop->mask) could
+ * contain less than EC_MINIMUM_MIN bricks, causing the lookup to
+ * always fail. Thus we always use the same minimum used for the main
+ * fop.
+ */
+ if (lock->fd == NULL) {
+ if (ec_loc_from_loc(fop->xl, &loc, &lock->loc) != 0) {
goto out;
}
- if (gf_uuid_is_null(loc.pargfid))
- {
- if (loc.parent != NULL)
- {
+ if (gf_uuid_is_null(loc.pargfid)) {
+ if (loc.parent != NULL) {
inode_unref(loc.parent);
loc.parent = NULL;
}
GF_FREE((char *)loc.path);
- loc.path = NULL;
- loc.name = NULL;
+ loc.path = NULL;
+ loc.name = NULL;
}
- /* For normal fops, ec_[f]xattrop() must succeed on at least
- * EC_MINIMUM_MIN bricks, however when this is called as part of
- * a self-heal operation the mask of target bricks (fop->mask) could
- * contain less than EC_MINIMUM_MIN bricks, causing the lookup to
- * always fail. Thus we always use the same minimum used for the main
- * fop.
- */
+
ec_xattrop (fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, &loc,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ ec_prepare_update_cbk, link, &loc,
+ GF_XATTROP_ADD_ARRAY64, dict, NULL);
} else {
- if (ec_loc_from_fd(fop->xl, &loc, fop->fd) != 0) {
- goto out;
- }
ec_fxattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, fop->fd,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ ec_prepare_update_cbk, link, lock->fd,
+ GF_XATTROP_ADD_ARRAY64, dict, NULL);
}
+
error = 0;
out:
@@ -1232,9 +1028,8 @@ out:
loc_wipe(&loc);
- if (xdata != NULL)
- {
- dict_unref(xdata);
+ if (dict != NULL) {
+ dict_unref(dict);
}
if (error != 0) {
@@ -1242,147 +1037,408 @@ out:
}
}
-void ec_prepare_update(ec_fop_data_t *fop)
+gf_boolean_t ec_get_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t *size)
{
- loc_t loc;
- dict_t *xdata;
- ec_fop_data_t *tmp;
- ec_lock_t *lock;
- ec_t *ec;
- uid_t uid;
- gid_t gid;
- uint64_t version[2] = {0, 0};
- uint64_t dirty[2] = {0, 0};
- int32_t error = ENOMEM;
+ ec_inode_t *ctx;
+ gf_boolean_t found = _gf_false;
- tmp = fop;
- while ((tmp != NULL) && (tmp->locks[0].lock == NULL)) {
- tmp = tmp->parent;
- }
- if ((tmp != NULL) &&
- (tmp->locks[0].lock->is_dirty[0] || tmp->locks[0].lock->is_dirty[1])) {
- lock = tmp->locks[0].lock;
+ LOCK(&inode->lock);
- fop->pre_size = fop->post_size = lock->size;
- fop->have_size = 1;
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
+ }
- return;
+ if (ctx->have_size) {
+ *size = ctx->post_size;
+ found = _gf_true;
}
- uid = fop->frame->root->uid;
- gid = fop->frame->root->gid;
- fop->frame->root->uid = 0;
- fop->frame->root->gid = 0;
+unlock:
+ UNLOCK(&inode->lock);
- memset(&loc, 0, sizeof(loc));
+ return found;
+}
- ec = fop->xl->private;
- if (ec_bits_count (fop->mask) >= ec->fragments) {
- /* It is changing data only if the update happens on at least
- * fragment number of bricks. Otherwise it probably is healing*/
- if (ec_is_metadata_fop (fop->id))
- dirty[EC_METADATA_TXN] = 1;
- else
- dirty[EC_DATA_TXN] = 1;
+gf_boolean_t ec_set_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t size)
+{
+ ec_inode_t *ctx;
+ gf_boolean_t found = _gf_false;
+
+ LOCK(&inode->lock);
+
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
}
- xdata = dict_new();
- if (xdata == NULL) {
- goto out;
+ /* Normal fops always have ctx->have_size set. However self-heal calls this
+ * to prepare the inode, so ctx->have_size will be false. In this case we
+ * prepare both pre_size and post_size, and set have_size and have_info to
+ * true. */
+ if (!ctx->have_size) {
+ ctx->pre_size = size;
+ ctx->have_size = ctx->have_info = _gf_true;
}
- if ((ec_dict_set_array(xdata, EC_XATTR_VERSION,
- version, EC_VERSION_SIZE) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_CONFIG, 0) != 0) ||
- (ec_dict_set_array(xdata, EC_XATTR_DIRTY, dirty,
- EC_VERSION_SIZE) != 0)) {
- goto out;
+ ctx->post_size = size;
+
+ found = _gf_true;
+
+unlock:
+ UNLOCK(&inode->lock);
+
+ return found;
+}
+
+void ec_clear_inode_info(ec_fop_data_t *fop, inode_t *inode)
+{
+ ec_inode_t *ctx;
+
+ LOCK(&inode->lock);
+
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
}
- error = EIO;
+ ctx->have_info = _gf_false;
+ ctx->have_config = _gf_false;
+ ctx->have_version = _gf_false;
+ ctx->have_size = _gf_false;
+ ctx->have_dirty = _gf_false;
- if (!fop->use_fd) {
- if (ec_loc_from_loc(fop->xl, &loc, &fop->loc[0]) != 0) {
- goto out;
- }
+ memset(&ctx->config, 0, sizeof(ctx->config));
+ memset(ctx->pre_version, 0, sizeof(ctx->pre_version));
+ memset(ctx->post_version, 0, sizeof(ctx->post_version));
+ ctx->pre_size = ctx->post_size = 0;
+ memset(ctx->pre_dirty, 0, sizeof(ctx->pre_dirty));
+ memset(ctx->post_dirty, 0, sizeof(ctx->post_dirty));
+
+unlock:
+ UNLOCK(&inode->lock);
+}
- ec_xattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, &loc, GF_XATTROP_ADD_ARRAY64,
- xdata, NULL);
+int32_t ec_get_real_size_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, dict_t *xdata,
+ struct iatt *postparent)
+{
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link;
+
+ if (op_ret >= 0) {
+ link = fop->data;
+ if (ec_dict_del_number(xdata, EC_XATTR_SIZE, &link->size) != 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "Unable to determine real file size");
+ }
} else {
- ec_fxattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, fop->fd,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ /* Prevent failure of parent fop. */
+ fop->error = 0;
}
- error = 0;
+ return 0;
+}
-out:
+/* This function is used to get the trusted.ec.size xattr from a file when
+ * no lock is needed on the inode. This is only required to maintan iatt
+ * structs on fops that manipulate directory entries but do not operate
+ * directly on the inode, like link, rename, ...
+ *
+ * Any error processing this request is ignored. In the worst case, an invalid
+ * or not up to date value in the iatt could cause some cache invalidation.
+ */
+void ec_get_real_size(ec_lock_link_t *link)
+{
+ ec_fop_data_t *fop;
+ dict_t *xdata;
- fop->frame->root->uid = uid;
- fop->frame->root->gid = gid;
+ if (link->base == NULL) {
+ return;
+ }
- loc_wipe(&loc);
+ if (link->base->inode->ia_type != IA_IFREG) {
+ return;
+ }
+
+ fop = link->fop;
+
+ if (ec_get_inode_size(fop, link->base->inode, &link->size)) {
+ return;
+ }
+ xdata = dict_new();
+ if (xdata == NULL) {
+ return;
+ }
+ if (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) {
+ goto out;
+ }
+
+ /* Send a simple lookup. A single answer is considered ok since this value
+ * is only used to return an iatt struct related to an inode that is not
+ * locked and have not suffered any operation. */
+ ec_lookup(fop->frame, fop->xl, fop->mask, 1, ec_get_real_size_cbk, link,
+ link->base, xdata);
+
+out:
if (xdata != NULL) {
dict_unref(xdata);
}
+}
- if (error != 0) {
- ec_fop_set_error(fop, error);
+void ec_lock_acquired(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+
+ ec_trace("LOCKED", link->fop, "lock=%p", lock);
+
+ /* If the fop has an fd available, attach it to the lock structure to be
+ * able to do fxattrop calls instead of xattrop. It's safe to change this
+ * here because no xattrop using the fd can start concurrently at this
+ * point. */
+ if (fop->use_fd) {
+ if (lock->fd != NULL) {
+ fd_unref(lock->fd);
+ }
+ lock->fd = fd_ref(fop->fd);
}
+ lock->acquired = _gf_true;
+
+ fop->mask &= lock->good_mask;
+
+ fop->locked++;
+
+ ec_get_size_version(link);
+ ec_get_real_size(link);
}
-int32_t ec_unlocked(call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xdata)
+int32_t ec_locked(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link = NULL;
+ ec_lock_t *lock = NULL;
- if (op_ret < 0) {
- gf_log(this->name, GF_LOG_WARNING, "entry/inode unlocking failed (%s)",
- ec_fop_name(fop->parent->id));
+ if (op_ret >= 0) {
+ link = fop->data;
+ lock = link->lock;
+ lock->mask = lock->good_mask = fop->good;
+
+ ec_lock_acquired(link);
+ ec_lock(fop->parent);
} else {
- ec_trace("UNLOCKED", fop->parent, "lock=%p", fop->data);
+ gf_log(this->name, GF_LOG_WARNING, "Failed to complete preop lock");
}
return 0;
}
-void ec_unlock_lock(ec_fop_data_t *fop, ec_lock_t *lock)
+gf_boolean_t ec_lock_acquire(ec_lock_link_t *link)
{
- if ((lock->mask != 0) && lock->acquired) {
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+ if (!lock->acquired) {
ec_owner_set(fop->frame, lock);
- switch (lock->kind) {
- case EC_LOCK_ENTRY:
- ec_trace("UNLOCK_ENTRYLK", fop, "lock=%p, inode=%p, path=%s", lock,
- lock->loc.inode, lock->loc.path);
+ ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p", lock,
+ lock->loc.inode);
+
+ lock->flock.l_type = F_WRLCK;
+ ec_inodelk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
+ link, fop->xl->name, &lock->loc, F_SETLKW, &lock->flock,
+ NULL);
+
+ return _gf_false;
+ }
+
+ ec_trace("LOCK_REUSE", fop, "lock=%p", lock);
+
+ ec_lock_acquired(link);
+
+ return _gf_true;
+}
+
+void ec_lock(ec_fop_data_t *fop)
+{
+ ec_lock_link_t *link;
+ ec_lock_link_t *timer_link = NULL;
+ ec_lock_t *lock;
+
+ /* There is a chance that ec_resume is called on fop even before ec_sleep.
+ * Which can result in refs == 0 for fop leading to use after free in this
+ * function when it calls ec_sleep so do ec_sleep at start and end of this
+ * function.*/
+ ec_sleep (fop);
+ while (fop->locked < fop->lock_count) {
+ /* Since there are only up to 2 locks per fop, this xor will change
+ * the order of the locks if fop->first_lock is 1. */
+ link = &fop->locks[fop->locked ^ fop->first_lock];
+ lock = link->lock;
+
+ timer_link = NULL;
+
+ LOCK(&lock->loc.inode->lock);
+ GF_ASSERT (lock->inserted > 0);
+ lock->inserted--;
+
+ if (lock->timer != NULL) {
+ GF_ASSERT (lock->release == _gf_false);
+ timer_link = lock->timer->data;
+ ec_trace("UNLOCK_CANCELLED", timer_link->fop, "lock=%p", lock);
+ gf_timer_call_cancel(fop->xl->ctx, lock->timer);
+ lock->timer = NULL;
+
+ lock->refs--;
+ /* There should remain at least 1 ref, the current one. */
+ GF_ASSERT(lock->refs > 0);
+ }
+
+ GF_ASSERT(list_empty(&link->wait_list));
- ec_entrylk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
- ec_unlocked, lock, fop->xl->name, &lock->loc, NULL,
- ENTRYLK_UNLOCK, lock->type, NULL);
+ if ((lock->owner != NULL) || lock->release) {
+ if (lock->release) {
+ ec_trace("LOCK_QUEUE_FREEZE", fop, "lock=%p", lock);
+
+ list_add_tail(&link->wait_list, &lock->frozen);
+
+ /* The lock is frozen, so we move the current reference to
+ * refs_frozen. After that, there should remain at least one
+ * ref belonging to the lock that is processing the release. */
+ lock->refs--;
+ GF_ASSERT(lock->refs > 0);
+ lock->refs_frozen++;
+ } else {
+ ec_trace("LOCK_QUEUE_WAIT", fop, "lock=%p", lock);
+
+ list_add_tail(&link->wait_list, &lock->waiting);
+ }
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_sleep(fop);
break;
+ }
- case EC_LOCK_INODE:
- lock->flock.l_type = F_UNLCK;
- ec_trace("UNLOCK_INODELK", fop, "lock=%p, inode=%p", lock,
- lock->loc.inode);
+ lock->owner = fop;
- ec_inodelk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
- ec_unlocked, lock, fop->xl->name, &lock->loc, F_SETLK,
- &lock->flock, NULL);
+ UNLOCK(&lock->loc.inode->lock);
+ if (!ec_lock_acquire(link)) {
break;
+ }
- default:
- gf_log(fop->xl->name, GF_LOG_ERROR, "Invalid lock type");
+ if (timer_link != NULL) {
+ ec_resume(timer_link->fop, 0);
+ timer_link = NULL;
}
}
+ ec_resume (fop, 0);
+
+ if (timer_link != NULL) {
+ ec_resume(timer_link->fop, 0);
+ }
+}
+
+void
+ec_lock_unfreeze(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+
+ lock = link->lock;
+
+ LOCK(&lock->loc.inode->lock);
+
+ lock->acquired = _gf_false;
+ lock->release = _gf_false;
+
+ lock->refs--;
+ GF_ASSERT (lock->refs == lock->inserted);
+
+ GF_ASSERT(list_empty(&lock->waiting) && (lock->owner == NULL));
- ec_trace("LOCK_DESTROY", fop, "lock=%p", lock);
+ list_splice_init(&lock->frozen, &lock->waiting);
+ lock->refs += lock->refs_frozen;
+ lock->refs_frozen = 0;
- ec_lock_destroy(lock);
+ if (!list_empty(&lock->waiting)) {
+ link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list);
+ list_del_init(&link->wait_list);
+
+ lock->owner = link->fop;
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_trace("LOCK_UNFREEZE", link->fop, "lock=%p", lock);
+
+ if (ec_lock_acquire(link)) {
+ ec_lock(link->fop);
+ }
+ ec_resume(link->fop, 0);
+ } else if (lock->refs == 0) {
+ ec_trace("LOCK_DESTROY", link->fop, "lock=%p", lock);
+
+ lock->ctx->inode_lock = NULL;
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_lock_destroy(lock);
+ } else {
+ UNLOCK(&lock->loc.inode->lock);
+ }
+}
+
+int32_t ec_unlocked(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link = fop->data;
+
+ if (op_ret < 0) {
+ gf_log(this->name, GF_LOG_WARNING, "entry/inode unlocking failed (%s)",
+ ec_fop_name(link->fop->id));
+ } else {
+ ec_trace("UNLOCKED", link->fop, "lock=%p", link->lock);
+ }
+
+ ec_lock_unfreeze(link);
+
+ return 0;
+}
+
+void ec_unlock_lock(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+
+ ec_clear_inode_info(fop, lock->loc.inode);
+
+ if ((lock->mask != 0) && lock->acquired) {
+ ec_owner_set(fop->frame, lock);
+
+ lock->flock.l_type = F_UNLCK;
+ ec_trace("UNLOCK_INODELK", fop, "lock=%p, inode=%p", lock,
+ lock->loc.inode);
+
+ ec_inodelk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
+ ec_unlocked, link, fop->xl->name, &lock->loc, F_SETLK,
+ &lock->flock, NULL);
+ } else {
+ ec_lock_unfreeze(link);
+ }
}
int32_t ec_update_size_version_done(call_frame_t * frame, void * cookie,
@@ -1390,111 +1446,128 @@ int32_t ec_update_size_version_done(call_frame_t * frame, void * cookie,
int32_t op_errno, dict_t * xattr,
dict_t * xdata)
{
- ec_fop_data_t * fop = cookie;
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
- if (op_ret < 0)
- {
+ if (op_ret < 0) {
gf_log(fop->xl->name, GF_LOG_ERROR, "Failed to update version and "
"size (error %d)", op_errno);
- }
- else
- {
+ } else {
fop->parent->mask &= fop->good;
+ link = fop->data;
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ if (ec_dict_del_array(xattr, EC_XATTR_VERSION, ctx->post_version,
+ EC_VERSION_SIZE) == 0) {
+ ctx->pre_version[0] = ctx->post_version[0];
+ ctx->pre_version[1] = ctx->post_version[1];
+
+ ctx->have_version = _gf_true;
+ }
+ if (ec_dict_del_number(xattr, EC_XATTR_SIZE, &ctx->post_size) == 0) {
+ ctx->pre_size = ctx->post_size;
+
+ ctx->have_size = _gf_true;
+ }
+ if (ec_dict_del_array(xattr, EC_XATTR_DIRTY, ctx->post_dirty,
+ EC_VERSION_SIZE) == 0) {
+ ctx->pre_dirty[0] = ctx->post_dirty[0];
+ ctx->pre_dirty[1] = ctx->post_dirty[1];
+
+ ctx->have_dirty = _gf_true;
+ }
+ if ((ec_dict_del_config(xdata, EC_XATTR_CONFIG, &ctx->config) == 0) &&
+ ec_config_check(fop->parent, &ctx->config)) {
+ ctx->have_config = _gf_true;
+ }
+
+ ctx->have_info = _gf_true;
}
- if (fop->data != NULL) {
- ec_unlock_lock(fop->parent, fop->data);
+ if ((fop->parent->id != GF_FOP_FLUSH) &&
+ (fop->parent->id != GF_FOP_FSYNC) &&
+ (fop->parent->id != GF_FOP_FSYNCDIR)) {
+ ec_unlock_lock(fop->data);
}
return 0;
}
-uint64_t
-ec_get_dirty_value (ec_t *ec, uintptr_t fop_mask, uint64_t version_delta,
- gf_boolean_t dirty)
-{
- uint64_t dirty_val = 0;
-
- if (version_delta) {
- if (~fop_mask & ec->node_mask) {
- /* fop didn't succeed on all subvols so 'dirty' xattr
- * shouldn't be cleared */
- if (!dirty)
- dirty_val = 1;
- } else {
- /* fop succeed on all subvols so 'dirty' xattr
- * should be cleared */
- if (dirty)
- dirty_val = -1;
- }
- }
- return dirty_val;
-}
-
void
-ec_update_size_version(ec_fop_data_t *fop, loc_t *loc, uint64_t version[2],
- uint64_t size, gf_boolean_t dirty[2], ec_lock_t *lock)
+ec_update_size_version(ec_lock_link_t *link, uint64_t *version,
+ uint64_t size, uint64_t *dirty)
{
- ec_t *ec = fop->xl->private;
+ ec_fop_data_t *fop;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
dict_t * dict;
uid_t uid;
gid_t gid;
- uint64_t dirty_values[2] = {0};
- int i = 0;
-
- if (fop->parent != NULL)
- {
- fop->parent->post_size = fop->post_size;
- return;
- }
+ fop = link->fop;
- ec_trace("UPDATE", fop, "version=%ld, size=%ld, dirty=%u", version, size,
- dirty);
+ ec_trace("UPDATE", fop, "version=%ld/%ld, size=%ld, dirty=%ld/%ld",
+ version[0], version[1], size, dirty[0], dirty[1]);
dict = dict_new();
- if (dict == NULL)
- {
+ if (dict == NULL) {
goto out;
}
- if (version[0] != 0 || version[1] != 0) {
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* If we don't have version information or it has been modified, we
+ * update it. */
+ if (!ctx->have_version || (version[0] != 0) || (version[1] != 0)) {
if (ec_dict_set_array(dict, EC_XATTR_VERSION,
version, EC_VERSION_SIZE) != 0) {
goto out;
}
}
+
if (size != 0) {
+ /* If size has been changed, we should already know the previous size
+ * of the file. */
+ GF_ASSERT(ctx->have_size);
+
if (ec_dict_set_number(dict, EC_XATTR_SIZE, size) != 0) {
goto out;
}
}
- for (i = 0; i < sizeof (dirty_values)/sizeof (dirty_values[0]); i++) {
- dirty_values[i] = ec_get_dirty_value (ec, fop->mask, version[i],
- dirty[i]);
- }
-
- if (dirty_values[0] || dirty_values[1]) {
- if (ec_dict_set_array(dict, EC_XATTR_DIRTY, dirty_values,
+ /* If we don't have dirty information or it has been modified, we update
+ * it. */
+ if (!ctx->have_dirty || (dirty[0] != 0) || (dirty[1] != 0)) {
+ if (ec_dict_set_array(dict, EC_XATTR_DIRTY, dirty,
EC_VERSION_SIZE) != 0) {
goto out;
}
}
+ /* If config information is not know, we request it now. */
+ if ((lock->loc.inode->ia_type == IA_IFREG) && !ctx->have_config) {
+ /* A failure requesting this xattr is ignored because it's not
+ * absolutely required right now. */
+ ec_dict_set_number(dict, EC_XATTR_CONFIG, 0);
+ }
+
uid = fop->frame->root->uid;
gid = fop->frame->root->gid;
fop->frame->root->uid = 0;
fop->frame->root->gid = 0;
- if (fop->use_fd) {
- ec_fxattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
- ec_update_size_version_done, lock, fop->fd,
+ if (link->lock->fd == NULL) {
+ ec_xattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
+ ec_update_size_version_done, link, &link->lock->loc,
GF_XATTROP_ADD_ARRAY64, dict, NULL);
} else {
- ec_xattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
- ec_update_size_version_done, lock, loc,
+ ec_fxattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
+ ec_update_size_version_done, link, link->lock->fd,
GF_XATTROP_ADD_ARRAY64, dict, NULL);
}
@@ -1506,8 +1579,7 @@ ec_update_size_version(ec_fop_data_t *fop, loc_t *loc, uint64_t version[2],
return;
out:
- if (dict != NULL)
- {
+ if (dict != NULL) {
dict_unref(dict);
}
@@ -1516,27 +1588,60 @@ out:
gf_log(fop->xl->name, GF_LOG_ERROR, "Unable to update version and size");
}
-void ec_unlock_now(ec_fop_data_t *fop, ec_lock_t *lock)
+gf_boolean_t
+ec_update_info(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
+ uint64_t version[2];
+ uint64_t dirty[2];
+ uint64_t size;
+
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* pre_version[*] will be 0 if have_version is false */
+ version[0] = ctx->post_version[0] - ctx->pre_version[0];
+ version[1] = ctx->post_version[1] - ctx->pre_version[1];
+
+ size = ctx->post_size - ctx->pre_size;
+
+ /* pre_dirty[*] will be 0 if have_dirty is false */
+ dirty[0] = ctx->post_dirty[0] - ctx->pre_dirty[0];
+ dirty[1] = ctx->post_dirty[1] - ctx->pre_dirty[1];
+
+ if ((version[0] != 0) || (version[1] != 0) ||
+ (dirty[0] != 0) || (dirty[1] != 0)) {
+ ec_update_size_version(link, version, size, dirty);
+
+ return _gf_true;
+ }
+
+ return _gf_false;
+}
+
+void
+ec_unlock_now(ec_lock_link_t *link)
{
- ec_trace("UNLOCK_NOW", fop, "lock=%p", lock);
+ ec_trace("UNLOCK_NOW", link->fop, "lock=%p", link->lock);
- if ((lock->version_delta[0] != 0) || (lock->version_delta[1] != 0) ||
- lock->is_dirty[0] || lock->is_dirty[1]) {
- ec_update_size_version(fop, &lock->loc, lock->version_delta,
- lock->size_delta, lock->is_dirty, lock);
- } else {
- ec_unlock_lock(fop, lock);
+ if (!ec_update_info(link)) {
+ ec_unlock_lock(link);
}
- ec_resume(fop, 0);
+ ec_resume(link->fop, 0);
}
void
-ec_unlock_timer_del(ec_fop_data_t *fop, ec_lock_t *lock)
+ec_unlock_timer_del(ec_lock_link_t *link)
{
+ int32_t before = 0;
+ ec_lock_t *lock;
inode_t *inode;
gf_boolean_t now = _gf_false;
+ lock = link->lock;
+
/* A race condition can happen if timer expires, calls this function
* and the lock is released (lock->loc is wiped) but the fop is not
* fully completed yet (it's still on the list of pending fops). In
@@ -1550,27 +1655,32 @@ ec_unlock_timer_del(ec_fop_data_t *fop, ec_lock_t *lock)
LOCK(&inode->lock);
if (lock->timer != NULL) {
- ec_trace("UNLOCK_DELAYED", fop, "lock=%p", lock);
+ ec_trace("UNLOCK_DELAYED", link->fop, "lock=%p", lock);
- gf_timer_call_cancel(fop->xl->ctx, lock->timer);
+ gf_timer_call_cancel(link->fop->xl->ctx, lock->timer);
lock->timer = NULL;
- *lock->plock = NULL;
- now = _gf_true;
+ lock->release = now = _gf_true;
+
+ before = lock->refs + lock->refs_frozen;
+ list_splice_init(&lock->waiting, &lock->frozen);
+ lock->refs_frozen += lock->refs - lock->inserted - 1;
+ lock->refs = 1 + lock->inserted;
+ /* We moved around the locks, so total number of locks shouldn't
+ * change by this operation*/
+ GF_ASSERT (before == (lock->refs + lock->refs_frozen));
}
UNLOCK(&inode->lock);
if (now) {
- ec_unlock_now(fop, lock);
+ ec_unlock_now(link);
}
}
void ec_unlock_timer_cbk(void *data)
{
- ec_lock_link_t *link = data;
-
- ec_unlock_timer_del(link->fop, link->lock);
+ ec_unlock_timer_del(data);
}
void ec_unlock_timer_add(ec_lock_link_t *link)
@@ -1578,28 +1688,28 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
struct timespec delay;
ec_fop_data_t *fop = link->fop;
ec_lock_t *lock = link->lock;
- int32_t refs = 1;
+ gf_boolean_t now = _gf_false;
LOCK(&lock->loc.inode->lock);
GF_ASSERT(lock->timer == NULL);
- if (lock->refs != 1) {
+ if ((lock->refs - lock->inserted) > 1) {
ec_trace("UNLOCK_SKIP", fop, "lock=%p", lock);
lock->refs--;
UNLOCK(&lock->loc.inode->lock);
} else if (lock->acquired) {
- delay.tv_sec = 1;
- delay.tv_nsec = 0;
+ ec_t *ec = fop->xl->private;
ec_sleep(fop);
- /* If healing is needed, do not delay lock release to let self-heal
- * start working as soon as possible. */
- if (!ec_fop_needs_heal(fop)) {
- ec_trace("UNLOCK_DELAY", fop, "lock=%p", lock);
+ /* If healing is needed, the lock needs to be released due to
+ * contention, or ec is shutting down, do not delay lock release. */
+ if (!lock->release && !ec_fop_needs_heal(fop) && !ec->shutdown) {
+ ec_trace("UNLOCK_DELAY", fop, "lock=%p, release=%d", lock,
+ lock->release);
delay.tv_sec = 1;
delay.tv_nsec = 0;
@@ -1609,26 +1719,25 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
gf_log(fop->xl->name, GF_LOG_WARNING, "Unable to delay an "
"unlock");
- *lock->plock = NULL;
- refs = 0;
+ lock->release = now = _gf_true;
}
} else {
- ec_trace("UNLOCK_FORCE", fop, "lock=%p", lock);
- *lock->plock = NULL;
- refs = 0;
+ ec_trace("UNLOCK_FORCE", fop, "lock=%p, release=%d", lock,
+ lock->release);
+ lock->release = now = _gf_true;
}
UNLOCK(&lock->loc.inode->lock);
- if (refs == 0) {
- ec_unlock_now(fop, lock);
+ if (now) {
+ ec_unlock_now(link);
}
} else {
- *lock->plock = NULL;
+ lock->release = _gf_true;
UNLOCK(&lock->loc.inode->lock);
- ec_lock_destroy(lock);
+ ec_lock_unfreeze(link);
}
}
@@ -1649,56 +1758,52 @@ ec_unlock_force(ec_fop_data_t *fop)
for (i = 0; i < fop->lock_count; i++) {
ec_trace("UNLOCK_FORCED", fop, "lock=%p", &fop->locks[i]);
- ec_unlock_timer_del(fop, fop->locks[i].lock);
+ ec_unlock_timer_del(&fop->locks[i]);
}
}
void ec_flush_size_version(ec_fop_data_t * fop)
{
- ec_lock_t * lock;
- uint64_t version[2], delta;
- gf_boolean_t dirty[2] = {_gf_false, _gf_false};
-
GF_ASSERT(fop->lock_count == 1);
- lock = fop->locks[0].lock;
-
- LOCK(&lock->loc.inode->lock);
-
- GF_ASSERT(lock->owner == fop);
-
- version[0] = lock->version_delta[0];
- version[1] = lock->version_delta[1];
- dirty[0] = lock->is_dirty[0];
- dirty[1] = lock->is_dirty[1];
- delta = lock->size_delta;
- lock->version_delta[0] = 0;
- lock->version_delta[1] = 0;
- lock->size_delta = 0;
- lock->is_dirty[0] = _gf_false;
- lock->is_dirty[1] = _gf_false;
-
- UNLOCK(&lock->loc.inode->lock);
-
- if (version[0] > 0 || version[1] > 0 || dirty[0] || dirty[1])
- {
- ec_update_size_version(fop, &lock->loc, version, delta, dirty,
- NULL);
- }
+ ec_update_info(&fop->locks[0]);
}
void ec_lock_reuse(ec_fop_data_t *fop)
{
- ec_fop_data_t * wait_fop;
- ec_lock_t * lock;
- ec_lock_link_t * link;
- int32_t i;
+ ec_t *ec;
+ ec_cbk_data_t *cbk;
+ ec_lock_t *lock;
+ ec_lock_link_t *link;
+ ec_inode_t *ctx;
+ int32_t i, count;
+ gf_boolean_t release = _gf_false;
+
+ cbk = fop->answer;
+ if (cbk != NULL) {
+ if (cbk->xdata != NULL) {
+ if ((dict_get_int32(cbk->xdata, GLUSTERFS_INODELK_COUNT,
+ &count) == 0) && (count > 1)) {
+ release = _gf_true;
+ }
+ if (release) {
+ gf_log(fop->xl->name, GF_LOG_DEBUG,
+ "Lock contention detected");
+ }
+ }
+ } else {
+ /* If we haven't get an answer with enough quorum, we always release
+ * the lock. */
+ release = _gf_true;
+ }
+
+ ec = fop->xl->private;
for (i = 0; i < fop->lock_count; i++)
{
- wait_fop = NULL;
-
- lock = fop->locks[i].lock;
+ link = &fop->locks[i];
+ lock = link->lock;
+ ctx = lock->ctx;
LOCK(&lock->loc.inode->lock);
@@ -1706,47 +1811,42 @@ void ec_lock_reuse(ec_fop_data_t *fop)
GF_ASSERT(lock->owner == fop);
lock->owner = NULL;
+ lock->release |= release;
- if (((fop->locks_update >> i) & 1) != 0) {
- if (fop->error == 0)
- {
- if (ec_is_metadata_fop (fop->id)) {
- lock->version_delta[1]++;
- } else {
- lock->version_delta[0]++;
- }
- lock->size_delta += fop->post_size - fop->pre_size;
- if (fop->have_size) {
- lock->size = fop->post_size;
- lock->have_size = 1;
+ if ((fop->error == 0) && (cbk != NULL) && (cbk->op_ret >= 0)) {
+ if (link->update[0]) {
+ ctx->post_version[0]++;
+ if (ec->node_mask & ~fop->mask) {
+ ctx->post_dirty[0]++;
+ }
+ }
+ if (link->update[1]) {
+ ctx->post_version[1]++;
+ if (ec->node_mask & ~fop->mask) {
+ ctx->post_dirty[1]++;
}
}
}
lock->good_mask &= fop->mask;
+ link = NULL;
if (!list_empty(&lock->waiting))
{
link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list);
list_del_init(&link->wait_list);
- wait_fop = link->fop;
-
- if (lock->kind == EC_LOCK_INODE)
- {
- wait_fop->pre_size = wait_fop->post_size = fop->post_size;
- wait_fop->have_size = fop->have_size;
- }
- wait_fop->mask &= fop->mask;
+ lock->owner = link->fop;
}
UNLOCK(&lock->loc.inode->lock);
- if (wait_fop != NULL)
+ if (link != NULL)
{
- ec_lock(wait_fop);
-
- ec_resume(wait_fop, 0);
+ if (ec_lock_acquire(link)) {
+ ec_lock(link->fop);
+ }
+ ec_resume(link->fop, 0);
}
}
}
@@ -1768,6 +1868,7 @@ void __ec_manager(ec_fop_data_t * fop, int32_t error)
if ((fop->state == EC_STATE_END) || (fop->state == -EC_STATE_END)) {
ec_fop_data_release(fop);
+
break;
}
diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h
index 78cf261feeb..c0db0218699 100644
--- a/xlators/cluster/ec/src/ec-common.h
+++ b/xlators/cluster/ec/src/ec-common.h
@@ -38,19 +38,20 @@ typedef enum {
#define EC_MINIMUM_MIN -2
#define EC_MINIMUM_ALL -3
-#define EC_LOCK_ENTRY 0
-#define EC_LOCK_INODE 1
+#define EC_UPDATE_DATA 1
+#define EC_UPDATE_META 2
+#define EC_QUERY_INFO 4
+#define EC_INODE_SIZE 8
#define EC_STATE_START 0
#define EC_STATE_END 0
#define EC_STATE_INIT 1
#define EC_STATE_LOCK 2
-#define EC_STATE_GET_SIZE_AND_VERSION 3
-#define EC_STATE_DISPATCH 4
-#define EC_STATE_PREPARE_ANSWER 5
-#define EC_STATE_REPORT 6
-#define EC_STATE_LOCK_REUSE 7
-#define EC_STATE_UNLOCK 8
+#define EC_STATE_DISPATCH 3
+#define EC_STATE_PREPARE_ANSWER 4
+#define EC_STATE_REPORT 5
+#define EC_STATE_LOCK_REUSE 6
+#define EC_STATE_UNLOCK 7
#define EC_STATE_DELAYED_START 100
@@ -84,16 +85,21 @@ void ec_update_bad(ec_fop_data_t * fop, uintptr_t good);
void ec_fop_set_error(ec_fop_data_t * fop, int32_t error);
-void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update);
-void ec_lock_prepare_entry(ec_fop_data_t *fop, loc_t *loc, int32_t update);
-void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, int32_t update);
+void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, uint32_t flags);
+void ec_lock_prepare_parent_inode(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags);
+void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, uint32_t flags);
void ec_lock(ec_fop_data_t * fop);
void ec_lock_reuse(ec_fop_data_t *fop);
void ec_unlock(ec_fop_data_t * fop);
void ec_unlock_force(ec_fop_data_t *fop);
-void ec_get_size_version(ec_fop_data_t * fop);
-void ec_prepare_update(ec_fop_data_t *fop);
+gf_boolean_t ec_get_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t *size);
+gf_boolean_t ec_set_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t size);
+void ec_clear_inode_info(ec_fop_data_t *fop, inode_t *inode);
+
void ec_flush_size_version(ec_fop_data_t * fop);
void ec_dispatch_all(ec_fop_data_t * fop);
diff --git a/xlators/cluster/ec/src/ec-data.c b/xlators/cluster/ec/src/ec-data.c
index 609a47b466c..047ccd5ff31 100644
--- a/xlators/cluster/ec/src/ec-data.c
+++ b/xlators/cluster/ec/src/ec-data.c
@@ -130,6 +130,8 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this,
INIT_LIST_HEAD(&fop->cbk_list);
INIT_LIST_HEAD(&fop->answer_list);
INIT_LIST_HEAD(&fop->pending_list);
+ INIT_LIST_HEAD(&fop->locks[0].wait_list);
+ INIT_LIST_HEAD(&fop->locks[1].wait_list);
fop->xl = this;
fop->req_frame = frame;
@@ -222,10 +224,24 @@ ec_handle_last_pending_fop_completion (ec_fop_data_t *fop, gf_boolean_t *notify)
}
}
+void
+ec_fop_cleanup(ec_fop_data_t *fop)
+{
+ ec_cbk_data_t *cbk, *tmp;
+
+ list_for_each_entry_safe(cbk, tmp, &fop->answer_list, answer_list) {
+ list_del_init(&cbk->answer_list);
+
+ ec_cbk_data_destroy(cbk);
+ }
+ INIT_LIST_HEAD(&fop->cbk_list);
+
+ fop->answer = NULL;
+}
+
void ec_fop_data_release(ec_fop_data_t * fop)
{
ec_t *ec = NULL;
- ec_cbk_data_t * cbk, * tmp;
int32_t refs;
gf_boolean_t notify = _gf_false;
@@ -272,12 +288,7 @@ void ec_fop_data_release(ec_fop_data_t * fop)
ec_resume_parent(fop, fop->error);
- list_for_each_entry_safe(cbk, tmp, &fop->answer_list, answer_list)
- {
- list_del_init(&cbk->answer_list);
-
- ec_cbk_data_destroy(cbk);
- }
+ ec_fop_cleanup(fop);
ec = fop->xl->private;
ec_handle_last_pending_fop_completion (fop, &notify);
diff --git a/xlators/cluster/ec/src/ec-data.h b/xlators/cluster/ec/src/ec-data.h
index 8a58ffb288b..8204cf087de 100644
--- a/xlators/cluster/ec/src/ec-data.h
+++ b/xlators/cluster/ec/src/ec-data.h
@@ -67,10 +67,20 @@ struct _ec_fd
struct _ec_inode
{
uintptr_t bad;
- ec_lock_t *entry_lock;
ec_lock_t *inode_lock;
+ gf_boolean_t have_info;
+ gf_boolean_t have_config;
+ gf_boolean_t have_version;
+ gf_boolean_t have_size;
+ gf_boolean_t have_dirty;
+ ec_config_t config;
+ uint64_t pre_version[2];
+ uint64_t post_version[2];
+ uint64_t pre_size;
+ uint64_t post_size;
+ uint64_t pre_dirty[2];
+ uint64_t post_dirty[2];
struct list_head heal;
-
};
typedef int32_t (* fop_heal_cbk_t)(call_frame_t *, void * cookie, xlator_t *,
@@ -80,7 +90,6 @@ typedef int32_t (* fop_fheal_cbk_t)(call_frame_t *, void * cookie, xlator_t *,
int32_t, int32_t, uintptr_t, uintptr_t,
uintptr_t, dict_t *);
-
union _ec_cbk
{
fop_access_cbk_t access;
@@ -132,21 +141,21 @@ union _ec_cbk
struct _ec_lock
{
- ec_lock_t **plock;
+ ec_inode_t *ctx;
gf_timer_t *timer;
- struct list_head waiting;
+ struct list_head waiting; /* Queue of requests being serviced. */
+ struct list_head frozen; /* Queue of requests that will be serviced in
+ the next unlock/lock cycle. */
uintptr_t mask;
uintptr_t good_mask;
- int32_t kind;
int32_t refs;
- int32_t acquired;
- int32_t have_size;
- uint64_t size;
- uint64_t size_delta;
- uint64_t version[2];
- uint64_t version_delta[2];
- gf_boolean_t is_dirty[2];
+ int32_t refs_frozen;
+ int32_t inserted;
+ gf_boolean_t acquired;
+ gf_boolean_t release;
+ gf_boolean_t query;
ec_fop_data_t *owner;
+ fd_t *fd;
loc_t loc;
union
{
@@ -157,9 +166,12 @@ struct _ec_lock
struct _ec_lock_link
{
- ec_lock_t * lock;
- ec_fop_data_t * fop;
- struct list_head wait_list;
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+ struct list_head wait_list;
+ gf_boolean_t update[2];
+ loc_t *base;
+ uint64_t size;
};
struct _ec_fop_data
@@ -183,12 +195,8 @@ struct _ec_fop_data
int32_t lock_count;
int32_t locked;
ec_lock_link_t locks[2];
- int32_t locks_update;
- int32_t have_size;
- uint64_t pre_size;
- uint64_t post_size;
+ int32_t first_lock;
gf_lock_t lock;
- ec_config_t config;
uint32_t flags;
uint32_t first;
@@ -197,6 +205,7 @@ struct _ec_fop_data
if fop->minimum number of subvolumes succeed
which are not healing*/
uintptr_t remaining;
+ uintptr_t received; /* Mask of responses */
uintptr_t good;
uintptr_t bad;
@@ -300,4 +309,6 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this,
void ec_fop_data_acquire(ec_fop_data_t * fop);
void ec_fop_data_release(ec_fop_data_t * fop);
+void ec_fop_cleanup(ec_fop_data_t *fop);
+
#endif /* __EC_DATA_H__ */
diff --git a/xlators/cluster/ec/src/ec-dir-read.c b/xlators/cluster/ec/src/ec-dir-read.c
index ffc3ed5a7cd..354c63d3683 100644
--- a/xlators/cluster/ec/src/ec-dir-read.c
+++ b/xlators/cluster/ec/src/ec-dir-read.c
@@ -128,14 +128,9 @@ int32_t ec_manager_opendir(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 0);
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -195,7 +190,6 @@ int32_t ec_manager_opendir(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
diff --git a/xlators/cluster/ec/src/ec-dir-write.c b/xlators/cluster/ec/src/ec-dir-write.c
index ffc96bf4351..ce09138fb7a 100644
--- a/xlators/cluster/ec/src/ec-dir-write.c
+++ b/xlators/cluster/ec/src/ec-dir-write.c
@@ -98,11 +98,10 @@ void ec_wind_create(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
{
-
-
- ec_t * ec;
- ec_cbk_data_t * cbk;
- ec_fd_t * ctx;
+ ec_config_t config;
+ ec_t *ec;
+ ec_cbk_data_t *cbk;
+ ec_fd_t *ctx;
uint64_t version[2] = {0, 0};
switch (state)
@@ -137,16 +136,15 @@ int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
ec = fop->xl->private;
- fop->config.version = EC_CONFIG_VERSION;
- fop->config.algorithm = EC_CONFIG_ALGORITHM;
- fop->config.gf_word_size = EC_GF_BITS;
- fop->config.bricks = ec->nodes;
- fop->config.redundancy = ec->redundancy;
- fop->config.chunk_size = EC_METHOD_CHUNK_SIZE;
+ config.version = EC_CONFIG_VERSION;
+ config.algorithm = EC_CONFIG_ALGORITHM;
+ config.gf_word_size = EC_GF_BITS;
+ config.bricks = ec->nodes;
+ config.redundancy = ec->redundancy;
+ config.chunk_size = EC_METHOD_CHUNK_SIZE;
if (ec_dict_set_config(fop->xdata, EC_XATTR_CONFIG,
- &fop->config) < 0)
- {
+ &config) < 0) {
fop->error = EIO;
return EC_STATE_REPORT;
@@ -172,7 +170,8 @@ int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -376,17 +375,11 @@ int32_t ec_manager_link(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- // Parent entry of fop->loc[0] should be locked, but I don't
- // receive enough information to do it (fop->loc[0].parent is
- // NULL).
- ec_lock_prepare_entry(fop, &fop->loc[1], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[1], EC_UPDATE_DATA |
+ EC_UPDATE_META |
+ EC_INODE_SIZE);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -410,7 +403,7 @@ int32_t ec_manager_link(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 3,
cbk->count);
if (cbk->iatt[0].ia_type == IA_IFREG) {
- cbk->iatt[0].ia_size = fop->pre_size;
+ cbk->iatt[0].ia_size = fop->locks[0].size;
}
if (ec_loc_update(fop->xl, &fop->loc[0], cbk->inode,
@@ -446,7 +439,6 @@ int32_t ec_manager_link(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -589,7 +581,8 @@ int32_t ec_manager_mkdir(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -764,6 +757,7 @@ void ec_wind_mknod(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
int32_t ec_manager_mknod(ec_fop_data_t * fop, int32_t state)
{
+ ec_config_t config;
ec_t *ec;
ec_cbk_data_t * cbk;
uint64_t version[2] = {0, 0};
@@ -783,15 +777,15 @@ int32_t ec_manager_mknod(ec_fop_data_t * fop, int32_t state)
ec = fop->xl->private;
- fop->config.version = EC_CONFIG_VERSION;
- fop->config.algorithm = EC_CONFIG_ALGORITHM;
- fop->config.gf_word_size = EC_GF_BITS;
- fop->config.bricks = ec->nodes;
- fop->config.redundancy = ec->redundancy;
- fop->config.chunk_size = EC_METHOD_CHUNK_SIZE;
+ config.version = EC_CONFIG_VERSION;
+ config.algorithm = EC_CONFIG_ALGORITHM;
+ config.gf_word_size = EC_GF_BITS;
+ config.bricks = ec->nodes;
+ config.redundancy = ec->redundancy;
+ config.chunk_size = EC_METHOD_CHUNK_SIZE;
if (ec_dict_set_config(fop->xdata, EC_XATTR_CONFIG,
- &fop->config) < 0) {
+ &config) < 0) {
fop->error = EIO;
return EC_STATE_REPORT;
@@ -814,7 +808,8 @@ int32_t ec_manager_mknod(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -997,15 +992,13 @@ int32_t ec_manager_rename(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
- ec_lock_prepare_entry(fop, &fop->loc[1], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0], EC_UPDATE_DATA |
+ EC_UPDATE_META |
+ EC_INODE_SIZE);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[1],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1034,9 +1027,8 @@ int32_t ec_manager_rename(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 5,
cbk->count);
- if (cbk->iatt[0].ia_type == IA_IFREG)
- {
- cbk->iatt[0].ia_size = fop->pre_size;
+ if (cbk->iatt[0].ia_type == IA_IFREG) {
+ cbk->iatt[0].ia_size = fop->locks[0].size;
}
}
}
@@ -1064,7 +1056,6 @@ int32_t ec_manager_rename(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -1191,7 +1182,8 @@ int32_t ec_manager_rmdir(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -1361,7 +1353,8 @@ int32_t ec_manager_symlink(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -1552,14 +1545,10 @@ int32_t ec_manager_unlink(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1607,7 +1596,6 @@ int32_t ec_manager_unlink(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
diff --git a/xlators/cluster/ec/src/ec-generic.c b/xlators/cluster/ec/src/ec-generic.c
index 9d64735df55..f50c7a70560 100644
--- a/xlators/cluster/ec/src/ec-generic.c
+++ b/xlators/cluster/ec/src/ec-generic.c
@@ -320,15 +320,10 @@ int32_t ec_manager_fsync(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
- return EC_STATE_DISPATCH;
+ return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
ec_flush_size_version(fop);
@@ -361,8 +356,10 @@ int32_t ec_manager_fsync(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
- cbk->iatt[1].ia_size = fop->post_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode,
+ &cbk->iatt[0].ia_size));
+ cbk->iatt[1].ia_size = cbk->iatt[0].ia_size;
}
}
else
@@ -388,7 +385,6 @@ int32_t ec_manager_fsync(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -705,7 +701,6 @@ void ec_lookup_rebuild(ec_t * ec, ec_fop_data_t * fop, ec_cbk_data_t * cbk)
{
ec_cbk_data_t * ans = NULL;
ec_inode_t * ctx = NULL;
- ec_lock_t * lock = NULL;
data_t * data = NULL;
uint8_t * buff = NULL;
uint64_t size = 0;
@@ -729,14 +724,14 @@ void ec_lookup_rebuild(ec_t * ec, ec_fop_data_t * fop, ec_cbk_data_t * cbk)
LOCK(&cbk->inode->lock);
ctx = __ec_inode_get(cbk->inode, fop->xl);
- if ((ctx != NULL) && (ctx->inode_lock != NULL))
+ if (ctx != NULL)
{
- lock = ctx->inode_lock;
- cbk->version[0] = lock->version[0];
- cbk->version[1] = lock->version[1];
- if (lock->have_size)
- {
- size = lock->size;
+ if (ctx->have_version) {
+ cbk->version[0] = ctx->post_version[0];
+ cbk->version[1] = ctx->post_version[1];
+ }
+ if (ctx->have_size) {
+ size = ctx->post_size;
have_size = 1;
}
}
@@ -1304,8 +1299,8 @@ out:
/* FOP: xattrop */
-int32_t ec_combine_xattrop(ec_fop_data_t * fop, ec_cbk_data_t * dst,
- ec_cbk_data_t * src)
+int32_t ec_combine_xattrop(ec_fop_data_t *fop, ec_cbk_data_t *dst,
+ ec_cbk_data_t *src)
{
if (!ec_dict_compare(dst->dict, src->dict))
{
@@ -1325,9 +1320,9 @@ ec_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
{
ec_fop_data_t *fop = NULL;
ec_cbk_data_t *cbk = NULL;
+ data_t *data;
+ uint64_t *version;
int32_t idx = (int32_t)(uintptr_t)cookie;
- uint64_t version = 0;
- uint64_t *version_xattr = 0;
VALIDATE_OR_GOTO (this, out);
GF_VALIDATE_OR_GOTO (this->name, frame, out);
@@ -1347,12 +1342,19 @@ ec_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (op_ret >= 0) {
cbk->dict = dict_ref (xattr);
- if (dict_get_bin (xattr, EC_XATTR_VERSION,
- (void **)&version_xattr) == 0) {
- version = ntoh64(version_xattr[0]);
- if ((version >> EC_SELFHEAL_BIT) & 1)
- fop->healing |= (1ULL<<idx);
+ data = dict_get(cbk->dict, EC_XATTR_VERSION);
+ if ((data != NULL) && (data->len >= sizeof(uint64_t))) {
+ version = (uint64_t *)data->data;
+
+ if (((ntoh64(version[0]) >> EC_SELFHEAL_BIT) & 1) != 0) {
+ LOCK(&fop->lock);
+
+ fop->healing |= 1ULL << idx;
+
+ UNLOCK(&fop->lock);
+ }
}
+
ec_dict_del_array (xattr, EC_XATTR_DIRTY, cbk->dirty,
EC_VERSION_SIZE);
}
@@ -1386,13 +1388,10 @@ int32_t ec_manager_xattrop(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_UPDATE_META);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd, EC_UPDATE_META);
}
ec_lock(fop);
diff --git a/xlators/cluster/ec/src/ec-heal.c b/xlators/cluster/ec/src/ec-heal.c
index 4fe5754cbb8..80725e5a9fa 100644
--- a/xlators/cluster/ec/src/ec-heal.c
+++ b/xlators/cluster/ec/src/ec-heal.c
@@ -119,9 +119,8 @@ void ec_heal_lookup_resume(ec_fop_data_t * fop)
heal->version[0] = cbk->version[0];
heal->version[1] = cbk->version[1];
heal->raw_size = cbk->size;
- heal->fop->pre_size = cbk->iatt[0].ia_size;
- heal->fop->post_size = cbk->iatt[0].ia_size;
- heal->fop->have_size = 1;
+
+ GF_ASSERT(ec_set_inode_size(fop, cbk->inode, cbk->size));
if (ec_loc_update(heal->xl, &heal->loc, cbk->inode,
&cbk->iatt[0]) != 0)
@@ -547,25 +546,8 @@ out:
return error;
}
-void ec_heal_entrylk(ec_heal_t * heal, entrylk_cmd cmd)
-{
- loc_t loc;
-
- if (ec_loc_parent(heal->xl, &heal->loc, &loc) != 0) {
- gf_log("ec", GF_LOG_NOTICE, "ec_loc_parent() failed");
- ec_fop_set_error(heal->fop, EIO);
-
- return;
- }
-
- ec_entrylk(heal->fop->frame, heal->xl, -1, EC_MINIMUM_ALL, NULL, NULL,
- heal->xl->name, &loc, NULL, cmd, ENTRYLK_WRLCK, NULL);
-
- loc_wipe(&loc);
-}
-
-void ec_heal_inodelk(ec_heal_t * heal, int32_t type, int32_t use_fd,
- off_t offset, size_t size)
+void ec_heal_lock(ec_heal_t *heal, int32_t type, fd_t *fd, loc_t *loc,
+ off_t offset, size_t size)
{
struct gf_flock flock;
@@ -576,20 +558,47 @@ void ec_heal_inodelk(ec_heal_t * heal, int32_t type, int32_t use_fd,
flock.l_pid = 0;
flock.l_owner.len = 0;
- if (use_fd)
+ /* Remove inode size information before unlocking it. */
+ if ((type == F_UNLCK) && (heal->loc.inode != NULL)) {
+ ec_clear_inode_info(heal->fop, heal->loc.inode);
+ }
+
+ if (fd != NULL)
{
ec_finodelk(heal->fop->frame, heal->xl, heal->fop->mask,
- EC_MINIMUM_ALL, NULL, NULL, heal->xl->name, heal->fd,
+ EC_MINIMUM_ALL, NULL, NULL, heal->xl->name, fd,
F_SETLKW, &flock, NULL);
}
else
{
ec_inodelk(heal->fop->frame, heal->xl, heal->fop->mask, EC_MINIMUM_ALL,
- NULL, NULL, heal->xl->name, &heal->loc, F_SETLKW, &flock,
+ NULL, NULL, heal->xl->name, loc, F_SETLKW, &flock,
NULL);
}
}
+void ec_heal_entrylk(ec_heal_t *heal, int32_t type)
+{
+ loc_t loc;
+
+ if (ec_loc_parent(heal->xl, &heal->loc, &loc) != 0) {
+ ec_fop_set_error(heal->fop, EIO);
+
+ return;
+ }
+
+ ec_heal_lock(heal, type, NULL, &loc, 0, 0);
+
+ loc_wipe(&loc);
+}
+
+void ec_heal_inodelk(ec_heal_t *heal, int32_t type, int32_t use_fd,
+ off_t offset, size_t size)
+{
+ ec_heal_lock(heal, type, use_fd ? heal->fd : NULL, &heal->loc, offset,
+ size);
+}
+
void ec_heal_lookup(ec_heal_t *heal, uintptr_t mask)
{
dict_t * xdata;
@@ -1297,13 +1306,10 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
case EC_STATE_DISPATCH:
if (heal->done != 0) {
- gf_log("ec", GF_LOG_NOTICE, "heal already done");
return EC_STATE_HEAL_DISPATCH;
}
- gf_log("ec", GF_LOG_NOTICE, "heal before entrylk");
- ec_heal_entrylk(heal, ENTRYLK_LOCK);
- gf_log("ec", GF_LOG_NOTICE, "heal after entrylk");
+ ec_heal_entrylk(heal, F_WRLCK);
return EC_STATE_HEAL_ENTRY_LOOKUP;
@@ -1331,7 +1337,7 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
/* Only heal data/metadata if enough information is supplied. */
if (gf_uuid_is_null(heal->loc.gfid))
{
- ec_heal_entrylk(heal, ENTRYLK_UNLOCK);
+ ec_heal_entrylk(heal, F_UNLCK);
return EC_STATE_HEAL_DISPATCH;
}
@@ -1387,7 +1393,7 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
case -EC_STATE_HEAL_UNLOCK_ENTRY:
case EC_STATE_HEAL_UNLOCK_ENTRY:
if (heal->nameheal)
- ec_heal_entrylk(heal, ENTRYLK_UNLOCK);
+ ec_heal_entrylk(heal, F_UNLCK);
heal->bad = ec_heal_needs_data_rebuild(heal);
if (heal->bad != 0)
@@ -2557,9 +2563,9 @@ ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
EC_REPLIES_ALLOC (replies, ec->nodes);
output = alloca0 (ec->nodes);
locked_on = alloca0 (ec->nodes);
- ret = cluster_entrylk (ec->xl_list, participants, ec->nodes, replies,
+ ret = cluster_inodelk (ec->xl_list, participants, ec->nodes, replies,
locked_on, frame, ec->xl, ec->xl->name, parent,
- NULL);
+ 0, 0);
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: Skipping "
@@ -2573,8 +2579,8 @@ ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
ret = __ec_heal_name (frame, ec, parent, name, participants);
}
unlock:
- cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
- frame, ec->xl, ec->xl->name, parent, NULL);
+ cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
+ frame, ec->xl, ec->xl->name, parent, 0, 0);
out:
cluster_replies_wipe (replies, ec->nodes);
loc_wipe (&loc);
@@ -2658,9 +2664,9 @@ __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
dirty = alloca0 (ec->nodes * sizeof (*dirty));
EC_REPLIES_ALLOC (replies, ec->nodes);
- ret = cluster_entrylk (ec->xl_list, heal_on, ec->nodes, replies,
+ ret = cluster_inodelk (ec->xl_list, heal_on, ec->nodes, replies,
locked_on, frame, ec->xl, ec->xl->name, inode,
- NULL);
+ 0, 0);
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal "
@@ -2675,8 +2681,8 @@ __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
source = ret;
}
unlock:
- cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
- frame, ec->xl, ec->xl->name, inode, NULL);
+ cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
+ frame, ec->xl, ec->xl->name, inode, 0, 0);
if (ret < 0)
goto out;
@@ -2723,9 +2729,9 @@ ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
sprintf (selfheal_domain, "%s:self-heal", ec->xl->name);
ec_mask_to_char_array (ec->xl_up, up_subvols, ec->nodes);
/*If other processes are already doing the heal, don't block*/
- ret = cluster_entrylk (ec->xl_list, up_subvols, ec->nodes, replies,
+ ret = cluster_inodelk (ec->xl_list, up_subvols, ec->nodes, replies,
locked_on, frame, ec->xl, selfheal_domain, inode,
- NULL);
+ 0, 0);
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal "
@@ -2738,8 +2744,8 @@ ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
sources, healed_sinks);
}
unlock:
- cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
- frame, ec->xl, selfheal_domain, inode, NULL);
+ cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
+ frame, ec->xl, selfheal_domain, inode, 0, 0);
cluster_replies_wipe (replies, ec->nodes);
return ret;
}
@@ -3081,8 +3087,8 @@ ec_heal_block (call_frame_t *frame, xlator_t *this, uintptr_t target,
if (fop == NULL)
goto out;
- fop->pre_size = fop->post_size = heal->total_size;
- fop->have_size = 1;
+ GF_ASSERT(ec_set_inode_size(fop, heal->fd->inode, heal->total_size));
+
error = 0;
out:
diff --git a/xlators/cluster/ec/src/ec-helpers.c b/xlators/cluster/ec/src/ec-helpers.c
index 8ce3087d5a6..48251c84bac 100644
--- a/xlators/cluster/ec/src/ec-helpers.c
+++ b/xlators/cluster/ec/src/ec-helpers.c
@@ -738,3 +738,41 @@ ec_filter_internal_xattrs (dict_t *xattr)
dict_foreach_match (xattr, ec_is_internal_xattr, NULL,
dict_remove_foreach_fn, NULL);
}
+
+gf_boolean_t
+ec_is_data_fop (glusterfs_fop_t fop)
+{
+ switch (fop) {
+ case GF_FOP_WRITE:
+ case GF_FOP_TRUNCATE:
+ case GF_FOP_FTRUNCATE:
+ case GF_FOP_FALLOCATE:
+ case GF_FOP_DISCARD:
+ case GF_FOP_ZEROFILL:
+ return _gf_true;
+ default:
+ return _gf_false;
+ }
+ return _gf_false;
+}
+/*
+gf_boolean_t
+ec_is_metadata_fop (int32_t lock_kind, glusterfs_fop_t fop)
+{
+ if (lock_kind == EC_LOCK_ENTRY) {
+ return _gf_false;
+ }
+
+ switch (fop) {
+ case GF_FOP_SETATTR:
+ case GF_FOP_FSETATTR:
+ case GF_FOP_SETXATTR:
+ case GF_FOP_FSETXATTR:
+ case GF_FOP_REMOVEXATTR:
+ case GF_FOP_FREMOVEXATTR:
+ return _gf_true;
+ default:
+ return _gf_false;
+ }
+ return _gf_false;
+}*/
diff --git a/xlators/cluster/ec/src/ec-helpers.h b/xlators/cluster/ec/src/ec-helpers.h
index df4495138fe..14243df54f3 100644
--- a/xlators/cluster/ec/src/ec-helpers.h
+++ b/xlators/cluster/ec/src/ec-helpers.h
@@ -59,4 +59,11 @@ ec_is_internal_xattr (dict_t *dict, char *key, data_t *value, void *data);
void
ec_filter_internal_xattrs (dict_t *xattr);
+
+gf_boolean_t
+ec_is_data_fop (glusterfs_fop_t fop);
+/*
+gf_boolean_t
+ec_is_metadata_fop (glusterfs_fop_t fop);
+*/
#endif /* __EC_HELPERS_H__ */
diff --git a/xlators/cluster/ec/src/ec-inode-read.c b/xlators/cluster/ec/src/ec-inode-read.c
index 7372c0a0599..853d914148b 100644
--- a/xlators/cluster/ec/src/ec-inode-read.c
+++ b/xlators/cluster/ec/src/ec-inode-read.c
@@ -267,9 +267,9 @@ int32_t ec_manager_getxattr(ec_fop_data_t * fop, int32_t state)
(strncmp(fop->str[0], GF_XATTR_CLRLK_CMD,
strlen(GF_XATTR_CLRLK_CMD)) != 0)) {
if (fop->fd == NULL) {
- ec_lock_prepare_inode(fop, &fop->loc[0], 0);
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO);
} else {
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
}
ec_lock(fop);
}
@@ -1094,12 +1094,12 @@ int32_t ec_readv_rebuild(ec_t * ec, ec_fop_data_t * fop, ec_cbk_data_t * cbk)
size_t fsize = 0, size = 0, max = 0;
int32_t i = 0;
- if (cbk->op_ret < 0)
- {
+ if (cbk->op_ret < 0) {
goto out;
}
- cbk->iatt[0].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode, &cbk->iatt[0].ia_size));
if (cbk->op_ret > 0)
{
@@ -1331,15 +1331,10 @@ int32_t ec_manager_readv(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
- return EC_STATE_DISPATCH;
+ return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
ec_dispatch_min(fop);
@@ -1396,7 +1391,6 @@ int32_t ec_manager_readv(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -1580,22 +1574,14 @@ int32_t ec_manager_stat(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 0);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
}
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
- return EC_STATE_DISPATCH;
+ return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
ec_dispatch_all(fop);
@@ -1614,16 +1600,16 @@ int32_t ec_manager_stat(ec_fop_data_t * fop, int32_t state)
cbk->op_errno = EIO;
}
}
- if (cbk->op_ret < 0)
- {
+ if (cbk->op_ret < 0) {
ec_fop_set_error(fop, cbk->op_errno);
- }
- else
- {
+ } else if (cbk->iatt[0].ia_type == IA_IFREG) {
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 1,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ &cbk->iatt[0].ia_size));
}
}
else
@@ -1659,7 +1645,6 @@ int32_t ec_manager_stat(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
diff --git a/xlators/cluster/ec/src/ec-inode-write.c b/xlators/cluster/ec/src/ec-inode-write.c
index 6b485a26fbc..368b3ae5edf 100644
--- a/xlators/cluster/ec/src/ec-inode-write.c
+++ b/xlators/cluster/ec/src/ec-inode-write.c
@@ -123,11 +123,13 @@ ec_manager_xattr (ec_fop_data_t *fop, int32_t state)
switch (state) {
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- else
- ec_lock_prepare_fd(fop, fop->fd, 1);
-
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0],
+ EC_UPDATE_META | EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_META | EC_QUERY_INFO);
+ }
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -378,21 +380,15 @@ int32_t ec_manager_setattr(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0],
+ EC_UPDATE_META | EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_META | EC_QUERY_INFO);
}
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -412,17 +408,17 @@ int32_t ec_manager_setattr(ec_fop_data_t * fop, int32_t state)
cbk->op_errno = EIO;
}
}
- if (cbk->op_ret < 0)
- {
+ if (cbk->op_ret < 0) {
ec_fop_set_error(fop, cbk->op_errno);
- }
- else
- {
+ } else if (cbk->iatt[0].ia_type == IA_IFREG) {
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
- cbk->iatt[1].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ &cbk->iatt[0].ia_size));
+ cbk->iatt[1].ia_size = cbk->iatt[0].ia_size;
}
}
else
@@ -462,7 +458,6 @@ int32_t ec_manager_setattr(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -992,21 +987,17 @@ int32_t ec_manager_truncate(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- if (fop->id == GF_FOP_TRUNCATE)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ if (fop->id == GF_FOP_TRUNCATE) {
+ ec_lock_prepare_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META |
+ EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_DATA | EC_UPDATE_META |
+ EC_QUERY_INFO);
}
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_prepare_update(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1035,14 +1026,18 @@ int32_t ec_manager_truncate(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ &cbk->iatt[0].ia_size));
cbk->iatt[1].ia_size = fop->user_size;
- fop->post_size = fop->user_size;
- if ((fop->pre_size > fop->post_size) &&
- (fop->user_size != fop->offset))
- {
- if (!ec_truncate_clean(fop))
- {
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_set_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ fop->user_size));
+ if ((cbk->iatt[0].ia_size > cbk->iatt[1].ia_size) &&
+ (fop->user_size != fop->offset)) {
+ if (!ec_truncate_clean(fop)) {
ec_fop_set_error(fop, EIO);
}
}
@@ -1085,7 +1080,6 @@ int32_t ec_manager_truncate(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -1355,9 +1349,13 @@ void ec_writev_start(ec_fop_data_t *fop)
ec_fd_t *ctx;
fd_t *fd;
size_t tail;
+ uint64_t current;
uid_t uid;
gid_t gid;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode, &current));
+
fd = fd_anonymous(fop->fd->inode);
if (fd == NULL) {
ec_fop_set_error(fop, EIO);
@@ -1373,7 +1371,7 @@ void ec_writev_start(ec_fop_data_t *fop)
ctx = ec_fd_get(fop->fd, fop->xl);
if (ctx != NULL) {
if ((ctx->flags & O_APPEND) != 0) {
- fop->offset = fop->pre_size;
+ fop->offset = current;
}
}
@@ -1404,22 +1402,17 @@ void ec_writev_start(ec_fop_data_t *fop)
iobref_unref(fop->buffers);
fop->buffers = iobref;
- if (fop->head > 0)
- {
+ if (fop->head > 0) {
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN, ec_writev_merge_head,
NULL, fd, ec->stripe_size, fop->offset, 0, NULL);
}
tail = fop->size - fop->user_size - fop->head;
- if ((tail > 0) && ((fop->head == 0) || (fop->size > ec->stripe_size)))
- {
- if (fop->pre_size > fop->offset + fop->head + fop->user_size)
- {
+ if ((tail > 0) && ((fop->head == 0) || (fop->size > ec->stripe_size))) {
+ if (current > fop->offset + fop->head + fop->user_size) {
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN,
ec_writev_merge_tail, NULL, fd, ec->stripe_size,
fop->offset + fop->size - ec->stripe_size, 0, NULL);
- }
- else
- {
+ } else {
memset(fop->vector[0].iov_base + fop->size - tail, 0, tail);
}
}
@@ -1530,14 +1523,11 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_DATA | EC_UPDATE_META |
+ EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_prepare_update(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1574,27 +1564,34 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode,
+ &cbk->iatt[0].ia_size));
+ cbk->iatt[1].ia_size = cbk->iatt[0].ia_size;
size = fop->offset + fop->head + fop->user_size;
- if (size > fop->pre_size)
- {
- fop->post_size = size;
- }
-
- cbk->iatt[0].ia_size = fop->pre_size;
- cbk->iatt[1].ia_size = fop->post_size;
-
- cbk->op_ret *= ec->fragments;
- if (cbk->op_ret < fop->head)
- {
- cbk->op_ret = 0;
- }
- else
- {
- cbk->op_ret -= fop->head;
+ if (size > cbk->iatt[0].ia_size) {
+ /* Only update inode size if this is a top level fop.
+ * Otherwise this is an internal write and the top
+ * level fop should take care of the real inode size.
+ */
+ if (fop->parent == NULL) {
+ /* This shouldn't fail because we have the inode
+ * locked. */
+ GF_ASSERT(ec_set_inode_size(fop, fop->fd->inode,
+ size));
+ }
+ cbk->iatt[1].ia_size = size;
}
- if (cbk->op_ret > fop->user_size)
- {
- cbk->op_ret = fop->user_size;
+ if (fop->error == 0) {
+ cbk->op_ret *= ec->fragments;
+ if (cbk->op_ret < fop->head) {
+ cbk->op_ret = 0;
+ } else {
+ cbk->op_ret -= fop->head;
+ }
+ if (cbk->op_ret > fop->user_size) {
+ cbk->op_ret = fop->user_size;
+ }
}
}
}
@@ -1621,8 +1618,8 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
+ case -EC_STATE_DELAYED_START:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
GF_ASSERT(fop->error != 0);