summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/ec/src
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/ec/src')
-rw-r--r--xlators/cluster/ec/src/ec-combine.c35
-rw-r--r--xlators/cluster/ec/src/ec-common.c1471
-rw-r--r--xlators/cluster/ec/src/ec-common.h32
-rw-r--r--xlators/cluster/ec/src/ec-data.c25
-rw-r--r--xlators/cluster/ec/src/ec-data.h53
-rw-r--r--xlators/cluster/ec/src/ec-dir-read.c8
-rw-r--r--xlators/cluster/ec/src/ec-dir-write.c96
-rw-r--r--xlators/cluster/ec/src/ec-generic.c67
-rw-r--r--xlators/cluster/ec/src/ec-heal.c96
-rw-r--r--xlators/cluster/ec/src/ec-helpers.c38
-rw-r--r--xlators/cluster/ec/src/ec-helpers.h7
-rw-r--r--xlators/cluster/ec/src/ec-inode-read.c51
-rw-r--r--xlators/cluster/ec/src/ec-inode-write.c159
13 files changed, 1139 insertions, 999 deletions
diff --git a/xlators/cluster/ec/src/ec-combine.c b/xlators/cluster/ec/src/ec-combine.c
index 9d4a18999f1..4617a0430f1 100644
--- a/xlators/cluster/ec/src/ec-combine.c
+++ b/xlators/cluster/ec/src/ec-combine.c
@@ -171,8 +171,10 @@ void ec_iatt_rebuild(ec_t * ec, struct iatt * iatt, int32_t count,
gf_boolean_t
ec_xattr_match (dict_t *dict, char *key, data_t *value, void *arg)
{
- if (fnmatch(GF_XATTR_STIME_PATTERN, key, 0) == 0)
+ if ((fnmatch(GF_XATTR_STIME_PATTERN, key, 0) == 0) ||
+ (strcmp(key, GLUSTERFS_OPEN_FD_COUNT) == 0)) {
return _gf_false;
+ }
return _gf_true;
}
@@ -185,6 +187,8 @@ ec_value_ignore (char *key)
(strcmp(key, GF_XATTR_USER_PATHINFO_KEY) == 0) ||
(strcmp(key, GF_XATTR_LOCKINFO_KEY) == 0) ||
(strcmp(key, GLUSTERFS_OPEN_FD_COUNT) == 0) ||
+ (strcmp(key, GLUSTERFS_INODELK_COUNT) == 0) ||
+ (strcmp(key, GLUSTERFS_ENTRYLK_COUNT) == 0) ||
(strncmp(key, GF_XATTR_CLRLK_CMD,
strlen (GF_XATTR_CLRLK_CMD)) == 0) ||
(strncmp(key, EC_QUOTA_PREFIX, strlen(EC_QUOTA_PREFIX)) == 0) ||
@@ -225,15 +229,9 @@ int32_t ec_dict_list(data_t ** list, int32_t * count, ec_cbk_data_t * cbk,
dict = (which == EC_COMBINE_XDATA) ? ans->xdata : ans->dict;
list[i] = dict_get(dict, key);
- if (list[i] == NULL)
- {
- gf_log(cbk->fop->xl->name, GF_LOG_ERROR, "Unexpected missing "
- "dictionary entry");
-
- return 0;
+ if (list[i] != NULL) {
+ i++;
}
-
- i++;
}
*count = i;
@@ -471,11 +469,6 @@ int32_t ec_dict_data_max32(ec_cbk_data_t *cbk, int32_t which, char *key)
return -1;
}
- if (num <= 1)
- {
- return 0;
- }
-
max = data_to_uint32(data[0]);
for (i = 1; i < num; i++)
{
@@ -507,10 +500,6 @@ int32_t ec_dict_data_max64(ec_cbk_data_t *cbk, int32_t which, char *key)
return -1;
}
- if (num <= 1) {
- return 0;
- }
-
max = data_to_uint64(data[0]);
for (i = 1; i < num; i++) {
tmp = data_to_uint64(data[i]);
@@ -630,6 +619,10 @@ int32_t ec_dict_data_combine(dict_t * dict, char * key, data_t * value,
{
return ec_dict_data_max32(data->cbk, data->which, key);
}
+ if ((strcmp(key, GLUSTERFS_INODELK_COUNT) == 0) ||
+ (strcmp(key, GLUSTERFS_ENTRYLK_COUNT) == 0)) {
+ return ec_dict_data_max32(data->cbk, data->which, key);
+ }
if (strcmp(key, QUOTA_SIZE_KEY) == 0) {
return ec_dict_data_quota(data->cbk, data->which, key);
@@ -831,6 +824,8 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
LOCK(&fop->lock);
+ fop->received |= newcbk->mask;
+
item = fop->cbk_list.prev;
list_for_each_entry(cbk, &fop->cbk_list, list)
{
@@ -868,7 +863,9 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
}
cbk = list_entry(fop->cbk_list.next, ec_cbk_data_t, list);
- needed = fop->minimum - cbk->count - fop->winds + 1;
+ if ((fop->mask ^ fop->remaining) == fop->received) {
+ needed = fop->minimum - cbk->count;
+ }
UNLOCK(&fop->lock);
diff --git a/xlators/cluster/ec/src/ec-common.c b/xlators/cluster/ec/src/ec-common.c
index 374739ac6e0..1a1c7093d7e 100644
--- a/xlators/cluster/ec/src/ec-common.c
+++ b/xlators/cluster/ec/src/ec-common.c
@@ -368,24 +368,34 @@ int32_t ec_child_select(ec_fop_data_t * fop)
uintptr_t mask = 0;
int32_t first = 0, num = 0;
+ ec_fop_cleanup(fop);
+
fop->mask &= ec->node_mask;
mask = ec->xl_up;
if (fop->parent == NULL)
{
- if (fop->loc[0].inode != NULL) {
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_PARENT) && fop->loc[0].parent)
+ mask &= ec_inode_good(fop->loc[0].parent, fop->xl);
+
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_INODE) && fop->loc[0].inode) {
mask &= ec_inode_good(fop->loc[0].inode, fop->xl);
}
- if (fop->loc[1].inode != NULL) {
+
+ if ((fop->flags & EC_FLAG_UPDATE_LOC_INODE) && fop->loc[1].inode) {
mask &= ec_inode_good(fop->loc[1].inode, fop->xl);
}
- if (fop->fd != NULL) {
- if (fop->fd->inode != NULL) {
+
+ if (fop->fd) {
+ if ((fop->flags & EC_FLAG_UPDATE_FD_INODE) && fop->fd->inode) {
mask &= ec_inode_good(fop->fd->inode, fop->xl);
}
- mask &= ec_fd_good(fop->fd, fop->xl);
+ if (fop->flags & fop->flags & EC_FLAG_UPDATE_FD) {
+ mask &= ec_fd_good(fop->fd, fop->xl);
+ }
}
}
+
if ((fop->mask & ~mask) != 0)
{
gf_log(fop->xl->name, GF_LOG_WARNING, "Executing operation with "
@@ -420,6 +430,7 @@ int32_t ec_child_select(ec_fop_data_t * fop)
/*Unconditionally wind on healing subvolumes*/
fop->mask |= fop->healing;
fop->remaining = fop->mask;
+ fop->received = 0;
ec_trace("SELECT", fop, "");
@@ -585,7 +596,7 @@ void ec_dispatch_min(ec_fop_data_t * fop)
}
}
-ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
+ec_lock_t *ec_lock_allocate(xlator_t *xl, loc_t *loc)
{
ec_t * ec = xl->private;
ec_lock_t * lock;
@@ -602,9 +613,9 @@ ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
lock = mem_get0(ec->lock_pool);
if (lock != NULL)
{
- lock->kind = kind;
lock->good_mask = -1ULL;
INIT_LIST_HEAD(&lock->waiting);
+ INIT_LIST_HEAD(&lock->frozen);
if (ec_loc_from_loc(xl, &lock->loc, loc) != 0)
{
mem_put(lock);
@@ -618,6 +629,9 @@ ec_lock_t * ec_lock_allocate(xlator_t * xl, int32_t kind, loc_t * loc)
void ec_lock_destroy(ec_lock_t * lock)
{
loc_wipe(&lock->loc);
+ if (lock->fd != NULL) {
+ fd_unref(lock->fd);
+ }
mem_put(lock);
}
@@ -627,166 +641,96 @@ int32_t ec_lock_compare(ec_lock_t * lock1, ec_lock_t * lock2)
return gf_uuid_compare(lock1->loc.gfid, lock2->loc.gfid);
}
-ec_lock_link_t *ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock,
- int32_t update)
+void ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock, uint32_t flags,
+ loc_t *base)
{
- ec_lock_t *new_lock, *tmp;
- ec_lock_link_t *link = NULL;
- int32_t tmp_update;
+ ec_lock_link_t *link;
- new_lock = lock;
+ /* This check is only prepared for up to 2 locks per fop. If more locks
+ * are needed this must be changed. */
if ((fop->lock_count > 0) &&
- (ec_lock_compare(fop->locks[0].lock, new_lock) > 0))
- {
- tmp = fop->locks[0].lock;
- fop->locks[0].lock = new_lock;
- new_lock = tmp;
-
- tmp_update = fop->locks_update;
- fop->locks_update = update;
- update = tmp_update;
- }
- fop->locks[fop->lock_count].lock = new_lock;
- fop->locks[fop->lock_count].fop = fop;
-
- fop->locks_update |= update << fop->lock_count;
-
- fop->lock_count++;
-
- if (lock->timer != NULL) {
- link = lock->timer->data;
- ec_trace("UNLOCK_CANCELLED", link->fop, "lock=%p", lock);
- gf_timer_call_cancel(fop->xl->ctx, lock->timer);
- lock->timer = NULL;
+ (ec_lock_compare(fop->locks[0].lock, lock) < 0)) {
+ fop->first_lock = fop->lock_count;
} else {
- lock->refs++;
- }
-
- return link;
-}
-
-void ec_lock_prepare_entry(ec_fop_data_t *fop, loc_t *loc, int32_t update)
-{
- ec_lock_t * lock = NULL;
- ec_inode_t * ctx = NULL;
- ec_lock_link_t *link = NULL;
- loc_t tmp;
-
- if ((fop->parent != NULL) || (fop->error != 0))
- {
- return;
- }
-
- /* update is only 0 for 'opendir', which needs to lock the entry pointed
- * by loc instead of its parent.
- */
- if (update)
- {
- if (ec_loc_parent(fop->xl, loc, &tmp) != 0) {
- ec_fop_set_error(fop, EIO);
-
- return;
- }
-
- /* If there's another lock, make sure that it's not the same. Otherwise
- * do not insert it.
- *
- * This can only happen on renames where source and target names are
- * in the same directory. */
- if ((fop->lock_count > 0) &&
- (fop->locks[0].lock->loc.inode == tmp.inode)) {
- goto wipe;
+ /* When the first lock is added to the current fop, request lock
+ * counts from locks xlator to be able to determine if there is
+ * contention and release the lock sooner. */
+ if (fop->xdata == NULL) {
+ fop->xdata = dict_new();
+ if (fop->xdata == NULL) {
+ ec_fop_set_error(fop, ENOMEM);
+ return;
+ }
}
- } else {
- if (ec_loc_from_loc(fop->xl, &tmp, loc) != 0) {
- ec_fop_set_error(fop, EIO);
-
+ if (dict_set_str(fop->xdata, GLUSTERFS_INODELK_DOM_COUNT,
+ fop->xl->name) != 0) {
+ ec_fop_set_error(fop, ENOMEM);
return;
}
}
- LOCK(&tmp.inode->lock);
-
- ctx = __ec_inode_get(tmp.inode, fop->xl);
- if (ctx == NULL)
- {
- __ec_fop_set_error(fop, EIO);
-
- goto unlock;
- }
-
- if (ctx->entry_lock != NULL)
- {
- lock = ctx->entry_lock;
- ec_trace("LOCK_ENTRYLK", fop, "lock=%p, inode=%p, path=%s"
- "Lock already acquired",
- lock, tmp.inode, tmp.path);
-
- goto insert;
- }
-
- lock = ec_lock_allocate(fop->xl, EC_LOCK_ENTRY, &tmp);
- if (lock == NULL)
- {
- __ec_fop_set_error(fop, EIO);
-
- goto unlock;
- }
-
- ec_trace("LOCK_CREATE", fop, "lock=%p", lock);
+ link = &fop->locks[fop->lock_count++];
- lock->type = ENTRYLK_WRLCK;
+ link->lock = lock;
+ link->fop = fop;
+ link->update[EC_DATA_TXN] = (flags & EC_UPDATE_DATA) != 0;
+ link->update[EC_METADATA_TXN] = (flags & EC_UPDATE_META) != 0;
+ link->base = base;
- lock->plock = &ctx->entry_lock;
- ctx->entry_lock = lock;
-
-insert:
- link = ec_lock_insert(fop, lock, update);
-
-unlock:
- UNLOCK(&tmp.inode->lock);
-
-wipe:
- loc_wipe(&tmp);
-
- if (link != NULL) {
- ec_resume(link->fop, 0);
- }
+ lock->refs++;
+ lock->inserted++;
}
-void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update)
+void ec_lock_prepare_inode_internal(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags, loc_t *base)
{
- ec_lock_link_t *link = NULL;
- ec_lock_t * lock;
- ec_inode_t * ctx;
+ ec_lock_t *lock = NULL;
+ ec_inode_t *ctx;
- if ((fop->parent != NULL) || (fop->error != 0) || (loc->inode == NULL))
- {
+ if ((fop->parent != NULL) || (fop->error != 0) || (loc->inode == NULL)) {
return;
}
LOCK(&loc->inode->lock);
ctx = __ec_inode_get(loc->inode, fop->xl);
- if (ctx == NULL)
- {
+ if (ctx == NULL) {
__ec_fop_set_error(fop, EIO);
goto unlock;
}
- if (ctx->inode_lock != NULL)
- {
+ if (ctx->inode_lock != NULL) {
lock = ctx->inode_lock;
+
+ /* If there's another lock, make sure that it's not the same. Otherwise
+ * do not insert it.
+ *
+ * This can only happen on renames where source and target names are
+ * in the same directory. */
+ if ((fop->lock_count > 0) && (fop->locks[0].lock == lock)) {
+ /* Combine data/meta updates */
+ fop->locks[0].update[EC_DATA_TXN] |= (flags & EC_UPDATE_DATA) != 0;
+ fop->locks[0].update[EC_METADATA_TXN] |=
+ (flags & EC_UPDATE_META) != 0;
+
+ /* Only one base inode is allowed per fop, so there shouldn't be
+ * overwrites here. */
+ if (base != NULL) {
+ fop->locks[0].base = base;
+ }
+
+ goto update_query;
+ }
+
ec_trace("LOCK_INODELK", fop, "lock=%p, inode=%p. Lock already "
"acquired", lock, loc->inode);
goto insert;
}
- lock = ec_lock_allocate(fop->xl, EC_LOCK_INODE, loc);
- if (lock == NULL)
- {
+ lock = ec_lock_allocate(fop->xl, loc);
+ if (lock == NULL) {
__ec_fop_set_error(fop, EIO);
goto unlock;
@@ -797,154 +741,78 @@ void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update)
lock->flock.l_type = F_WRLCK;
lock->flock.l_whence = SEEK_SET;
- lock->plock = &ctx->inode_lock;
+ lock->ctx = ctx;
ctx->inode_lock = lock;
insert:
- link = ec_lock_insert(fop, lock, update);
-
+ ec_lock_insert(fop, lock, flags, base);
+update_query:
+ lock->query |= (flags & EC_QUERY_INFO) != 0;
unlock:
UNLOCK(&loc->inode->lock);
+}
- if (link != NULL) {
- ec_resume(link->fop, 0);
- }
+void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, uint32_t flags)
+{
+ ec_lock_prepare_inode_internal(fop, loc, flags, NULL);
}
-void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, int32_t update)
+void ec_lock_prepare_parent_inode(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags)
{
- loc_t loc;
+ loc_t tmp, *base = NULL;
- if ((fop->parent != NULL) || (fop->error != 0))
- {
+ if (fop->error != 0) {
return;
}
- if (ec_loc_from_fd(fop->xl, &loc, fd) == 0)
- {
- ec_lock_prepare_inode(fop, &loc, update);
-
- loc_wipe(&loc);
- }
- else
- {
+ if (ec_loc_parent(fop->xl, loc, &tmp) != 0) {
ec_fop_set_error(fop, EIO);
- }
-}
-
-int32_t ec_locked(call_frame_t * frame, void * cookie, xlator_t * this,
- int32_t op_ret, int32_t op_errno, dict_t * xdata)
-{
- ec_fop_data_t * fop = cookie;
- ec_lock_t * lock = NULL;
-
- if (op_ret >= 0)
- {
- lock = fop->data;
- lock->mask = fop->good;
- lock->acquired = 1;
-
- fop->parent->mask &= fop->good;
- fop->parent->locked++;
- ec_trace("LOCKED", fop->parent, "lock=%p", lock);
-
- ec_lock(fop->parent);
+ return;
}
- else
- {
- gf_log(this->name, GF_LOG_WARNING, "Failed to complete preop lock");
+
+ if ((flags & EC_INODE_SIZE) != 0) {
+ base = loc;
+ flags ^= EC_INODE_SIZE;
}
- return 0;
+ ec_lock_prepare_inode_internal(fop, &tmp, flags, base);
+
+ loc_wipe(&tmp);
}
-void ec_lock(ec_fop_data_t * fop)
+void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, uint32_t flags)
{
- ec_lock_t * lock;
-
- while (fop->locked < fop->lock_count)
- {
- lock = fop->locks[fop->locked].lock;
-
- LOCK(&lock->loc.inode->lock);
-
- if (lock->owner != NULL)
- {
- ec_trace("LOCK_WAIT", fop, "lock=%p", lock);
-
- list_add_tail(&fop->locks[fop->locked].wait_list, &lock->waiting);
-
- ec_sleep(fop);
-
- UNLOCK(&lock->loc.inode->lock);
-
- break;
- }
- lock->owner = fop;
-
- UNLOCK(&lock->loc.inode->lock);
-
- if (!lock->acquired)
- {
- ec_owner_set(fop->frame, lock);
-
- if (lock->kind == EC_LOCK_ENTRY)
- {
- ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p, path=%s",
- lock, lock->loc.inode, lock->loc.path);
-
- ec_entrylk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
- lock, fop->xl->name, &lock->loc, NULL,
- ENTRYLK_LOCK, lock->type, NULL);
- }
- else
- {
- ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p", lock,
- lock->loc.inode);
+ loc_t loc;
- ec_inodelk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
- lock, fop->xl->name, &lock->loc, F_SETLKW,
- &lock->flock, NULL);
- }
+ if (fop->error != 0) {
+ return;
+ }
- break;
- }
+ if (ec_loc_from_fd(fop->xl, &loc, fd) != 0) {
+ ec_fop_set_error(fop, EIO);
- ec_trace("LOCK_REUSE", fop, "lock=%p", lock);
+ return;
+ }
- if (lock->have_size)
- {
- fop->pre_size = fop->post_size = lock->size;
- fop->have_size = 1;
- }
- fop->mask &= lock->good_mask;
+ ec_lock_prepare_inode_internal(fop, &loc, flags, NULL);
- fop->locked++;
- }
+ loc_wipe(&loc);
}
gf_boolean_t
-ec_config_check (ec_fop_data_t *fop, dict_t *xdata)
+ec_config_check (ec_fop_data_t *fop, ec_config_t *config)
{
ec_t *ec;
- if (ec_dict_del_config(xdata, EC_XATTR_CONFIG, &fop->config) < 0) {
- gf_log(fop->xl->name, GF_LOG_ERROR, "Failed to get a valid "
- "config");
-
- ec_fop_set_error(fop, EIO);
-
- return _gf_false;
- }
-
ec = fop->xl->private;
- if ((fop->config.version != EC_CONFIG_VERSION) ||
- (fop->config.algorithm != EC_CONFIG_ALGORITHM) ||
- (fop->config.gf_word_size != EC_GF_BITS) ||
- (fop->config.bricks != ec->nodes) ||
- (fop->config.redundancy != ec->redundancy) ||
- (fop->config.chunk_size != EC_METHOD_CHUNK_SIZE)) {
+ if ((config->version != EC_CONFIG_VERSION) ||
+ (config->algorithm != EC_CONFIG_ALGORITHM) ||
+ (config->gf_word_size != EC_GF_BITS) ||
+ (config->bricks != ec->nodes) ||
+ (config->redundancy != ec->redundancy) ||
+ (config->chunk_size != EC_METHOD_CHUNK_SIZE)) {
uint32_t data_bricks;
/* This combination of version/algorithm requires the following
@@ -957,273 +825,201 @@ ec_config_check (ec_fop_data_t *fop, dict_t *xdata)
chunk_size (in bits) must be a multiple of gf_word_size *
(bricks - redundancy) */
- data_bricks = fop->config.bricks - fop->config.redundancy;
- if ((fop->config.redundancy < 1) ||
- (fop->config.redundancy * 2 >= fop->config.bricks) ||
- !ec_is_power_of_2(fop->config.gf_word_size) ||
- ((fop->config.chunk_size * 8) % (fop->config.gf_word_size *
- data_bricks) != 0)) {
+ data_bricks = config->bricks - config->redundancy;
+ if ((config->redundancy < 1) ||
+ (config->redundancy * 2 >= config->bricks) ||
+ !ec_is_power_of_2(config->gf_word_size) ||
+ ((config->chunk_size * 8) % (config->gf_word_size * data_bricks)
+ != 0)) {
gf_log(fop->xl->name, GF_LOG_ERROR, "Invalid or corrupted config");
} else {
gf_log(fop->xl->name, GF_LOG_ERROR, "Unsupported config "
"(V=%u, A=%u, W=%u, "
"N=%u, R=%u, S=%u)",
- fop->config.version, fop->config.algorithm,
- fop->config.gf_word_size, fop->config.bricks,
- fop->config.redundancy, fop->config.chunk_size);
+ config->version, config->algorithm,
+ config->gf_word_size, config->bricks,
+ config->redundancy, config->chunk_size);
}
- ec_fop_set_error(fop, EIO);
-
return _gf_false;
}
return _gf_true;
}
-int32_t ec_get_size_version_set(call_frame_t * frame, void * cookie,
- xlator_t * this, int32_t op_ret,
- int32_t op_errno, inode_t * inode,
- struct iatt * buf, dict_t * xdata,
- struct iatt * postparent)
+int32_t
+ec_prepare_update_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret, int32_t op_errno,
+ dict_t *dict, dict_t *xdata)
{
- ec_fop_data_t * fop = cookie;
- ec_inode_t * ctx;
+ ec_fop_data_t *fop = cookie, *parent;
+ ec_lock_link_t *link = fop->data;
ec_lock_t *lock = NULL;
+ ec_inode_t *ctx;
- if (op_ret >= 0)
- {
- if ((buf->ia_type == IA_IFREG) && !ec_config_check(fop, xdata)) {
- return 0;
- }
+ lock = link->lock;
+ parent = link->fop;
+ ctx = lock->ctx;
- LOCK(&inode->lock);
+ if (op_ret < 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "Failed to get size and version (error %d: %s)", op_errno,
+ strerror (op_errno));
- ctx = __ec_inode_get(inode, this);
- if (ctx != NULL) {
- if (ctx->inode_lock != NULL) {
- lock = ctx->inode_lock;
- lock->version[0] = fop->answer->version[0];
- lock->version[1] = fop->answer->version[1];
+ goto out;
+ }
- if (buf->ia_type == IA_IFREG) {
- lock->have_size = 1;
- lock->size = buf->ia_size;
- }
- }
- if (ctx->entry_lock != NULL) {
- lock = ctx->entry_lock;
- lock->version[0] = fop->answer->version[0];
- lock->version[1] = fop->answer->version[1];
- }
- }
+ op_errno = EIO;
- UNLOCK(&inode->lock);
+ LOCK(&lock->loc.inode->lock);
- if (lock != NULL)
- {
- // Only update parent mask if the lookup has been made with
- // inode locked.
- fop->parent->mask &= fop->good;
- }
+ if (ec_dict_del_array(dict, EC_XATTR_VERSION, ctx->pre_version,
+ EC_VERSION_SIZE) != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get version xattr");
- if (buf->ia_type == IA_IFREG) {
- fop->parent->pre_size = fop->parent->post_size = buf->ia_size;
- fop->parent->have_size = 1;
- }
- }
- else
- {
- gf_log(this->name, GF_LOG_WARNING, "Failed to get size and version "
- "(error %d)", op_errno);
- ec_fop_set_error(fop, op_errno);
+ goto unlock;
}
+ ctx->post_version[0] += ctx->pre_version[0];
+ ctx->post_version[1] += ctx->pre_version[1];
- return 0;
-}
+ ctx->have_version = _gf_true;
-gf_boolean_t
-ec_is_data_fop (glusterfs_fop_t fop)
-{
- switch (fop) {
- case GF_FOP_WRITE:
- case GF_FOP_TRUNCATE:
- case GF_FOP_FTRUNCATE:
- case GF_FOP_FALLOCATE:
- case GF_FOP_DISCARD:
- case GF_FOP_ZEROFILL:
- return _gf_true;
- default:
- return _gf_false;
- }
- return _gf_false;
-}
+ if (ec_dict_del_array(dict, EC_XATTR_DIRTY, ctx->pre_dirty,
+ EC_VERSION_SIZE) == 0) {
+ ctx->post_dirty[0] += ctx->pre_dirty[0];
+ ctx->post_dirty[1] += ctx->pre_dirty[1];
-gf_boolean_t
-ec_is_metadata_fop (glusterfs_fop_t fop)
-{
- switch (fop) {
- case GF_FOP_SETATTR:
- case GF_FOP_FSETATTR:
- case GF_FOP_SETXATTR:
- case GF_FOP_FSETXATTR:
- case GF_FOP_REMOVEXATTR:
- case GF_FOP_FREMOVEXATTR:
- return _gf_true;
- default:
- return _gf_false;
- }
- return _gf_false;
-}
+ ctx->have_dirty = _gf_true;
+ }
-int32_t
-ec_prepare_update_cbk (call_frame_t *frame, void *cookie,
- xlator_t *this, int32_t op_ret, int32_t op_errno,
- dict_t *dict, dict_t *xdata)
-{
- ec_fop_data_t *fop = cookie, *parent;
- ec_lock_t *lock = NULL;
- uint64_t size = 0;
- uint64_t version[EC_VERSION_SIZE] = {0, 0};
+ if (lock->loc.inode->ia_type == IA_IFREG) {
+ if (ec_dict_del_number(dict, EC_XATTR_SIZE, &ctx->pre_size) != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get size xattr");
- if (op_ret >= 0) {
- parent = fop->parent;
- while ((parent != NULL) && (parent->locks[0].lock == NULL)) {
- parent = parent->parent;
- }
- if (parent == NULL) {
- return 0;
+ goto unlock;
}
+ ctx->post_size = ctx->pre_size;
- lock = parent->locks[0].lock;
- if (ec_is_metadata_fop (fop->parent->id))
- lock->is_dirty[EC_METADATA_TXN] = _gf_true;
- else
- lock->is_dirty[EC_DATA_TXN] = _gf_true;
-
- if (lock->loc.inode->ia_type == IA_IFREG) {
- if (!ec_config_check(fop, dict) ||
- (ec_dict_del_number(dict, EC_XATTR_SIZE, &size) != 0)) {
- ec_fop_set_error(fop, EIO);
- return 0;
- }
- }
+ ctx->have_size = _gf_true;
+
+ if ((ec_dict_del_config(dict, EC_XATTR_CONFIG, &ctx->config) != 0) ||
+ !ec_config_check(parent, &ctx->config)) {
+ gf_log(this->name, GF_LOG_ERROR, "Unable to get config xattr");
- if (ec_dict_del_array(dict, EC_XATTR_VERSION, version,
- EC_VERSION_SIZE) != 0) {
- ec_fop_set_error(fop, EIO);
- return 0;
+ goto unlock;
}
- LOCK(&lock->loc.inode->lock);
+ ctx->have_config = _gf_true;
+ }
- if (lock->loc.inode->ia_type == IA_IFREG) {
- lock->size = size;
- fop->parent->pre_size = fop->parent->post_size = size;
- fop->parent->have_size = lock->have_size = 1;
- }
- lock->version[0] = version[0];
- lock->version[1] = version[1];
+ ctx->have_info = _gf_true;
- UNLOCK(&lock->loc.inode->lock);
+ op_errno = 0;
+
+unlock:
+ UNLOCK(&lock->loc.inode->lock);
+out:
+ if (op_errno == 0) {
+ parent->mask &= fop->good;
- fop->parent->mask &= fop->good;
/*As of now only data healing marks bricks as healing*/
- if (ec_is_data_fop (fop->parent->id))
- fop->parent->healing |= fop->healing;
+ if (ec_is_data_fop (parent->id)) {
+ parent->healing |= fop->healing;
+ }
} else {
- gf_log(this->name, GF_LOG_WARNING,
- "Failed to get size and version (error %d: %s)", op_errno,
- strerror (op_errno));
- ec_fop_set_error(fop, op_errno);
+ ec_fop_set_error(parent, op_errno);
}
return 0;
}
-void ec_get_size_version(ec_fop_data_t * fop)
+void ec_get_size_version(ec_lock_link_t *link)
{
loc_t loc;
- dict_t * xdata;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
+ ec_fop_data_t *fop;
+ dict_t *dict = NULL;
uid_t uid;
gid_t gid;
int32_t error = ENOMEM;
uint64_t allzero[EC_VERSION_SIZE] = {0, 0};
- if (fop->have_size)
- {
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* If ec metadata has already been retrieved, do not try again. */
+ if (ctx->have_info) {
return;
}
- if ((fop->parent != NULL) && fop->parent->have_size)
- {
- fop->pre_size = fop->parent->pre_size;
- fop->post_size = fop->parent->post_size;
-
- fop->have_size = 1;
-
+ /* Determine if there's something we need to retrieve for the current
+ * operation. */
+ if (!lock->query && (lock->loc.inode->ia_type != IA_IFREG)) {
return;
}
+
+ fop = link->fop;
+
uid = fop->frame->root->uid;
gid = fop->frame->root->gid;
- fop->frame->root->uid = 0;
- fop->frame->root->gid = 0;
-
memset(&loc, 0, sizeof(loc));
- xdata = dict_new();
- if (xdata == NULL)
- {
+ dict = dict_new();
+ if (dict == NULL) {
goto out;
}
- if ((ec_dict_set_array(xdata, EC_XATTR_VERSION,
- allzero, EC_VERSION_SIZE) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_CONFIG, 0) != 0) ||
- (ec_dict_set_array(xdata, EC_XATTR_DIRTY, allzero,
- EC_VERSION_SIZE) != 0))
- {
+
+ /* Once we know that an xattrop will be needed, we try to get all available
+ * information in a single call. */
+ if ((ec_dict_set_array(dict, EC_XATTR_VERSION, allzero,
+ EC_VERSION_SIZE) != 0) ||
+ (ec_dict_set_array(dict, EC_XATTR_DIRTY, allzero,
+ EC_VERSION_SIZE) != 0)) {
goto out;
}
- error = EIO;
+ if (lock->loc.inode->ia_type == IA_IFREG) {
+ if ((ec_dict_set_number(dict, EC_XATTR_SIZE, 0) != 0) ||
+ (ec_dict_set_number(dict, EC_XATTR_CONFIG, 0) != 0)) {
+ goto out;
+ }
+ }
- if (!fop->use_fd)
- {
- if (ec_loc_from_loc(fop->xl, &loc, &fop->loc[0]) != 0)
- {
+ fop->frame->root->uid = 0;
+ fop->frame->root->gid = 0;
+
+ /* For normal fops, ec_[f]xattrop() must succeed on at least
+ * EC_MINIMUM_MIN bricks, however when this is called as part of a
+ * self-heal operation the mask of target bricks (fop->mask) could
+ * contain less than EC_MINIMUM_MIN bricks, causing the lookup to
+ * always fail. Thus we always use the same minimum used for the main
+ * fop.
+ */
+ if (lock->fd == NULL) {
+ if (ec_loc_from_loc(fop->xl, &loc, &lock->loc) != 0) {
goto out;
}
- if (gf_uuid_is_null(loc.pargfid))
- {
- if (loc.parent != NULL)
- {
+ if (gf_uuid_is_null(loc.pargfid)) {
+ if (loc.parent != NULL) {
inode_unref(loc.parent);
loc.parent = NULL;
}
GF_FREE((char *)loc.path);
- loc.path = NULL;
- loc.name = NULL;
+ loc.path = NULL;
+ loc.name = NULL;
}
- /* For normal fops, ec_[f]xattrop() must succeed on at least
- * EC_MINIMUM_MIN bricks, however when this is called as part of
- * a self-heal operation the mask of target bricks (fop->mask) could
- * contain less than EC_MINIMUM_MIN bricks, causing the lookup to
- * always fail. Thus we always use the same minimum used for the main
- * fop.
- */
+
ec_xattrop (fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, &loc,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ ec_prepare_update_cbk, link, &loc,
+ GF_XATTROP_ADD_ARRAY64, dict, NULL);
} else {
- if (ec_loc_from_fd(fop->xl, &loc, fop->fd) != 0) {
- goto out;
- }
ec_fxattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, fop->fd,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ ec_prepare_update_cbk, link, lock->fd,
+ GF_XATTROP_ADD_ARRAY64, dict, NULL);
}
+
error = 0;
out:
@@ -1232,9 +1028,8 @@ out:
loc_wipe(&loc);
- if (xdata != NULL)
- {
- dict_unref(xdata);
+ if (dict != NULL) {
+ dict_unref(dict);
}
if (error != 0) {
@@ -1242,147 +1037,408 @@ out:
}
}
-void ec_prepare_update(ec_fop_data_t *fop)
+gf_boolean_t ec_get_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t *size)
{
- loc_t loc;
- dict_t *xdata;
- ec_fop_data_t *tmp;
- ec_lock_t *lock;
- ec_t *ec;
- uid_t uid;
- gid_t gid;
- uint64_t version[2] = {0, 0};
- uint64_t dirty[2] = {0, 0};
- int32_t error = ENOMEM;
+ ec_inode_t *ctx;
+ gf_boolean_t found = _gf_false;
- tmp = fop;
- while ((tmp != NULL) && (tmp->locks[0].lock == NULL)) {
- tmp = tmp->parent;
- }
- if ((tmp != NULL) &&
- (tmp->locks[0].lock->is_dirty[0] || tmp->locks[0].lock->is_dirty[1])) {
- lock = tmp->locks[0].lock;
+ LOCK(&inode->lock);
- fop->pre_size = fop->post_size = lock->size;
- fop->have_size = 1;
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
+ }
- return;
+ if (ctx->have_size) {
+ *size = ctx->post_size;
+ found = _gf_true;
}
- uid = fop->frame->root->uid;
- gid = fop->frame->root->gid;
- fop->frame->root->uid = 0;
- fop->frame->root->gid = 0;
+unlock:
+ UNLOCK(&inode->lock);
- memset(&loc, 0, sizeof(loc));
+ return found;
+}
- ec = fop->xl->private;
- if (ec_bits_count (fop->mask) >= ec->fragments) {
- /* It is changing data only if the update happens on at least
- * fragment number of bricks. Otherwise it probably is healing*/
- if (ec_is_metadata_fop (fop->id))
- dirty[EC_METADATA_TXN] = 1;
- else
- dirty[EC_DATA_TXN] = 1;
+gf_boolean_t ec_set_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t size)
+{
+ ec_inode_t *ctx;
+ gf_boolean_t found = _gf_false;
+
+ LOCK(&inode->lock);
+
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
}
- xdata = dict_new();
- if (xdata == NULL) {
- goto out;
+ /* Normal fops always have ctx->have_size set. However self-heal calls this
+ * to prepare the inode, so ctx->have_size will be false. In this case we
+ * prepare both pre_size and post_size, and set have_size and have_info to
+ * true. */
+ if (!ctx->have_size) {
+ ctx->pre_size = size;
+ ctx->have_size = ctx->have_info = _gf_true;
}
- if ((ec_dict_set_array(xdata, EC_XATTR_VERSION,
- version, EC_VERSION_SIZE) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) ||
- (ec_dict_set_number(xdata, EC_XATTR_CONFIG, 0) != 0) ||
- (ec_dict_set_array(xdata, EC_XATTR_DIRTY, dirty,
- EC_VERSION_SIZE) != 0)) {
- goto out;
+ ctx->post_size = size;
+
+ found = _gf_true;
+
+unlock:
+ UNLOCK(&inode->lock);
+
+ return found;
+}
+
+void ec_clear_inode_info(ec_fop_data_t *fop, inode_t *inode)
+{
+ ec_inode_t *ctx;
+
+ LOCK(&inode->lock);
+
+ ctx = __ec_inode_get(inode, fop->xl);
+ if (ctx == NULL) {
+ goto unlock;
}
- error = EIO;
+ ctx->have_info = _gf_false;
+ ctx->have_config = _gf_false;
+ ctx->have_version = _gf_false;
+ ctx->have_size = _gf_false;
+ ctx->have_dirty = _gf_false;
- if (!fop->use_fd) {
- if (ec_loc_from_loc(fop->xl, &loc, &fop->loc[0]) != 0) {
- goto out;
- }
+ memset(&ctx->config, 0, sizeof(ctx->config));
+ memset(ctx->pre_version, 0, sizeof(ctx->pre_version));
+ memset(ctx->post_version, 0, sizeof(ctx->post_version));
+ ctx->pre_size = ctx->post_size = 0;
+ memset(ctx->pre_dirty, 0, sizeof(ctx->pre_dirty));
+ memset(ctx->post_dirty, 0, sizeof(ctx->post_dirty));
+
+unlock:
+ UNLOCK(&inode->lock);
+}
- ec_xattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, &loc, GF_XATTROP_ADD_ARRAY64,
- xdata, NULL);
+int32_t ec_get_real_size_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, dict_t *xdata,
+ struct iatt *postparent)
+{
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link;
+
+ if (op_ret >= 0) {
+ link = fop->data;
+ if (ec_dict_del_number(xdata, EC_XATTR_SIZE, &link->size) != 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "Unable to determine real file size");
+ }
} else {
- ec_fxattrop(fop->frame, fop->xl, fop->mask, fop->minimum,
- ec_prepare_update_cbk, NULL, fop->fd,
- GF_XATTROP_ADD_ARRAY64, xdata, NULL);
+ /* Prevent failure of parent fop. */
+ fop->error = 0;
}
- error = 0;
+ return 0;
+}
-out:
+/* This function is used to get the trusted.ec.size xattr from a file when
+ * no lock is needed on the inode. This is only required to maintan iatt
+ * structs on fops that manipulate directory entries but do not operate
+ * directly on the inode, like link, rename, ...
+ *
+ * Any error processing this request is ignored. In the worst case, an invalid
+ * or not up to date value in the iatt could cause some cache invalidation.
+ */
+void ec_get_real_size(ec_lock_link_t *link)
+{
+ ec_fop_data_t *fop;
+ dict_t *xdata;
- fop->frame->root->uid = uid;
- fop->frame->root->gid = gid;
+ if (link->base == NULL) {
+ return;
+ }
- loc_wipe(&loc);
+ if (link->base->inode->ia_type != IA_IFREG) {
+ return;
+ }
+
+ fop = link->fop;
+
+ if (ec_get_inode_size(fop, link->base->inode, &link->size)) {
+ return;
+ }
+ xdata = dict_new();
+ if (xdata == NULL) {
+ return;
+ }
+ if (ec_dict_set_number(xdata, EC_XATTR_SIZE, 0) != 0) {
+ goto out;
+ }
+
+ /* Send a simple lookup. A single answer is considered ok since this value
+ * is only used to return an iatt struct related to an inode that is not
+ * locked and have not suffered any operation. */
+ ec_lookup(fop->frame, fop->xl, fop->mask, 1, ec_get_real_size_cbk, link,
+ link->base, xdata);
+
+out:
if (xdata != NULL) {
dict_unref(xdata);
}
+}
- if (error != 0) {
- ec_fop_set_error(fop, error);
+void ec_lock_acquired(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+
+ ec_trace("LOCKED", link->fop, "lock=%p", lock);
+
+ /* If the fop has an fd available, attach it to the lock structure to be
+ * able to do fxattrop calls instead of xattrop. It's safe to change this
+ * here because no xattrop using the fd can start concurrently at this
+ * point. */
+ if (fop->use_fd) {
+ if (lock->fd != NULL) {
+ fd_unref(lock->fd);
+ }
+ lock->fd = fd_ref(fop->fd);
}
+ lock->acquired = _gf_true;
+
+ fop->mask &= lock->good_mask;
+
+ fop->locked++;
+
+ ec_get_size_version(link);
+ ec_get_real_size(link);
}
-int32_t ec_unlocked(call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xdata)
+int32_t ec_locked(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link = NULL;
+ ec_lock_t *lock = NULL;
- if (op_ret < 0) {
- gf_log(this->name, GF_LOG_WARNING, "entry/inode unlocking failed (%s)",
- ec_fop_name(fop->parent->id));
+ if (op_ret >= 0) {
+ link = fop->data;
+ lock = link->lock;
+ lock->mask = lock->good_mask = fop->good;
+
+ ec_lock_acquired(link);
+ ec_lock(fop->parent);
} else {
- ec_trace("UNLOCKED", fop->parent, "lock=%p", fop->data);
+ gf_log(this->name, GF_LOG_WARNING, "Failed to complete preop lock");
}
return 0;
}
-void ec_unlock_lock(ec_fop_data_t *fop, ec_lock_t *lock)
+gf_boolean_t ec_lock_acquire(ec_lock_link_t *link)
{
- if ((lock->mask != 0) && lock->acquired) {
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+ if (!lock->acquired) {
ec_owner_set(fop->frame, lock);
- switch (lock->kind) {
- case EC_LOCK_ENTRY:
- ec_trace("UNLOCK_ENTRYLK", fop, "lock=%p, inode=%p, path=%s", lock,
- lock->loc.inode, lock->loc.path);
+ ec_trace("LOCK_ACQUIRE", fop, "lock=%p, inode=%p", lock,
+ lock->loc.inode);
+
+ lock->flock.l_type = F_WRLCK;
+ ec_inodelk(fop->frame, fop->xl, -1, EC_MINIMUM_ALL, ec_locked,
+ link, fop->xl->name, &lock->loc, F_SETLKW, &lock->flock,
+ NULL);
+
+ return _gf_false;
+ }
+
+ ec_trace("LOCK_REUSE", fop, "lock=%p", lock);
+
+ ec_lock_acquired(link);
+
+ return _gf_true;
+}
+
+void ec_lock(ec_fop_data_t *fop)
+{
+ ec_lock_link_t *link;
+ ec_lock_link_t *timer_link = NULL;
+ ec_lock_t *lock;
+
+ /* There is a chance that ec_resume is called on fop even before ec_sleep.
+ * Which can result in refs == 0 for fop leading to use after free in this
+ * function when it calls ec_sleep so do ec_sleep at start and end of this
+ * function.*/
+ ec_sleep (fop);
+ while (fop->locked < fop->lock_count) {
+ /* Since there are only up to 2 locks per fop, this xor will change
+ * the order of the locks if fop->first_lock is 1. */
+ link = &fop->locks[fop->locked ^ fop->first_lock];
+ lock = link->lock;
+
+ timer_link = NULL;
+
+ LOCK(&lock->loc.inode->lock);
+ GF_ASSERT (lock->inserted > 0);
+ lock->inserted--;
+
+ if (lock->timer != NULL) {
+ GF_ASSERT (lock->release == _gf_false);
+ timer_link = lock->timer->data;
+ ec_trace("UNLOCK_CANCELLED", timer_link->fop, "lock=%p", lock);
+ gf_timer_call_cancel(fop->xl->ctx, lock->timer);
+ lock->timer = NULL;
+
+ lock->refs--;
+ /* There should remain at least 1 ref, the current one. */
+ GF_ASSERT(lock->refs > 0);
+ }
+
+ GF_ASSERT(list_empty(&link->wait_list));
- ec_entrylk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
- ec_unlocked, lock, fop->xl->name, &lock->loc, NULL,
- ENTRYLK_UNLOCK, lock->type, NULL);
+ if ((lock->owner != NULL) || lock->release) {
+ if (lock->release) {
+ ec_trace("LOCK_QUEUE_FREEZE", fop, "lock=%p", lock);
+
+ list_add_tail(&link->wait_list, &lock->frozen);
+
+ /* The lock is frozen, so we move the current reference to
+ * refs_frozen. After that, there should remain at least one
+ * ref belonging to the lock that is processing the release. */
+ lock->refs--;
+ GF_ASSERT(lock->refs > 0);
+ lock->refs_frozen++;
+ } else {
+ ec_trace("LOCK_QUEUE_WAIT", fop, "lock=%p", lock);
+
+ list_add_tail(&link->wait_list, &lock->waiting);
+ }
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_sleep(fop);
break;
+ }
- case EC_LOCK_INODE:
- lock->flock.l_type = F_UNLCK;
- ec_trace("UNLOCK_INODELK", fop, "lock=%p, inode=%p", lock,
- lock->loc.inode);
+ lock->owner = fop;
- ec_inodelk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
- ec_unlocked, lock, fop->xl->name, &lock->loc, F_SETLK,
- &lock->flock, NULL);
+ UNLOCK(&lock->loc.inode->lock);
+ if (!ec_lock_acquire(link)) {
break;
+ }
- default:
- gf_log(fop->xl->name, GF_LOG_ERROR, "Invalid lock type");
+ if (timer_link != NULL) {
+ ec_resume(timer_link->fop, 0);
+ timer_link = NULL;
}
}
+ ec_resume (fop, 0);
+
+ if (timer_link != NULL) {
+ ec_resume(timer_link->fop, 0);
+ }
+}
+
+void
+ec_lock_unfreeze(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+
+ lock = link->lock;
+
+ LOCK(&lock->loc.inode->lock);
+
+ lock->acquired = _gf_false;
+ lock->release = _gf_false;
+
+ lock->refs--;
+ GF_ASSERT (lock->refs == lock->inserted);
+
+ GF_ASSERT(list_empty(&lock->waiting) && (lock->owner == NULL));
- ec_trace("LOCK_DESTROY", fop, "lock=%p", lock);
+ list_splice_init(&lock->frozen, &lock->waiting);
+ lock->refs += lock->refs_frozen;
+ lock->refs_frozen = 0;
- ec_lock_destroy(lock);
+ if (!list_empty(&lock->waiting)) {
+ link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list);
+ list_del_init(&link->wait_list);
+
+ lock->owner = link->fop;
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_trace("LOCK_UNFREEZE", link->fop, "lock=%p", lock);
+
+ if (ec_lock_acquire(link)) {
+ ec_lock(link->fop);
+ }
+ ec_resume(link->fop, 0);
+ } else if (lock->refs == 0) {
+ ec_trace("LOCK_DESTROY", link->fop, "lock=%p", lock);
+
+ lock->ctx->inode_lock = NULL;
+
+ UNLOCK(&lock->loc.inode->lock);
+
+ ec_lock_destroy(lock);
+ } else {
+ UNLOCK(&lock->loc.inode->lock);
+ }
+}
+
+int32_t ec_unlocked(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link = fop->data;
+
+ if (op_ret < 0) {
+ gf_log(this->name, GF_LOG_WARNING, "entry/inode unlocking failed (%s)",
+ ec_fop_name(link->fop->id));
+ } else {
+ ec_trace("UNLOCKED", link->fop, "lock=%p", link->lock);
+ }
+
+ ec_lock_unfreeze(link);
+
+ return 0;
+}
+
+void ec_unlock_lock(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+
+ lock = link->lock;
+ fop = link->fop;
+
+ ec_clear_inode_info(fop, lock->loc.inode);
+
+ if ((lock->mask != 0) && lock->acquired) {
+ ec_owner_set(fop->frame, lock);
+
+ lock->flock.l_type = F_UNLCK;
+ ec_trace("UNLOCK_INODELK", fop, "lock=%p, inode=%p", lock,
+ lock->loc.inode);
+
+ ec_inodelk(fop->frame, fop->xl, lock->mask, EC_MINIMUM_ALL,
+ ec_unlocked, link, fop->xl->name, &lock->loc, F_SETLK,
+ &lock->flock, NULL);
+ } else {
+ ec_lock_unfreeze(link);
+ }
}
int32_t ec_update_size_version_done(call_frame_t * frame, void * cookie,
@@ -1390,111 +1446,128 @@ int32_t ec_update_size_version_done(call_frame_t * frame, void * cookie,
int32_t op_errno, dict_t * xattr,
dict_t * xdata)
{
- ec_fop_data_t * fop = cookie;
+ ec_fop_data_t *fop = cookie;
+ ec_lock_link_t *link;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
- if (op_ret < 0)
- {
+ if (op_ret < 0) {
gf_log(fop->xl->name, GF_LOG_ERROR, "Failed to update version and "
"size (error %d)", op_errno);
- }
- else
- {
+ } else {
fop->parent->mask &= fop->good;
+ link = fop->data;
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ if (ec_dict_del_array(xattr, EC_XATTR_VERSION, ctx->post_version,
+ EC_VERSION_SIZE) == 0) {
+ ctx->pre_version[0] = ctx->post_version[0];
+ ctx->pre_version[1] = ctx->post_version[1];
+
+ ctx->have_version = _gf_true;
+ }
+ if (ec_dict_del_number(xattr, EC_XATTR_SIZE, &ctx->post_size) == 0) {
+ ctx->pre_size = ctx->post_size;
+
+ ctx->have_size = _gf_true;
+ }
+ if (ec_dict_del_array(xattr, EC_XATTR_DIRTY, ctx->post_dirty,
+ EC_VERSION_SIZE) == 0) {
+ ctx->pre_dirty[0] = ctx->post_dirty[0];
+ ctx->pre_dirty[1] = ctx->post_dirty[1];
+
+ ctx->have_dirty = _gf_true;
+ }
+ if ((ec_dict_del_config(xdata, EC_XATTR_CONFIG, &ctx->config) == 0) &&
+ ec_config_check(fop->parent, &ctx->config)) {
+ ctx->have_config = _gf_true;
+ }
+
+ ctx->have_info = _gf_true;
}
- if (fop->data != NULL) {
- ec_unlock_lock(fop->parent, fop->data);
+ if ((fop->parent->id != GF_FOP_FLUSH) &&
+ (fop->parent->id != GF_FOP_FSYNC) &&
+ (fop->parent->id != GF_FOP_FSYNCDIR)) {
+ ec_unlock_lock(fop->data);
}
return 0;
}
-uint64_t
-ec_get_dirty_value (ec_t *ec, uintptr_t fop_mask, uint64_t version_delta,
- gf_boolean_t dirty)
-{
- uint64_t dirty_val = 0;
-
- if (version_delta) {
- if (~fop_mask & ec->node_mask) {
- /* fop didn't succeed on all subvols so 'dirty' xattr
- * shouldn't be cleared */
- if (!dirty)
- dirty_val = 1;
- } else {
- /* fop succeed on all subvols so 'dirty' xattr
- * should be cleared */
- if (dirty)
- dirty_val = -1;
- }
- }
- return dirty_val;
-}
-
void
-ec_update_size_version(ec_fop_data_t *fop, loc_t *loc, uint64_t version[2],
- uint64_t size, gf_boolean_t dirty[2], ec_lock_t *lock)
+ec_update_size_version(ec_lock_link_t *link, uint64_t *version,
+ uint64_t size, uint64_t *dirty)
{
- ec_t *ec = fop->xl->private;
+ ec_fop_data_t *fop;
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
dict_t * dict;
uid_t uid;
gid_t gid;
- uint64_t dirty_values[2] = {0};
- int i = 0;
-
- if (fop->parent != NULL)
- {
- fop->parent->post_size = fop->post_size;
- return;
- }
+ fop = link->fop;
- ec_trace("UPDATE", fop, "version=%ld, size=%ld, dirty=%u", version, size,
- dirty);
+ ec_trace("UPDATE", fop, "version=%ld/%ld, size=%ld, dirty=%ld/%ld",
+ version[0], version[1], size, dirty[0], dirty[1]);
dict = dict_new();
- if (dict == NULL)
- {
+ if (dict == NULL) {
goto out;
}
- if (version[0] != 0 || version[1] != 0) {
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* If we don't have version information or it has been modified, we
+ * update it. */
+ if (!ctx->have_version || (version[0] != 0) || (version[1] != 0)) {
if (ec_dict_set_array(dict, EC_XATTR_VERSION,
version, EC_VERSION_SIZE) != 0) {
goto out;
}
}
+
if (size != 0) {
+ /* If size has been changed, we should already know the previous size
+ * of the file. */
+ GF_ASSERT(ctx->have_size);
+
if (ec_dict_set_number(dict, EC_XATTR_SIZE, size) != 0) {
goto out;
}
}
- for (i = 0; i < sizeof (dirty_values)/sizeof (dirty_values[0]); i++) {
- dirty_values[i] = ec_get_dirty_value (ec, fop->mask, version[i],
- dirty[i]);
- }
-
- if (dirty_values[0] || dirty_values[1]) {
- if (ec_dict_set_array(dict, EC_XATTR_DIRTY, dirty_values,
+ /* If we don't have dirty information or it has been modified, we update
+ * it. */
+ if (!ctx->have_dirty || (dirty[0] != 0) || (dirty[1] != 0)) {
+ if (ec_dict_set_array(dict, EC_XATTR_DIRTY, dirty,
EC_VERSION_SIZE) != 0) {
goto out;
}
}
+ /* If config information is not know, we request it now. */
+ if ((lock->loc.inode->ia_type == IA_IFREG) && !ctx->have_config) {
+ /* A failure requesting this xattr is ignored because it's not
+ * absolutely required right now. */
+ ec_dict_set_number(dict, EC_XATTR_CONFIG, 0);
+ }
+
uid = fop->frame->root->uid;
gid = fop->frame->root->gid;
fop->frame->root->uid = 0;
fop->frame->root->gid = 0;
- if (fop->use_fd) {
- ec_fxattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
- ec_update_size_version_done, lock, fop->fd,
+ if (link->lock->fd == NULL) {
+ ec_xattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
+ ec_update_size_version_done, link, &link->lock->loc,
GF_XATTROP_ADD_ARRAY64, dict, NULL);
} else {
- ec_xattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
- ec_update_size_version_done, lock, loc,
+ ec_fxattrop(fop->frame, fop->xl, fop->mask, EC_MINIMUM_MIN,
+ ec_update_size_version_done, link, link->lock->fd,
GF_XATTROP_ADD_ARRAY64, dict, NULL);
}
@@ -1506,8 +1579,7 @@ ec_update_size_version(ec_fop_data_t *fop, loc_t *loc, uint64_t version[2],
return;
out:
- if (dict != NULL)
- {
+ if (dict != NULL) {
dict_unref(dict);
}
@@ -1516,27 +1588,60 @@ out:
gf_log(fop->xl->name, GF_LOG_ERROR, "Unable to update version and size");
}
-void ec_unlock_now(ec_fop_data_t *fop, ec_lock_t *lock)
+gf_boolean_t
+ec_update_info(ec_lock_link_t *link)
+{
+ ec_lock_t *lock;
+ ec_inode_t *ctx;
+ uint64_t version[2];
+ uint64_t dirty[2];
+ uint64_t size;
+
+ lock = link->lock;
+ ctx = lock->ctx;
+
+ /* pre_version[*] will be 0 if have_version is false */
+ version[0] = ctx->post_version[0] - ctx->pre_version[0];
+ version[1] = ctx->post_version[1] - ctx->pre_version[1];
+
+ size = ctx->post_size - ctx->pre_size;
+
+ /* pre_dirty[*] will be 0 if have_dirty is false */
+ dirty[0] = ctx->post_dirty[0] - ctx->pre_dirty[0];
+ dirty[1] = ctx->post_dirty[1] - ctx->pre_dirty[1];
+
+ if ((version[0] != 0) || (version[1] != 0) ||
+ (dirty[0] != 0) || (dirty[1] != 0)) {
+ ec_update_size_version(link, version, size, dirty);
+
+ return _gf_true;
+ }
+
+ return _gf_false;
+}
+
+void
+ec_unlock_now(ec_lock_link_t *link)
{
- ec_trace("UNLOCK_NOW", fop, "lock=%p", lock);
+ ec_trace("UNLOCK_NOW", link->fop, "lock=%p", link->lock);
- if ((lock->version_delta[0] != 0) || (lock->version_delta[1] != 0) ||
- lock->is_dirty[0] || lock->is_dirty[1]) {
- ec_update_size_version(fop, &lock->loc, lock->version_delta,
- lock->size_delta, lock->is_dirty, lock);
- } else {
- ec_unlock_lock(fop, lock);
+ if (!ec_update_info(link)) {
+ ec_unlock_lock(link);
}
- ec_resume(fop, 0);
+ ec_resume(link->fop, 0);
}
void
-ec_unlock_timer_del(ec_fop_data_t *fop, ec_lock_t *lock)
+ec_unlock_timer_del(ec_lock_link_t *link)
{
+ int32_t before = 0;
+ ec_lock_t *lock;
inode_t *inode;
gf_boolean_t now = _gf_false;
+ lock = link->lock;
+
/* A race condition can happen if timer expires, calls this function
* and the lock is released (lock->loc is wiped) but the fop is not
* fully completed yet (it's still on the list of pending fops). In
@@ -1550,27 +1655,32 @@ ec_unlock_timer_del(ec_fop_data_t *fop, ec_lock_t *lock)
LOCK(&inode->lock);
if (lock->timer != NULL) {
- ec_trace("UNLOCK_DELAYED", fop, "lock=%p", lock);
+ ec_trace("UNLOCK_DELAYED", link->fop, "lock=%p", lock);
- gf_timer_call_cancel(fop->xl->ctx, lock->timer);
+ gf_timer_call_cancel(link->fop->xl->ctx, lock->timer);
lock->timer = NULL;
- *lock->plock = NULL;
- now = _gf_true;
+ lock->release = now = _gf_true;
+
+ before = lock->refs + lock->refs_frozen;
+ list_splice_init(&lock->waiting, &lock->frozen);
+ lock->refs_frozen += lock->refs - lock->inserted - 1;
+ lock->refs = 1 + lock->inserted;
+ /* We moved around the locks, so total number of locks shouldn't
+ * change by this operation*/
+ GF_ASSERT (before == (lock->refs + lock->refs_frozen));
}
UNLOCK(&inode->lock);
if (now) {
- ec_unlock_now(fop, lock);
+ ec_unlock_now(link);
}
}
void ec_unlock_timer_cbk(void *data)
{
- ec_lock_link_t *link = data;
-
- ec_unlock_timer_del(link->fop, link->lock);
+ ec_unlock_timer_del(data);
}
void ec_unlock_timer_add(ec_lock_link_t *link)
@@ -1578,28 +1688,28 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
struct timespec delay;
ec_fop_data_t *fop = link->fop;
ec_lock_t *lock = link->lock;
- int32_t refs = 1;
+ gf_boolean_t now = _gf_false;
LOCK(&lock->loc.inode->lock);
GF_ASSERT(lock->timer == NULL);
- if (lock->refs != 1) {
+ if ((lock->refs - lock->inserted) > 1) {
ec_trace("UNLOCK_SKIP", fop, "lock=%p", lock);
lock->refs--;
UNLOCK(&lock->loc.inode->lock);
} else if (lock->acquired) {
- delay.tv_sec = 1;
- delay.tv_nsec = 0;
+ ec_t *ec = fop->xl->private;
ec_sleep(fop);
- /* If healing is needed, do not delay lock release to let self-heal
- * start working as soon as possible. */
- if (!ec_fop_needs_heal(fop)) {
- ec_trace("UNLOCK_DELAY", fop, "lock=%p", lock);
+ /* If healing is needed, the lock needs to be released due to
+ * contention, or ec is shutting down, do not delay lock release. */
+ if (!lock->release && !ec_fop_needs_heal(fop) && !ec->shutdown) {
+ ec_trace("UNLOCK_DELAY", fop, "lock=%p, release=%d", lock,
+ lock->release);
delay.tv_sec = 1;
delay.tv_nsec = 0;
@@ -1609,26 +1719,25 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
gf_log(fop->xl->name, GF_LOG_WARNING, "Unable to delay an "
"unlock");
- *lock->plock = NULL;
- refs = 0;
+ lock->release = now = _gf_true;
}
} else {
- ec_trace("UNLOCK_FORCE", fop, "lock=%p", lock);
- *lock->plock = NULL;
- refs = 0;
+ ec_trace("UNLOCK_FORCE", fop, "lock=%p, release=%d", lock,
+ lock->release);
+ lock->release = now = _gf_true;
}
UNLOCK(&lock->loc.inode->lock);
- if (refs == 0) {
- ec_unlock_now(fop, lock);
+ if (now) {
+ ec_unlock_now(link);
}
} else {
- *lock->plock = NULL;
+ lock->release = _gf_true;
UNLOCK(&lock->loc.inode->lock);
- ec_lock_destroy(lock);
+ ec_lock_unfreeze(link);
}
}
@@ -1649,56 +1758,52 @@ ec_unlock_force(ec_fop_data_t *fop)
for (i = 0; i < fop->lock_count; i++) {
ec_trace("UNLOCK_FORCED", fop, "lock=%p", &fop->locks[i]);
- ec_unlock_timer_del(fop, fop->locks[i].lock);
+ ec_unlock_timer_del(&fop->locks[i]);
}
}
void ec_flush_size_version(ec_fop_data_t * fop)
{
- ec_lock_t * lock;
- uint64_t version[2], delta;
- gf_boolean_t dirty[2] = {_gf_false, _gf_false};
-
GF_ASSERT(fop->lock_count == 1);
- lock = fop->locks[0].lock;
-
- LOCK(&lock->loc.inode->lock);
-
- GF_ASSERT(lock->owner == fop);
-
- version[0] = lock->version_delta[0];
- version[1] = lock->version_delta[1];
- dirty[0] = lock->is_dirty[0];
- dirty[1] = lock->is_dirty[1];
- delta = lock->size_delta;
- lock->version_delta[0] = 0;
- lock->version_delta[1] = 0;
- lock->size_delta = 0;
- lock->is_dirty[0] = _gf_false;
- lock->is_dirty[1] = _gf_false;
-
- UNLOCK(&lock->loc.inode->lock);
-
- if (version[0] > 0 || version[1] > 0 || dirty[0] || dirty[1])
- {
- ec_update_size_version(fop, &lock->loc, version, delta, dirty,
- NULL);
- }
+ ec_update_info(&fop->locks[0]);
}
void ec_lock_reuse(ec_fop_data_t *fop)
{
- ec_fop_data_t * wait_fop;
- ec_lock_t * lock;
- ec_lock_link_t * link;
- int32_t i;
+ ec_t *ec;
+ ec_cbk_data_t *cbk;
+ ec_lock_t *lock;
+ ec_lock_link_t *link;
+ ec_inode_t *ctx;
+ int32_t i, count;
+ gf_boolean_t release = _gf_false;
+
+ cbk = fop->answer;
+ if (cbk != NULL) {
+ if (cbk->xdata != NULL) {
+ if ((dict_get_int32(cbk->xdata, GLUSTERFS_INODELK_COUNT,
+ &count) == 0) && (count > 1)) {
+ release = _gf_true;
+ }
+ if (release) {
+ gf_log(fop->xl->name, GF_LOG_DEBUG,
+ "Lock contention detected");
+ }
+ }
+ } else {
+ /* If we haven't get an answer with enough quorum, we always release
+ * the lock. */
+ release = _gf_true;
+ }
+
+ ec = fop->xl->private;
for (i = 0; i < fop->lock_count; i++)
{
- wait_fop = NULL;
-
- lock = fop->locks[i].lock;
+ link = &fop->locks[i];
+ lock = link->lock;
+ ctx = lock->ctx;
LOCK(&lock->loc.inode->lock);
@@ -1706,47 +1811,42 @@ void ec_lock_reuse(ec_fop_data_t *fop)
GF_ASSERT(lock->owner == fop);
lock->owner = NULL;
+ lock->release |= release;
- if (((fop->locks_update >> i) & 1) != 0) {
- if (fop->error == 0)
- {
- if (ec_is_metadata_fop (fop->id)) {
- lock->version_delta[1]++;
- } else {
- lock->version_delta[0]++;
- }
- lock->size_delta += fop->post_size - fop->pre_size;
- if (fop->have_size) {
- lock->size = fop->post_size;
- lock->have_size = 1;
+ if ((fop->error == 0) && (cbk != NULL) && (cbk->op_ret >= 0)) {
+ if (link->update[0]) {
+ ctx->post_version[0]++;
+ if (ec->node_mask & ~fop->mask) {
+ ctx->post_dirty[0]++;
+ }
+ }
+ if (link->update[1]) {
+ ctx->post_version[1]++;
+ if (ec->node_mask & ~fop->mask) {
+ ctx->post_dirty[1]++;
}
}
}
lock->good_mask &= fop->mask;
+ link = NULL;
if (!list_empty(&lock->waiting))
{
link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list);
list_del_init(&link->wait_list);
- wait_fop = link->fop;
-
- if (lock->kind == EC_LOCK_INODE)
- {
- wait_fop->pre_size = wait_fop->post_size = fop->post_size;
- wait_fop->have_size = fop->have_size;
- }
- wait_fop->mask &= fop->mask;
+ lock->owner = link->fop;
}
UNLOCK(&lock->loc.inode->lock);
- if (wait_fop != NULL)
+ if (link != NULL)
{
- ec_lock(wait_fop);
-
- ec_resume(wait_fop, 0);
+ if (ec_lock_acquire(link)) {
+ ec_lock(link->fop);
+ }
+ ec_resume(link->fop, 0);
}
}
}
@@ -1768,6 +1868,7 @@ void __ec_manager(ec_fop_data_t * fop, int32_t error)
if ((fop->state == EC_STATE_END) || (fop->state == -EC_STATE_END)) {
ec_fop_data_release(fop);
+
break;
}
diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h
index 78cf261feeb..c0db0218699 100644
--- a/xlators/cluster/ec/src/ec-common.h
+++ b/xlators/cluster/ec/src/ec-common.h
@@ -38,19 +38,20 @@ typedef enum {
#define EC_MINIMUM_MIN -2
#define EC_MINIMUM_ALL -3
-#define EC_LOCK_ENTRY 0
-#define EC_LOCK_INODE 1
+#define EC_UPDATE_DATA 1
+#define EC_UPDATE_META 2
+#define EC_QUERY_INFO 4
+#define EC_INODE_SIZE 8
#define EC_STATE_START 0
#define EC_STATE_END 0
#define EC_STATE_INIT 1
#define EC_STATE_LOCK 2
-#define EC_STATE_GET_SIZE_AND_VERSION 3
-#define EC_STATE_DISPATCH 4
-#define EC_STATE_PREPARE_ANSWER 5
-#define EC_STATE_REPORT 6
-#define EC_STATE_LOCK_REUSE 7
-#define EC_STATE_UNLOCK 8
+#define EC_STATE_DISPATCH 3
+#define EC_STATE_PREPARE_ANSWER 4
+#define EC_STATE_REPORT 5
+#define EC_STATE_LOCK_REUSE 6
+#define EC_STATE_UNLOCK 7
#define EC_STATE_DELAYED_START 100
@@ -84,16 +85,21 @@ void ec_update_bad(ec_fop_data_t * fop, uintptr_t good);
void ec_fop_set_error(ec_fop_data_t * fop, int32_t error);
-void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, int32_t update);
-void ec_lock_prepare_entry(ec_fop_data_t *fop, loc_t *loc, int32_t update);
-void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, int32_t update);
+void ec_lock_prepare_inode(ec_fop_data_t *fop, loc_t *loc, uint32_t flags);
+void ec_lock_prepare_parent_inode(ec_fop_data_t *fop, loc_t *loc,
+ uint32_t flags);
+void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, uint32_t flags);
void ec_lock(ec_fop_data_t * fop);
void ec_lock_reuse(ec_fop_data_t *fop);
void ec_unlock(ec_fop_data_t * fop);
void ec_unlock_force(ec_fop_data_t *fop);
-void ec_get_size_version(ec_fop_data_t * fop);
-void ec_prepare_update(ec_fop_data_t *fop);
+gf_boolean_t ec_get_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t *size);
+gf_boolean_t ec_set_inode_size(ec_fop_data_t *fop, inode_t *inode,
+ uint64_t size);
+void ec_clear_inode_info(ec_fop_data_t *fop, inode_t *inode);
+
void ec_flush_size_version(ec_fop_data_t * fop);
void ec_dispatch_all(ec_fop_data_t * fop);
diff --git a/xlators/cluster/ec/src/ec-data.c b/xlators/cluster/ec/src/ec-data.c
index 609a47b466c..047ccd5ff31 100644
--- a/xlators/cluster/ec/src/ec-data.c
+++ b/xlators/cluster/ec/src/ec-data.c
@@ -130,6 +130,8 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this,
INIT_LIST_HEAD(&fop->cbk_list);
INIT_LIST_HEAD(&fop->answer_list);
INIT_LIST_HEAD(&fop->pending_list);
+ INIT_LIST_HEAD(&fop->locks[0].wait_list);
+ INIT_LIST_HEAD(&fop->locks[1].wait_list);
fop->xl = this;
fop->req_frame = frame;
@@ -222,10 +224,24 @@ ec_handle_last_pending_fop_completion (ec_fop_data_t *fop, gf_boolean_t *notify)
}
}
+void
+ec_fop_cleanup(ec_fop_data_t *fop)
+{
+ ec_cbk_data_t *cbk, *tmp;
+
+ list_for_each_entry_safe(cbk, tmp, &fop->answer_list, answer_list) {
+ list_del_init(&cbk->answer_list);
+
+ ec_cbk_data_destroy(cbk);
+ }
+ INIT_LIST_HEAD(&fop->cbk_list);
+
+ fop->answer = NULL;
+}
+
void ec_fop_data_release(ec_fop_data_t * fop)
{
ec_t *ec = NULL;
- ec_cbk_data_t * cbk, * tmp;
int32_t refs;
gf_boolean_t notify = _gf_false;
@@ -272,12 +288,7 @@ void ec_fop_data_release(ec_fop_data_t * fop)
ec_resume_parent(fop, fop->error);
- list_for_each_entry_safe(cbk, tmp, &fop->answer_list, answer_list)
- {
- list_del_init(&cbk->answer_list);
-
- ec_cbk_data_destroy(cbk);
- }
+ ec_fop_cleanup(fop);
ec = fop->xl->private;
ec_handle_last_pending_fop_completion (fop, &notify);
diff --git a/xlators/cluster/ec/src/ec-data.h b/xlators/cluster/ec/src/ec-data.h
index 8a58ffb288b..8204cf087de 100644
--- a/xlators/cluster/ec/src/ec-data.h
+++ b/xlators/cluster/ec/src/ec-data.h
@@ -67,10 +67,20 @@ struct _ec_fd
struct _ec_inode
{
uintptr_t bad;
- ec_lock_t *entry_lock;
ec_lock_t *inode_lock;
+ gf_boolean_t have_info;
+ gf_boolean_t have_config;
+ gf_boolean_t have_version;
+ gf_boolean_t have_size;
+ gf_boolean_t have_dirty;
+ ec_config_t config;
+ uint64_t pre_version[2];
+ uint64_t post_version[2];
+ uint64_t pre_size;
+ uint64_t post_size;
+ uint64_t pre_dirty[2];
+ uint64_t post_dirty[2];
struct list_head heal;
-
};
typedef int32_t (* fop_heal_cbk_t)(call_frame_t *, void * cookie, xlator_t *,
@@ -80,7 +90,6 @@ typedef int32_t (* fop_fheal_cbk_t)(call_frame_t *, void * cookie, xlator_t *,
int32_t, int32_t, uintptr_t, uintptr_t,
uintptr_t, dict_t *);
-
union _ec_cbk
{
fop_access_cbk_t access;
@@ -132,21 +141,21 @@ union _ec_cbk
struct _ec_lock
{
- ec_lock_t **plock;
+ ec_inode_t *ctx;
gf_timer_t *timer;
- struct list_head waiting;
+ struct list_head waiting; /* Queue of requests being serviced. */
+ struct list_head frozen; /* Queue of requests that will be serviced in
+ the next unlock/lock cycle. */
uintptr_t mask;
uintptr_t good_mask;
- int32_t kind;
int32_t refs;
- int32_t acquired;
- int32_t have_size;
- uint64_t size;
- uint64_t size_delta;
- uint64_t version[2];
- uint64_t version_delta[2];
- gf_boolean_t is_dirty[2];
+ int32_t refs_frozen;
+ int32_t inserted;
+ gf_boolean_t acquired;
+ gf_boolean_t release;
+ gf_boolean_t query;
ec_fop_data_t *owner;
+ fd_t *fd;
loc_t loc;
union
{
@@ -157,9 +166,12 @@ struct _ec_lock
struct _ec_lock_link
{
- ec_lock_t * lock;
- ec_fop_data_t * fop;
- struct list_head wait_list;
+ ec_lock_t *lock;
+ ec_fop_data_t *fop;
+ struct list_head wait_list;
+ gf_boolean_t update[2];
+ loc_t *base;
+ uint64_t size;
};
struct _ec_fop_data
@@ -183,12 +195,8 @@ struct _ec_fop_data
int32_t lock_count;
int32_t locked;
ec_lock_link_t locks[2];
- int32_t locks_update;
- int32_t have_size;
- uint64_t pre_size;
- uint64_t post_size;
+ int32_t first_lock;
gf_lock_t lock;
- ec_config_t config;
uint32_t flags;
uint32_t first;
@@ -197,6 +205,7 @@ struct _ec_fop_data
if fop->minimum number of subvolumes succeed
which are not healing*/
uintptr_t remaining;
+ uintptr_t received; /* Mask of responses */
uintptr_t good;
uintptr_t bad;
@@ -300,4 +309,6 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this,
void ec_fop_data_acquire(ec_fop_data_t * fop);
void ec_fop_data_release(ec_fop_data_t * fop);
+void ec_fop_cleanup(ec_fop_data_t *fop);
+
#endif /* __EC_DATA_H__ */
diff --git a/xlators/cluster/ec/src/ec-dir-read.c b/xlators/cluster/ec/src/ec-dir-read.c
index ffc3ed5a7cd..354c63d3683 100644
--- a/xlators/cluster/ec/src/ec-dir-read.c
+++ b/xlators/cluster/ec/src/ec-dir-read.c
@@ -128,14 +128,9 @@ int32_t ec_manager_opendir(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 0);
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -195,7 +190,6 @@ int32_t ec_manager_opendir(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
diff --git a/xlators/cluster/ec/src/ec-dir-write.c b/xlators/cluster/ec/src/ec-dir-write.c
index ffc96bf4351..ce09138fb7a 100644
--- a/xlators/cluster/ec/src/ec-dir-write.c
+++ b/xlators/cluster/ec/src/ec-dir-write.c
@@ -98,11 +98,10 @@ void ec_wind_create(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
{
-
-
- ec_t * ec;
- ec_cbk_data_t * cbk;
- ec_fd_t * ctx;
+ ec_config_t config;
+ ec_t *ec;
+ ec_cbk_data_t *cbk;
+ ec_fd_t *ctx;
uint64_t version[2] = {0, 0};
switch (state)
@@ -137,16 +136,15 @@ int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
ec = fop->xl->private;
- fop->config.version = EC_CONFIG_VERSION;
- fop->config.algorithm = EC_CONFIG_ALGORITHM;
- fop->config.gf_word_size = EC_GF_BITS;
- fop->config.bricks = ec->nodes;
- fop->config.redundancy = ec->redundancy;
- fop->config.chunk_size = EC_METHOD_CHUNK_SIZE;
+ config.version = EC_CONFIG_VERSION;
+ config.algorithm = EC_CONFIG_ALGORITHM;
+ config.gf_word_size = EC_GF_BITS;
+ config.bricks = ec->nodes;
+ config.redundancy = ec->redundancy;
+ config.chunk_size = EC_METHOD_CHUNK_SIZE;
if (ec_dict_set_config(fop->xdata, EC_XATTR_CONFIG,
- &fop->config) < 0)
- {
+ &config) < 0) {
fop->error = EIO;
return EC_STATE_REPORT;
@@ -172,7 +170,8 @@ int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -376,17 +375,11 @@ int32_t ec_manager_link(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- // Parent entry of fop->loc[0] should be locked, but I don't
- // receive enough information to do it (fop->loc[0].parent is
- // NULL).
- ec_lock_prepare_entry(fop, &fop->loc[1], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[1], EC_UPDATE_DATA |
+ EC_UPDATE_META |
+ EC_INODE_SIZE);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -410,7 +403,7 @@ int32_t ec_manager_link(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 3,
cbk->count);
if (cbk->iatt[0].ia_type == IA_IFREG) {
- cbk->iatt[0].ia_size = fop->pre_size;
+ cbk->iatt[0].ia_size = fop->locks[0].size;
}
if (ec_loc_update(fop->xl, &fop->loc[0], cbk->inode,
@@ -446,7 +439,6 @@ int32_t ec_manager_link(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -589,7 +581,8 @@ int32_t ec_manager_mkdir(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -764,6 +757,7 @@ void ec_wind_mknod(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
int32_t ec_manager_mknod(ec_fop_data_t * fop, int32_t state)
{
+ ec_config_t config;
ec_t *ec;
ec_cbk_data_t * cbk;
uint64_t version[2] = {0, 0};
@@ -783,15 +777,15 @@ int32_t ec_manager_mknod(ec_fop_data_t * fop, int32_t state)
ec = fop->xl->private;
- fop->config.version = EC_CONFIG_VERSION;
- fop->config.algorithm = EC_CONFIG_ALGORITHM;
- fop->config.gf_word_size = EC_GF_BITS;
- fop->config.bricks = ec->nodes;
- fop->config.redundancy = ec->redundancy;
- fop->config.chunk_size = EC_METHOD_CHUNK_SIZE;
+ config.version = EC_CONFIG_VERSION;
+ config.algorithm = EC_CONFIG_ALGORITHM;
+ config.gf_word_size = EC_GF_BITS;
+ config.bricks = ec->nodes;
+ config.redundancy = ec->redundancy;
+ config.chunk_size = EC_METHOD_CHUNK_SIZE;
if (ec_dict_set_config(fop->xdata, EC_XATTR_CONFIG,
- &fop->config) < 0) {
+ &config) < 0) {
fop->error = EIO;
return EC_STATE_REPORT;
@@ -814,7 +808,8 @@ int32_t ec_manager_mknod(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -997,15 +992,13 @@ int32_t ec_manager_rename(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
- ec_lock_prepare_entry(fop, &fop->loc[1], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0], EC_UPDATE_DATA |
+ EC_UPDATE_META |
+ EC_INODE_SIZE);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[1],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1034,9 +1027,8 @@ int32_t ec_manager_rename(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 5,
cbk->count);
- if (cbk->iatt[0].ia_type == IA_IFREG)
- {
- cbk->iatt[0].ia_size = fop->pre_size;
+ if (cbk->iatt[0].ia_type == IA_IFREG) {
+ cbk->iatt[0].ia_size = fop->locks[0].size;
}
}
}
@@ -1064,7 +1056,6 @@ int32_t ec_manager_rename(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -1191,7 +1182,8 @@ int32_t ec_manager_rmdir(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -1361,7 +1353,8 @@ int32_t ec_manager_symlink(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -1552,14 +1545,10 @@ int32_t ec_manager_unlink(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_entry(fop, &fop->loc[0], 1);
+ ec_lock_prepare_parent_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1607,7 +1596,6 @@ int32_t ec_manager_unlink(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
diff --git a/xlators/cluster/ec/src/ec-generic.c b/xlators/cluster/ec/src/ec-generic.c
index 9d64735df55..f50c7a70560 100644
--- a/xlators/cluster/ec/src/ec-generic.c
+++ b/xlators/cluster/ec/src/ec-generic.c
@@ -320,15 +320,10 @@ int32_t ec_manager_fsync(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
- return EC_STATE_DISPATCH;
+ return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
ec_flush_size_version(fop);
@@ -361,8 +356,10 @@ int32_t ec_manager_fsync(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
- cbk->iatt[1].ia_size = fop->post_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode,
+ &cbk->iatt[0].ia_size));
+ cbk->iatt[1].ia_size = cbk->iatt[0].ia_size;
}
}
else
@@ -388,7 +385,6 @@ int32_t ec_manager_fsync(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -705,7 +701,6 @@ void ec_lookup_rebuild(ec_t * ec, ec_fop_data_t * fop, ec_cbk_data_t * cbk)
{
ec_cbk_data_t * ans = NULL;
ec_inode_t * ctx = NULL;
- ec_lock_t * lock = NULL;
data_t * data = NULL;
uint8_t * buff = NULL;
uint64_t size = 0;
@@ -729,14 +724,14 @@ void ec_lookup_rebuild(ec_t * ec, ec_fop_data_t * fop, ec_cbk_data_t * cbk)
LOCK(&cbk->inode->lock);
ctx = __ec_inode_get(cbk->inode, fop->xl);
- if ((ctx != NULL) && (ctx->inode_lock != NULL))
+ if (ctx != NULL)
{
- lock = ctx->inode_lock;
- cbk->version[0] = lock->version[0];
- cbk->version[1] = lock->version[1];
- if (lock->have_size)
- {
- size = lock->size;
+ if (ctx->have_version) {
+ cbk->version[0] = ctx->post_version[0];
+ cbk->version[1] = ctx->post_version[1];
+ }
+ if (ctx->have_size) {
+ size = ctx->post_size;
have_size = 1;
}
}
@@ -1304,8 +1299,8 @@ out:
/* FOP: xattrop */
-int32_t ec_combine_xattrop(ec_fop_data_t * fop, ec_cbk_data_t * dst,
- ec_cbk_data_t * src)
+int32_t ec_combine_xattrop(ec_fop_data_t *fop, ec_cbk_data_t *dst,
+ ec_cbk_data_t *src)
{
if (!ec_dict_compare(dst->dict, src->dict))
{
@@ -1325,9 +1320,9 @@ ec_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
{
ec_fop_data_t *fop = NULL;
ec_cbk_data_t *cbk = NULL;
+ data_t *data;
+ uint64_t *version;
int32_t idx = (int32_t)(uintptr_t)cookie;
- uint64_t version = 0;
- uint64_t *version_xattr = 0;
VALIDATE_OR_GOTO (this, out);
GF_VALIDATE_OR_GOTO (this->name, frame, out);
@@ -1347,12 +1342,19 @@ ec_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (op_ret >= 0) {
cbk->dict = dict_ref (xattr);
- if (dict_get_bin (xattr, EC_XATTR_VERSION,
- (void **)&version_xattr) == 0) {
- version = ntoh64(version_xattr[0]);
- if ((version >> EC_SELFHEAL_BIT) & 1)
- fop->healing |= (1ULL<<idx);
+ data = dict_get(cbk->dict, EC_XATTR_VERSION);
+ if ((data != NULL) && (data->len >= sizeof(uint64_t))) {
+ version = (uint64_t *)data->data;
+
+ if (((ntoh64(version[0]) >> EC_SELFHEAL_BIT) & 1) != 0) {
+ LOCK(&fop->lock);
+
+ fop->healing |= 1ULL << idx;
+
+ UNLOCK(&fop->lock);
+ }
}
+
ec_dict_del_array (xattr, EC_XATTR_DIRTY, cbk->dirty,
EC_VERSION_SIZE);
}
@@ -1386,13 +1388,10 @@ int32_t ec_manager_xattrop(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_UPDATE_META);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd, EC_UPDATE_META);
}
ec_lock(fop);
diff --git a/xlators/cluster/ec/src/ec-heal.c b/xlators/cluster/ec/src/ec-heal.c
index 4fe5754cbb8..80725e5a9fa 100644
--- a/xlators/cluster/ec/src/ec-heal.c
+++ b/xlators/cluster/ec/src/ec-heal.c
@@ -119,9 +119,8 @@ void ec_heal_lookup_resume(ec_fop_data_t * fop)
heal->version[0] = cbk->version[0];
heal->version[1] = cbk->version[1];
heal->raw_size = cbk->size;
- heal->fop->pre_size = cbk->iatt[0].ia_size;
- heal->fop->post_size = cbk->iatt[0].ia_size;
- heal->fop->have_size = 1;
+
+ GF_ASSERT(ec_set_inode_size(fop, cbk->inode, cbk->size));
if (ec_loc_update(heal->xl, &heal->loc, cbk->inode,
&cbk->iatt[0]) != 0)
@@ -547,25 +546,8 @@ out:
return error;
}
-void ec_heal_entrylk(ec_heal_t * heal, entrylk_cmd cmd)
-{
- loc_t loc;
-
- if (ec_loc_parent(heal->xl, &heal->loc, &loc) != 0) {
- gf_log("ec", GF_LOG_NOTICE, "ec_loc_parent() failed");
- ec_fop_set_error(heal->fop, EIO);
-
- return;
- }
-
- ec_entrylk(heal->fop->frame, heal->xl, -1, EC_MINIMUM_ALL, NULL, NULL,
- heal->xl->name, &loc, NULL, cmd, ENTRYLK_WRLCK, NULL);
-
- loc_wipe(&loc);
-}
-
-void ec_heal_inodelk(ec_heal_t * heal, int32_t type, int32_t use_fd,
- off_t offset, size_t size)
+void ec_heal_lock(ec_heal_t *heal, int32_t type, fd_t *fd, loc_t *loc,
+ off_t offset, size_t size)
{
struct gf_flock flock;
@@ -576,20 +558,47 @@ void ec_heal_inodelk(ec_heal_t * heal, int32_t type, int32_t use_fd,
flock.l_pid = 0;
flock.l_owner.len = 0;
- if (use_fd)
+ /* Remove inode size information before unlocking it. */
+ if ((type == F_UNLCK) && (heal->loc.inode != NULL)) {
+ ec_clear_inode_info(heal->fop, heal->loc.inode);
+ }
+
+ if (fd != NULL)
{
ec_finodelk(heal->fop->frame, heal->xl, heal->fop->mask,
- EC_MINIMUM_ALL, NULL, NULL, heal->xl->name, heal->fd,
+ EC_MINIMUM_ALL, NULL, NULL, heal->xl->name, fd,
F_SETLKW, &flock, NULL);
}
else
{
ec_inodelk(heal->fop->frame, heal->xl, heal->fop->mask, EC_MINIMUM_ALL,
- NULL, NULL, heal->xl->name, &heal->loc, F_SETLKW, &flock,
+ NULL, NULL, heal->xl->name, loc, F_SETLKW, &flock,
NULL);
}
}
+void ec_heal_entrylk(ec_heal_t *heal, int32_t type)
+{
+ loc_t loc;
+
+ if (ec_loc_parent(heal->xl, &heal->loc, &loc) != 0) {
+ ec_fop_set_error(heal->fop, EIO);
+
+ return;
+ }
+
+ ec_heal_lock(heal, type, NULL, &loc, 0, 0);
+
+ loc_wipe(&loc);
+}
+
+void ec_heal_inodelk(ec_heal_t *heal, int32_t type, int32_t use_fd,
+ off_t offset, size_t size)
+{
+ ec_heal_lock(heal, type, use_fd ? heal->fd : NULL, &heal->loc, offset,
+ size);
+}
+
void ec_heal_lookup(ec_heal_t *heal, uintptr_t mask)
{
dict_t * xdata;
@@ -1297,13 +1306,10 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
case EC_STATE_DISPATCH:
if (heal->done != 0) {
- gf_log("ec", GF_LOG_NOTICE, "heal already done");
return EC_STATE_HEAL_DISPATCH;
}
- gf_log("ec", GF_LOG_NOTICE, "heal before entrylk");
- ec_heal_entrylk(heal, ENTRYLK_LOCK);
- gf_log("ec", GF_LOG_NOTICE, "heal after entrylk");
+ ec_heal_entrylk(heal, F_WRLCK);
return EC_STATE_HEAL_ENTRY_LOOKUP;
@@ -1331,7 +1337,7 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
/* Only heal data/metadata if enough information is supplied. */
if (gf_uuid_is_null(heal->loc.gfid))
{
- ec_heal_entrylk(heal, ENTRYLK_UNLOCK);
+ ec_heal_entrylk(heal, F_UNLCK);
return EC_STATE_HEAL_DISPATCH;
}
@@ -1387,7 +1393,7 @@ ec_manager_heal (ec_fop_data_t * fop, int32_t state)
case -EC_STATE_HEAL_UNLOCK_ENTRY:
case EC_STATE_HEAL_UNLOCK_ENTRY:
if (heal->nameheal)
- ec_heal_entrylk(heal, ENTRYLK_UNLOCK);
+ ec_heal_entrylk(heal, F_UNLCK);
heal->bad = ec_heal_needs_data_rebuild(heal);
if (heal->bad != 0)
@@ -2557,9 +2563,9 @@ ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
EC_REPLIES_ALLOC (replies, ec->nodes);
output = alloca0 (ec->nodes);
locked_on = alloca0 (ec->nodes);
- ret = cluster_entrylk (ec->xl_list, participants, ec->nodes, replies,
+ ret = cluster_inodelk (ec->xl_list, participants, ec->nodes, replies,
locked_on, frame, ec->xl, ec->xl->name, parent,
- NULL);
+ 0, 0);
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s/%s: Skipping "
@@ -2573,8 +2579,8 @@ ec_heal_name (call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
ret = __ec_heal_name (frame, ec, parent, name, participants);
}
unlock:
- cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
- frame, ec->xl, ec->xl->name, parent, NULL);
+ cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
+ frame, ec->xl, ec->xl->name, parent, 0, 0);
out:
cluster_replies_wipe (replies, ec->nodes);
loc_wipe (&loc);
@@ -2658,9 +2664,9 @@ __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
dirty = alloca0 (ec->nodes * sizeof (*dirty));
EC_REPLIES_ALLOC (replies, ec->nodes);
- ret = cluster_entrylk (ec->xl_list, heal_on, ec->nodes, replies,
+ ret = cluster_inodelk (ec->xl_list, heal_on, ec->nodes, replies,
locked_on, frame, ec->xl, ec->xl->name, inode,
- NULL);
+ 0, 0);
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal "
@@ -2675,8 +2681,8 @@ __ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
source = ret;
}
unlock:
- cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
- frame, ec->xl, ec->xl->name, inode, NULL);
+ cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
+ frame, ec->xl, ec->xl->name, inode, 0, 0);
if (ret < 0)
goto out;
@@ -2723,9 +2729,9 @@ ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
sprintf (selfheal_domain, "%s:self-heal", ec->xl->name);
ec_mask_to_char_array (ec->xl_up, up_subvols, ec->nodes);
/*If other processes are already doing the heal, don't block*/
- ret = cluster_entrylk (ec->xl_list, up_subvols, ec->nodes, replies,
+ ret = cluster_inodelk (ec->xl_list, up_subvols, ec->nodes, replies,
locked_on, frame, ec->xl, selfheal_domain, inode,
- NULL);
+ 0, 0);
{
if (ret <= ec->fragments) {
gf_log (ec->xl->name, GF_LOG_DEBUG, "%s: Skipping heal "
@@ -2738,8 +2744,8 @@ ec_heal_entry (call_frame_t *frame, ec_t *ec, inode_t *inode,
sources, healed_sinks);
}
unlock:
- cluster_unentrylk (ec->xl_list, locked_on, ec->nodes, replies, output,
- frame, ec->xl, selfheal_domain, inode, NULL);
+ cluster_uninodelk (ec->xl_list, locked_on, ec->nodes, replies, output,
+ frame, ec->xl, selfheal_domain, inode, 0, 0);
cluster_replies_wipe (replies, ec->nodes);
return ret;
}
@@ -3081,8 +3087,8 @@ ec_heal_block (call_frame_t *frame, xlator_t *this, uintptr_t target,
if (fop == NULL)
goto out;
- fop->pre_size = fop->post_size = heal->total_size;
- fop->have_size = 1;
+ GF_ASSERT(ec_set_inode_size(fop, heal->fd->inode, heal->total_size));
+
error = 0;
out:
diff --git a/xlators/cluster/ec/src/ec-helpers.c b/xlators/cluster/ec/src/ec-helpers.c
index 8ce3087d5a6..48251c84bac 100644
--- a/xlators/cluster/ec/src/ec-helpers.c
+++ b/xlators/cluster/ec/src/ec-helpers.c
@@ -738,3 +738,41 @@ ec_filter_internal_xattrs (dict_t *xattr)
dict_foreach_match (xattr, ec_is_internal_xattr, NULL,
dict_remove_foreach_fn, NULL);
}
+
+gf_boolean_t
+ec_is_data_fop (glusterfs_fop_t fop)
+{
+ switch (fop) {
+ case GF_FOP_WRITE:
+ case GF_FOP_TRUNCATE:
+ case GF_FOP_FTRUNCATE:
+ case GF_FOP_FALLOCATE:
+ case GF_FOP_DISCARD:
+ case GF_FOP_ZEROFILL:
+ return _gf_true;
+ default:
+ return _gf_false;
+ }
+ return _gf_false;
+}
+/*
+gf_boolean_t
+ec_is_metadata_fop (int32_t lock_kind, glusterfs_fop_t fop)
+{
+ if (lock_kind == EC_LOCK_ENTRY) {
+ return _gf_false;
+ }
+
+ switch (fop) {
+ case GF_FOP_SETATTR:
+ case GF_FOP_FSETATTR:
+ case GF_FOP_SETXATTR:
+ case GF_FOP_FSETXATTR:
+ case GF_FOP_REMOVEXATTR:
+ case GF_FOP_FREMOVEXATTR:
+ return _gf_true;
+ default:
+ return _gf_false;
+ }
+ return _gf_false;
+}*/
diff --git a/xlators/cluster/ec/src/ec-helpers.h b/xlators/cluster/ec/src/ec-helpers.h
index df4495138fe..14243df54f3 100644
--- a/xlators/cluster/ec/src/ec-helpers.h
+++ b/xlators/cluster/ec/src/ec-helpers.h
@@ -59,4 +59,11 @@ ec_is_internal_xattr (dict_t *dict, char *key, data_t *value, void *data);
void
ec_filter_internal_xattrs (dict_t *xattr);
+
+gf_boolean_t
+ec_is_data_fop (glusterfs_fop_t fop);
+/*
+gf_boolean_t
+ec_is_metadata_fop (glusterfs_fop_t fop);
+*/
#endif /* __EC_HELPERS_H__ */
diff --git a/xlators/cluster/ec/src/ec-inode-read.c b/xlators/cluster/ec/src/ec-inode-read.c
index 7372c0a0599..853d914148b 100644
--- a/xlators/cluster/ec/src/ec-inode-read.c
+++ b/xlators/cluster/ec/src/ec-inode-read.c
@@ -267,9 +267,9 @@ int32_t ec_manager_getxattr(ec_fop_data_t * fop, int32_t state)
(strncmp(fop->str[0], GF_XATTR_CLRLK_CMD,
strlen(GF_XATTR_CLRLK_CMD)) != 0)) {
if (fop->fd == NULL) {
- ec_lock_prepare_inode(fop, &fop->loc[0], 0);
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO);
} else {
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
}
ec_lock(fop);
}
@@ -1094,12 +1094,12 @@ int32_t ec_readv_rebuild(ec_t * ec, ec_fop_data_t * fop, ec_cbk_data_t * cbk)
size_t fsize = 0, size = 0, max = 0;
int32_t i = 0;
- if (cbk->op_ret < 0)
- {
+ if (cbk->op_ret < 0) {
goto out;
}
- cbk->iatt[0].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode, &cbk->iatt[0].ia_size));
if (cbk->op_ret > 0)
{
@@ -1331,15 +1331,10 @@ int32_t ec_manager_readv(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
- return EC_STATE_DISPATCH;
+ return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
ec_dispatch_min(fop);
@@ -1396,7 +1391,6 @@ int32_t ec_manager_readv(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -1580,22 +1574,14 @@ int32_t ec_manager_stat(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 0);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 0);
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
}
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
- return EC_STATE_DISPATCH;
+ return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
ec_dispatch_all(fop);
@@ -1614,16 +1600,16 @@ int32_t ec_manager_stat(ec_fop_data_t * fop, int32_t state)
cbk->op_errno = EIO;
}
}
- if (cbk->op_ret < 0)
- {
+ if (cbk->op_ret < 0) {
ec_fop_set_error(fop, cbk->op_errno);
- }
- else
- {
+ } else if (cbk->iatt[0].ia_type == IA_IFREG) {
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 1,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ &cbk->iatt[0].ia_size));
}
}
else
@@ -1659,7 +1645,6 @@ int32_t ec_manager_stat(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
diff --git a/xlators/cluster/ec/src/ec-inode-write.c b/xlators/cluster/ec/src/ec-inode-write.c
index 6b485a26fbc..368b3ae5edf 100644
--- a/xlators/cluster/ec/src/ec-inode-write.c
+++ b/xlators/cluster/ec/src/ec-inode-write.c
@@ -123,11 +123,13 @@ ec_manager_xattr (ec_fop_data_t *fop, int32_t state)
switch (state) {
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- else
- ec_lock_prepare_fd(fop, fop->fd, 1);
-
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0],
+ EC_UPDATE_META | EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_META | EC_QUERY_INFO);
+ }
ec_lock(fop);
return EC_STATE_DISPATCH;
@@ -378,21 +380,15 @@ int32_t ec_manager_setattr(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- if (fop->fd == NULL)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ if (fop->fd == NULL) {
+ ec_lock_prepare_inode(fop, &fop->loc[0],
+ EC_UPDATE_META | EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_META | EC_QUERY_INFO);
}
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_get_size_version(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -412,17 +408,17 @@ int32_t ec_manager_setattr(ec_fop_data_t * fop, int32_t state)
cbk->op_errno = EIO;
}
}
- if (cbk->op_ret < 0)
- {
+ if (cbk->op_ret < 0) {
ec_fop_set_error(fop, cbk->op_errno);
- }
- else
- {
+ } else if (cbk->iatt[0].ia_type == IA_IFREG) {
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
- cbk->iatt[1].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ &cbk->iatt[0].ia_size));
+ cbk->iatt[1].ia_size = cbk->iatt[0].ia_size;
}
}
else
@@ -462,7 +458,6 @@ int32_t ec_manager_setattr(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -992,21 +987,17 @@ int32_t ec_manager_truncate(ec_fop_data_t * fop, int32_t state)
/* Fall through */
case EC_STATE_LOCK:
- if (fop->id == GF_FOP_TRUNCATE)
- {
- ec_lock_prepare_inode(fop, &fop->loc[0], 1);
- }
- else
- {
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ if (fop->id == GF_FOP_TRUNCATE) {
+ ec_lock_prepare_inode(fop, &fop->loc[0],
+ EC_UPDATE_DATA | EC_UPDATE_META |
+ EC_QUERY_INFO);
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_DATA | EC_UPDATE_META |
+ EC_QUERY_INFO);
}
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_prepare_update(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1035,14 +1026,18 @@ int32_t ec_manager_truncate(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
- cbk->iatt[0].ia_size = fop->pre_size;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ &cbk->iatt[0].ia_size));
cbk->iatt[1].ia_size = fop->user_size;
- fop->post_size = fop->user_size;
- if ((fop->pre_size > fop->post_size) &&
- (fop->user_size != fop->offset))
- {
- if (!ec_truncate_clean(fop))
- {
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_set_inode_size(fop,
+ fop->locks[0].lock->loc.inode,
+ fop->user_size));
+ if ((cbk->iatt[0].ia_size > cbk->iatt[1].ia_size) &&
+ (fop->user_size != fop->offset)) {
+ if (!ec_truncate_clean(fop)) {
ec_fop_set_error(fop, EIO);
}
}
@@ -1085,7 +1080,6 @@ int32_t ec_manager_truncate(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
@@ -1355,9 +1349,13 @@ void ec_writev_start(ec_fop_data_t *fop)
ec_fd_t *ctx;
fd_t *fd;
size_t tail;
+ uint64_t current;
uid_t uid;
gid_t gid;
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode, &current));
+
fd = fd_anonymous(fop->fd->inode);
if (fd == NULL) {
ec_fop_set_error(fop, EIO);
@@ -1373,7 +1371,7 @@ void ec_writev_start(ec_fop_data_t *fop)
ctx = ec_fd_get(fop->fd, fop->xl);
if (ctx != NULL) {
if ((ctx->flags & O_APPEND) != 0) {
- fop->offset = fop->pre_size;
+ fop->offset = current;
}
}
@@ -1404,22 +1402,17 @@ void ec_writev_start(ec_fop_data_t *fop)
iobref_unref(fop->buffers);
fop->buffers = iobref;
- if (fop->head > 0)
- {
+ if (fop->head > 0) {
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN, ec_writev_merge_head,
NULL, fd, ec->stripe_size, fop->offset, 0, NULL);
}
tail = fop->size - fop->user_size - fop->head;
- if ((tail > 0) && ((fop->head == 0) || (fop->size > ec->stripe_size)))
- {
- if (fop->pre_size > fop->offset + fop->head + fop->user_size)
- {
+ if ((tail > 0) && ((fop->head == 0) || (fop->size > ec->stripe_size))) {
+ if (current > fop->offset + fop->head + fop->user_size) {
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN,
ec_writev_merge_tail, NULL, fd, ec->stripe_size,
fop->offset + fop->size - ec->stripe_size, 0, NULL);
- }
- else
- {
+ } else {
memset(fop->vector[0].iov_base + fop->size - tail, 0, tail);
}
}
@@ -1530,14 +1523,11 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
{
case EC_STATE_INIT:
case EC_STATE_LOCK:
- ec_lock_prepare_fd(fop, fop->fd, 1);
+ ec_lock_prepare_fd(fop, fop->fd,
+ EC_UPDATE_DATA | EC_UPDATE_META |
+ EC_QUERY_INFO);
ec_lock(fop);
- return EC_STATE_GET_SIZE_AND_VERSION;
-
- case EC_STATE_GET_SIZE_AND_VERSION:
- ec_prepare_update(fop);
-
return EC_STATE_DISPATCH;
case EC_STATE_DISPATCH:
@@ -1574,27 +1564,34 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
ec_iatt_rebuild(fop->xl->private, cbk->iatt, 2,
cbk->count);
+ /* This shouldn't fail because we have the inode locked. */
+ GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode,
+ &cbk->iatt[0].ia_size));
+ cbk->iatt[1].ia_size = cbk->iatt[0].ia_size;
size = fop->offset + fop->head + fop->user_size;
- if (size > fop->pre_size)
- {
- fop->post_size = size;
- }
-
- cbk->iatt[0].ia_size = fop->pre_size;
- cbk->iatt[1].ia_size = fop->post_size;
-
- cbk->op_ret *= ec->fragments;
- if (cbk->op_ret < fop->head)
- {
- cbk->op_ret = 0;
- }
- else
- {
- cbk->op_ret -= fop->head;
+ if (size > cbk->iatt[0].ia_size) {
+ /* Only update inode size if this is a top level fop.
+ * Otherwise this is an internal write and the top
+ * level fop should take care of the real inode size.
+ */
+ if (fop->parent == NULL) {
+ /* This shouldn't fail because we have the inode
+ * locked. */
+ GF_ASSERT(ec_set_inode_size(fop, fop->fd->inode,
+ size));
+ }
+ cbk->iatt[1].ia_size = size;
}
- if (cbk->op_ret > fop->user_size)
- {
- cbk->op_ret = fop->user_size;
+ if (fop->error == 0) {
+ cbk->op_ret *= ec->fragments;
+ if (cbk->op_ret < fop->head) {
+ cbk->op_ret = 0;
+ } else {
+ cbk->op_ret -= fop->head;
+ }
+ if (cbk->op_ret > fop->user_size) {
+ cbk->op_ret = fop->user_size;
+ }
}
}
}
@@ -1621,8 +1618,8 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
case -EC_STATE_INIT:
case -EC_STATE_LOCK:
- case -EC_STATE_GET_SIZE_AND_VERSION:
case -EC_STATE_DISPATCH:
+ case -EC_STATE_DELAYED_START:
case -EC_STATE_PREPARE_ANSWER:
case -EC_STATE_REPORT:
GF_ASSERT(fop->error != 0);