diff options
-rw-r--r-- | libglusterfs/src/fd.h | 3 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-common.c | 471 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-common.h | 3 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-data.c | 1 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-data.h | 5 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-dir-read.c | 13 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-generic.c | 12 | ||||
-rw-r--r-- | xlators/cluster/ec/src/ec-inode-read.c | 46 |
8 files changed, 362 insertions, 192 deletions
diff --git a/libglusterfs/src/fd.h b/libglusterfs/src/fd.h index 53ec93dec6c..a6dc48a0b0e 100644 --- a/libglusterfs/src/fd.h +++ b/libglusterfs/src/fd.h @@ -116,6 +116,9 @@ fd_t * fd_ref (fd_t *fd); +fd_t * +__fd_unref (fd_t *fd); + void fd_unref (fd_t *fd); diff --git a/xlators/cluster/ec/src/ec-common.c b/xlators/cluster/ec/src/ec-common.c index b39fcb55d4e..d0c9f97ab28 100644 --- a/xlators/cluster/ec/src/ec-common.c +++ b/xlators/cluster/ec/src/ec-common.c @@ -625,6 +625,7 @@ ec_lock_t *ec_lock_allocate(ec_fop_data_t *fop, loc_t *loc) if (lock != NULL) { lock->good_mask = -1ULL; + INIT_LIST_HEAD(&lock->owners); INIT_LIST_HEAD(&lock->waiting); INIT_LIST_HEAD(&lock->frozen); err = ec_loc_from_loc(fop->xl, &lock->loc, loc); @@ -871,7 +872,8 @@ ec_prepare_update_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata) { - ec_fop_data_t *fop = cookie, *parent; + struct list_head list; + ec_fop_data_t *fop = cookie, *parent, *tmp; ec_lock_link_t *link = fop->data; ec_lock_t *lock = NULL; ec_inode_t *ctx; @@ -880,16 +882,26 @@ ec_prepare_update_cbk (call_frame_t *frame, void *cookie, parent = link->fop; ctx = lock->ctx; + INIT_LIST_HEAD(&list); + + LOCK(&lock->loc.inode->lock); + + list_for_each_entry(tmp, &lock->owners, owner_list) { + if ((tmp->flags & EC_FLAG_WAITING_SIZE) != 0) { + tmp->flags ^= EC_FLAG_WAITING_SIZE; + + list_add_tail(&tmp->cbk_list, &list); + } + } + if (op_ret < 0) { gf_msg (this->name, GF_LOG_WARNING, op_errno, EC_MSG_SIZE_VERS_GET_FAIL, "Failed to get size and version"); - goto out; + goto unlock; } - LOCK(&lock->loc.inode->lock); - op_errno = -ec_dict_del_array(dict, EC_XATTR_VERSION, ctx->pre_version, EC_VERSION_SIZE); if (op_errno != 0) { @@ -942,8 +954,10 @@ ec_prepare_update_cbk (call_frame_t *frame, void *cookie, op_errno = 0; unlock: + lock->getting_size = _gf_false; + UNLOCK(&lock->loc.inode->lock); -out: + if (op_errno == 0) { /* We don't allow the main fop to be executed on bricks that have not * succeeded the initial xattrop. */ @@ -958,6 +972,24 @@ out: ec_fop_set_error(parent, op_errno); } + while (!list_empty(&list)) { + tmp = list_entry(list.next, ec_fop_data_t, cbk_list); + list_del_init(&tmp->cbk_list); + + if (op_errno == 0) { + tmp->mask &= fop->good; + + /*As of now only data healing marks bricks as healing*/ + if (ec_is_data_fop (tmp->id)) { + tmp->healing |= fop->healing; + } + } else { + ec_fop_set_error(tmp, op_errno); + } + + ec_resume(tmp, 0); + } + return 0; } @@ -971,6 +1003,7 @@ void ec_get_size_version(ec_lock_link_t *link) uid_t uid; gid_t gid; int32_t error = -ENOMEM; + gf_boolean_t getting_size; uint64_t allzero[EC_VERSION_SIZE] = {0, 0}; lock = link->lock; @@ -996,6 +1029,24 @@ void ec_get_size_version(ec_lock_link_t *link) memset(&loc, 0, sizeof(loc)); + LOCK(&lock->loc.inode->lock); + + getting_size = lock->getting_size; + lock->getting_size = _gf_true; + if (getting_size) { + fop->flags |= EC_FLAG_WAITING_SIZE; + + ec_sleep(fop); + } + + UNLOCK(&lock->loc.inode->lock); + + if (getting_size) { + error = 0; + + goto out; + } + dict = dict_new(); if (dict == NULL) { goto out; @@ -1221,36 +1272,123 @@ out: } } -void ec_lock_acquired(ec_lock_link_t *link) +static void +ec_lock_update_fd(ec_lock_t *lock, ec_fop_data_t *fop) +{ + /* If the fop has an fd available, attach it to the lock structure to be + * able to do fxattrop calls instead of xattrop. */ + if (fop->use_fd) { + if (lock->fd != NULL) { + __fd_unref(lock->fd); + } + lock->fd = __fd_ref(fop->fd); + } +} + +static void +ec_lock_wake_shared(ec_lock_t *lock, struct list_head *list) { - ec_lock_t *lock; ec_fop_data_t *fop; + ec_lock_link_t *link; + gf_boolean_t exclusive = _gf_false; - lock = link->lock; - fop = link->fop; + while (!exclusive && !list_empty(&lock->waiting)) { + link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list); + fop = link->fop; - ec_trace("LOCKED", link->fop, "lock=%p", lock); + /* If lock is not acquired, at most one fop can be assigned as owner. + * The following fops will need to wait in the lock->waiting queue + * until the lock has been fully acquired. */ + exclusive = !lock->acquired; - /* If the fop has an fd available, attach it to the lock structure to be - * able to do fxattrop calls instead of xattrop. It's safe to change this - * here because no xattrop using the fd can start concurrently at this - * point. */ - if (fop->use_fd) { - if (lock->fd != NULL) { - fd_unref(lock->fd); + /* If the fop is not shareable, only this fop can be assigned as owner. + * Other fops will need to wait until this one finishes. */ + if ((fop->flags & EC_FLAG_LOCK_SHARED) == 0) { + exclusive = _gf_true; + + /* Avoid other requests to be assigned as owners. */ + lock->exclusive = 1; } - lock->fd = fd_ref(fop->fd); + + /* If only one fop is allowed, it can be assigned as the owner of the + * lock only if there weren't any other owner. */ + if (exclusive && !list_empty(&lock->owners)) { + break; + } + + list_move_tail(&link->wait_list, list); + + list_add_tail(&fop->owner_list, &lock->owners); + + ec_lock_update_fd(lock, fop); } - lock->acquired = _gf_true; +} - fop->mask &= lock->good_mask; +static void +ec_lock_apply(ec_lock_link_t *link) +{ + ec_fop_data_t *fop = link->fop; + fop->mask &= link->lock->good_mask; fop->locked++; ec_get_size_version(link); ec_get_real_size(link); } +gf_boolean_t ec_lock_acquire(ec_lock_link_t *link); + +static void +ec_lock_resume_shared(struct list_head *list) +{ + ec_lock_link_t *link; + + while (!list_empty(list)) { + link = list_entry(list->next, ec_lock_link_t, wait_list); + list_del_init(&link->wait_list); + + if (link->lock->acquired) { + ec_lock_apply(link); + ec_lock(link->fop); + } else { + GF_ASSERT(list_empty(list)); + + ec_lock_acquire(link); + } + + ec_resume(link->fop, 0); + } +} + +void ec_lock_acquired(ec_lock_link_t *link) +{ + struct list_head list; + ec_lock_t *lock; + ec_fop_data_t *fop; + + lock = link->lock; + fop = link->fop; + + ec_trace("LOCKED", fop, "lock=%p", lock); + + INIT_LIST_HEAD(&list); + + LOCK(&lock->loc.inode->lock); + + lock->acquired = _gf_true; + + ec_lock_update_fd(lock, fop); + if ((fop->flags & EC_FLAG_LOCK_SHARED) != 0) { + ec_lock_wake_shared(lock, &list); + } + + UNLOCK(&lock->loc.inode->lock); + + ec_lock_apply(link); + + ec_lock_resume_shared(&list); +} + int32_t ec_locked(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { @@ -1282,6 +1420,7 @@ gf_boolean_t ec_lock_acquire(ec_lock_link_t *link) lock = link->lock; fop = link->fop; + if (!lock->acquired) { ec_owner_set(fop->frame, lock); @@ -1303,141 +1442,195 @@ gf_boolean_t ec_lock_acquire(ec_lock_link_t *link) return _gf_true; } -void ec_lock(ec_fop_data_t *fop) +static gf_boolean_t +ec_lock_assign_owner(ec_lock_link_t *link) { - ec_lock_link_t *link; - ec_lock_link_t *timer_link = NULL; + ec_fop_data_t *fop; ec_lock_t *lock; + ec_lock_link_t *timer_link = NULL; + gf_boolean_t assigned = _gf_false; - /* There is a chance that ec_resume is called on fop even before ec_sleep. - * Which can result in refs == 0 for fop leading to use after free in this - * function when it calls ec_sleep so do ec_sleep at start and ec_resume at - * the end of this function.*/ - ec_sleep (fop); + GF_ASSERT(list_empty(&link->wait_list)); - while (fop->locked < fop->lock_count) { - /* Since there are only up to 2 locks per fop, this xor will change - * the order of the locks if fop->first_lock is 1. */ - link = &fop->locks[fop->locked ^ fop->first_lock]; - lock = link->lock; + fop = link->fop; + lock = link->lock; - timer_link = NULL; + LOCK(&lock->loc.inode->lock); - LOCK(&lock->loc.inode->lock); - GF_ASSERT (lock->inserted > 0); - lock->inserted--; + GF_ASSERT (lock->inserted > 0); + lock->inserted--; - if (lock->timer != NULL) { - GF_ASSERT (lock->release == _gf_false); - timer_link = lock->timer->data; - if (gf_timer_call_cancel(fop->xl->ctx, lock->timer) == 0) { - ec_trace("UNLOCK_CANCELLED", timer_link->fop, - "lock=%p", lock); - lock->timer = NULL; - lock->refs--; - /* There should remain at least 1 ref, the current one. */ - GF_ASSERT(lock->refs > 0); - } else { - /* Timer expired and on the way to unlock. - * Set lock->release to _gf_true, so that this - * lock will be put in frozen list*/ - timer_link = NULL; - lock->release = _gf_true; - } + if (lock->release) { + ec_trace("LOCK_QUEUE_FREEZE", fop, "lock=%p", lock); + + list_add_tail(&link->wait_list, &lock->frozen); + + /* The lock is frozen, so we move the current reference to refs_frozen. + * After that, there should remain at least one ref belonging to the + * lock that is processing the release. */ + lock->refs--; + GF_ASSERT(lock->refs > 0); + lock->refs_frozen++; + + goto unlock; + } + + lock->exclusive |= (fop->flags & EC_FLAG_LOCK_SHARED) == 0; + + if (!list_empty(&lock->owners)) { + if (!lock->acquired || (lock->exclusive != 0)) { + ec_trace("LOCK_QUEUE_WAIT", fop, "lock=%p", lock); + + list_add_tail(&link->wait_list, &lock->waiting); + + goto unlock; + } + } else if (lock->timer != NULL) { + GF_ASSERT (lock->release == _gf_false); + + timer_link = lock->timer->data; + if (gf_timer_call_cancel(fop->xl->ctx, lock->timer) == 0) { + ec_trace("UNLOCK_CANCELLED", timer_link->fop, "lock=%p", lock); + lock->timer = NULL; + lock->refs--; + /* There should remain at least 1 ref, the current one. */ + GF_ASSERT(lock->refs > 0); + } else { + /* Timer expired and on the way to unlock. + * Set lock->release to _gf_true, so that this + * lock will be put in frozen list*/ + timer_link = NULL; + lock->release = _gf_true; } + } - GF_ASSERT(list_empty(&link->wait_list)); + list_add_tail(&fop->owner_list, &lock->owners); - if ((lock->owner != NULL) || lock->release) { - if (lock->release) { - ec_trace("LOCK_QUEUE_FREEZE", fop, "lock=%p", lock); + assigned = _gf_true; - list_add_tail(&link->wait_list, &lock->frozen); +unlock: + if (!assigned) { + ec_sleep(fop); + } - /* The lock is frozen, so we move the current reference to - * refs_frozen. After that, there should remain at least one - * ref belonging to the lock that is processing the release. */ - lock->refs--; - GF_ASSERT(lock->refs > 0); - lock->refs_frozen++; - } else { - ec_trace("LOCK_QUEUE_WAIT", fop, "lock=%p", lock); + UNLOCK(&lock->loc.inode->lock); - list_add_tail(&link->wait_list, &lock->waiting); - } + if (timer_link != NULL) { + ec_resume(timer_link->fop, 0); + } - UNLOCK(&lock->loc.inode->lock); + return assigned; +} - ec_sleep(fop); +static void +ec_lock_next_owner(ec_lock_link_t *link, ec_cbk_data_t *cbk, + gf_boolean_t release) +{ + struct list_head list; + ec_lock_t *lock = link->lock; + ec_fop_data_t *fop = link->fop; + ec_inode_t *ctx = lock->ctx; + ec_t *ec = fop->xl->private; - break; - } + INIT_LIST_HEAD(&list); - lock->owner = fop; + LOCK(&lock->loc.inode->lock); - UNLOCK(&lock->loc.inode->lock); + ec_trace("LOCK_DONE", fop, "lock=%p", lock); - if (!ec_lock_acquire(link)) { - break; - } + GF_ASSERT(!list_empty(&fop->owner_list)); + list_del_init(&fop->owner_list); + lock->release |= release; - if (timer_link != NULL) { - ec_resume(timer_link->fop, 0); - timer_link = NULL; + if ((fop->error == 0) && (cbk != NULL) && (cbk->op_ret >= 0)) { + if (link->update[0]) { + ctx->post_version[0]++; + if (ec->node_mask & ~fop->good) { + ctx->dirty[0]++; + } + } + if (link->update[1]) { + ctx->post_version[1]++; + if (ec->node_mask & ~fop->good) { + ctx->dirty[1]++; + } } } - ec_resume (fop, 0); - if (timer_link != NULL) { - ec_resume(timer_link->fop, 0); + ec_lock_update_good(lock, fop); + + lock->exclusive -= (fop->flags & EC_FLAG_LOCK_SHARED) == 0; + if (list_empty(&lock->owners)) { + ec_lock_wake_shared(lock, &list); } + + UNLOCK(&lock->loc.inode->lock); + + ec_lock_resume_shared(&list); +} + +void ec_lock(ec_fop_data_t *fop) +{ + ec_lock_link_t *link; + + /* There is a chance that ec_resume is called on fop even before ec_sleep. + * Which can result in refs == 0 for fop leading to use after free in this + * function when it calls ec_sleep so do ec_sleep at start and ec_resume at + * the end of this function.*/ + ec_sleep (fop); + + while (fop->locked < fop->lock_count) { + /* Since there are only up to 2 locks per fop, this xor will change + * the order of the locks if fop->first_lock is 1. */ + link = &fop->locks[fop->locked ^ fop->first_lock]; + + if (!ec_lock_assign_owner(link) || !ec_lock_acquire(link)) { + break; + } + } + + ec_resume(fop, 0); } void ec_lock_unfreeze(ec_lock_link_t *link) { + struct list_head list; ec_lock_t *lock; lock = link->lock; + INIT_LIST_HEAD(&list); + LOCK(&lock->loc.inode->lock); lock->acquired = _gf_false; lock->release = _gf_false; - lock->refs--; - GF_ASSERT (lock->refs == lock->inserted); - GF_ASSERT(list_empty(&lock->waiting) && (lock->owner == NULL)); + GF_ASSERT (lock->refs == lock->inserted); + GF_ASSERT(lock->exclusive == 0); + GF_ASSERT(list_empty(&lock->waiting) && list_empty(&lock->owners)); list_splice_init(&lock->frozen, &lock->waiting); lock->refs += lock->refs_frozen; lock->refs_frozen = 0; + if (lock->refs == 0) { + ec_trace("LOCK_DESTROY", link->fop, "lock=%p", lock); - if (!list_empty(&lock->waiting)) { - link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list); - list_del_init(&link->wait_list); - - lock->owner = link->fop; - - UNLOCK(&lock->loc.inode->lock); - + lock->ctx->inode_lock = NULL; + } else { ec_trace("LOCK_UNFREEZE", link->fop, "lock=%p", lock); - if (ec_lock_acquire(link)) { - ec_lock(link->fop); - } - ec_resume(link->fop, 0); - } else if (lock->refs == 0) { - ec_trace("LOCK_DESTROY", link->fop, "lock=%p", lock); + ec_lock_wake_shared(lock, &list); + } - lock->ctx->inode_lock = NULL; + UNLOCK(&lock->loc.inode->lock); - UNLOCK(&lock->loc.inode->lock); + ec_lock_resume_shared(&list); + if (lock->refs == 0) { ec_lock_destroy(lock); - } else { - UNLOCK(&lock->loc.inode->lock); } } @@ -1551,6 +1744,9 @@ ec_update_size_version(ec_lock_link_t *link, uint64_t *version, fop = link->fop; + GF_ASSERT(version[0] < 0x100000000); + GF_ASSERT(version[1] < 0x100000000); + ec_trace("UPDATE", fop, "version=%ld/%ld, size=%ld, dirty=%ld/%ld", version[0], version[1], size, dirty[0], dirty[1]); @@ -1708,6 +1904,10 @@ ec_unlock_timer_del(ec_lock_link_t *link) lock->release = now = _gf_true; + /* TODO: If the assertion is really true, following code is + * not needed. */ + GF_ASSERT(list_empty(&lock->waiting)); + before = lock->refs + lock->refs_frozen; list_splice_init(&lock->waiting, &lock->frozen); lock->refs_frozen += lock->refs - lock->inserted - 1; @@ -1749,6 +1949,8 @@ void ec_unlock_timer_add(ec_lock_link_t *link) } else if (lock->acquired) { ec_t *ec = fop->xl->private; + GF_ASSERT(list_empty(&lock->owners)); + ec_sleep(fop); /* If healing is needed, the lock needs to be released due to @@ -1783,6 +1985,8 @@ void ec_unlock_timer_add(ec_lock_link_t *link) } else { lock->release = _gf_true; + GF_ASSERT(list_empty(&lock->owners)); + UNLOCK(&lock->loc.inode->lock); ec_lock_unfreeze(link); @@ -1814,11 +2018,7 @@ void ec_flush_size_version(ec_fop_data_t * fop) void ec_lock_reuse(ec_fop_data_t *fop) { - ec_t *ec; ec_cbk_data_t *cbk; - ec_lock_t *lock; - ec_lock_link_t *link; - ec_inode_t *ctx; int32_t i, count; gf_boolean_t release = _gf_false; @@ -1840,57 +2040,8 @@ void ec_lock_reuse(ec_fop_data_t *fop) release = _gf_true; } - ec = fop->xl->private; - - for (i = 0; i < fop->lock_count; i++) - { - link = &fop->locks[i]; - lock = link->lock; - ctx = lock->ctx; - - LOCK(&lock->loc.inode->lock); - - ec_trace("LOCK_DONE", fop, "lock=%p", lock); - - GF_ASSERT(lock->owner == fop); - lock->owner = NULL; - lock->release |= release; - - if ((fop->error == 0) && (cbk != NULL) && (cbk->op_ret >= 0)) { - if (link->update[0]) { - ctx->post_version[0]++; - if (ec->node_mask & ~fop->good) { - ctx->dirty[0]++; - } - } - if (link->update[1]) { - ctx->post_version[1]++; - if (ec->node_mask & ~fop->good) { - ctx->dirty[1]++; - } - } - } - - ec_lock_update_good(lock, fop); - - link = NULL; - if (!list_empty(&lock->waiting)) - { - link = list_entry(lock->waiting.next, ec_lock_link_t, wait_list); - list_del_init(&link->wait_list); - - lock->owner = link->fop; - } - - UNLOCK(&lock->loc.inode->lock); - - if (link != NULL) - { - if (ec_lock_acquire(link)) { - ec_lock(link->fop); - } - ec_resume(link->fop, 0); - } + for (i = 0; i < fop->lock_count; i++) { + ec_lock_next_owner(&fop->locks[i], cbk, release); } } diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h index 036da091f43..8e724a81380 100644 --- a/xlators/cluster/ec/src/ec-common.h +++ b/xlators/cluster/ec/src/ec-common.h @@ -27,6 +27,9 @@ typedef enum { #define EC_CONFIG_ALGORITHM 0 +#define EC_FLAG_LOCK_SHARED 0x0001 +#define EC_FLAG_WAITING_SIZE 0x0002 + #define EC_SELFHEAL_BIT 62 #define EC_MINIMUM_ONE -1 diff --git a/xlators/cluster/ec/src/ec-data.c b/xlators/cluster/ec/src/ec-data.c index 3dd1a34e265..34c8b6e92c8 100644 --- a/xlators/cluster/ec/src/ec-data.c +++ b/xlators/cluster/ec/src/ec-data.c @@ -135,6 +135,7 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this, return NULL; } + INIT_LIST_HEAD(&fop->owner_list); INIT_LIST_HEAD(&fop->cbk_list); INIT_LIST_HEAD(&fop->healer); INIT_LIST_HEAD(&fop->answer_list); diff --git a/xlators/cluster/ec/src/ec-data.h b/xlators/cluster/ec/src/ec-data.h index 8a48a7ca824..75ee7ef9c8a 100644 --- a/xlators/cluster/ec/src/ec-data.h +++ b/xlators/cluster/ec/src/ec-data.h @@ -139,9 +139,11 @@ struct _ec_lock { ec_inode_t *ctx; gf_timer_t *timer; + struct list_head owners; /* List of owners of this lock. */ struct list_head waiting; /* Queue of requests being serviced. */ struct list_head frozen; /* Queue of requests that will be serviced in the next unlock/lock cycle. */ + int32_t exclusive; uintptr_t mask; uintptr_t good_mask; uintptr_t healing; @@ -149,9 +151,9 @@ struct _ec_lock int32_t refs_frozen; int32_t inserted; gf_boolean_t acquired; + gf_boolean_t getting_size; gf_boolean_t release; gf_boolean_t query; - ec_fop_data_t *owner; fd_t *fd; loc_t loc; union @@ -185,6 +187,7 @@ struct _ec_fop_data xlator_t *xl; call_frame_t *req_frame; /* frame of the calling xlator */ call_frame_t *frame; /* frame used by this fop */ + struct list_head owner_list; /* member of lock owner list */ struct list_head cbk_list; /* sorted list of groups of answers */ struct list_head answer_list; /* list of answers */ struct list_head pending_list; /* member of ec_t.pending_fops */ diff --git a/xlators/cluster/ec/src/ec-dir-read.c b/xlators/cluster/ec/src/ec-dir-read.c index 03bb60cc7b7..fc8b38b22a4 100644 --- a/xlators/cluster/ec/src/ec-dir-read.c +++ b/xlators/cluster/ec/src/ec-dir-read.c @@ -212,7 +212,8 @@ void ec_opendir(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_OPENDIR, 0, target, minimum, + fop = ec_fop_data_allocate(frame, this, GF_FOP_OPENDIR, + EC_FLAG_LOCK_SHARED, target, minimum, ec_wind_opendir, ec_manager_opendir, callback, data); if (fop == NULL) { @@ -510,7 +511,8 @@ void ec_readdir(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_READDIR, 0, target, minimum, + fop = ec_fop_data_allocate(frame, this, GF_FOP_READDIR, + EC_FLAG_LOCK_SHARED, target, minimum, ec_wind_readdir, ec_manager_readdir, callback, data); if (fop == NULL) { @@ -578,9 +580,10 @@ void ec_readdirp(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_READDIRP, 0, target, - minimum, ec_wind_readdirp, ec_manager_readdir, - callback, data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_READDIRP, + EC_FLAG_LOCK_SHARED, target, minimum, + ec_wind_readdirp, ec_manager_readdir, callback, + data); if (fop == NULL) { goto out; } diff --git a/xlators/cluster/ec/src/ec-generic.c b/xlators/cluster/ec/src/ec-generic.c index 47118faa917..3f5856e7a86 100644 --- a/xlators/cluster/ec/src/ec-generic.c +++ b/xlators/cluster/ec/src/ec-generic.c @@ -890,9 +890,9 @@ void ec_lookup(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_LOOKUP, 0, target, minimum, - ec_wind_lookup, ec_manager_lookup, callback, - data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_LOOKUP, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_lookup, + ec_manager_lookup, callback, data); if (fop == NULL) { goto out; } @@ -1085,9 +1085,9 @@ void ec_statfs(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_STATFS, 0, target, minimum, - ec_wind_statfs, ec_manager_statfs, callback, - data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_STATFS, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_statfs, + ec_manager_statfs, callback, data); if (fop == NULL) { goto out; } diff --git a/xlators/cluster/ec/src/ec-inode-read.c b/xlators/cluster/ec/src/ec-inode-read.c index 8b76cc58abb..cbaa9bd9d3a 100644 --- a/xlators/cluster/ec/src/ec-inode-read.c +++ b/xlators/cluster/ec/src/ec-inode-read.c @@ -147,9 +147,9 @@ void ec_access(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_ACCESS, 0, target, minimum, - ec_wind_access, ec_manager_access, callback, - data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_ACCESS, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_access, + ec_manager_access, callback, data); if (fop == NULL) { goto out; } @@ -468,9 +468,10 @@ ec_getxattr (call_frame_t *frame, xlator_t *this, uintptr_t target, return; } - fop = ec_fop_data_allocate(frame, this, GF_FOP_GETXATTR, 0, target, - minimum, ec_wind_getxattr, ec_manager_getxattr, - callback, data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_GETXATTR, + EC_FLAG_LOCK_SHARED, target, minimum, + ec_wind_getxattr, ec_manager_getxattr, callback, + data); if (fop == NULL) { goto out; } @@ -607,8 +608,9 @@ ec_fgetxattr (call_frame_t *frame, xlator_t *this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_FGETXATTR, 0, target, - minimum, ec_wind_fgetxattr, ec_manager_getxattr, + fop = ec_fop_data_allocate(frame, this, GF_FOP_FGETXATTR, + EC_FLAG_LOCK_SHARED, target, minimum, + ec_wind_fgetxattr, ec_manager_getxattr, callback, data); if (fop == NULL) { goto out; @@ -896,8 +898,9 @@ void ec_open(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_OPEN, 0, target, minimum, - ec_wind_open, ec_manager_open, callback, data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_OPEN, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_open, ec_manager_open, + callback, data); if (fop == NULL) { goto out; } @@ -1094,9 +1097,10 @@ void ec_readlink(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_READLINK, 0, target, - minimum, ec_wind_readlink, ec_manager_readlink, - callback, data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_READLINK, + EC_FLAG_LOCK_SHARED, target, minimum, + ec_wind_readlink, ec_manager_readlink, callback, + data); if (fop == NULL) { goto out; } @@ -1450,9 +1454,9 @@ void ec_readv(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_READ, 0, target, minimum, - ec_wind_readv, ec_manager_readv, callback, - data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_READ, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_readv, + ec_manager_readv, callback, data); if (fop == NULL) { goto out; } @@ -1696,8 +1700,9 @@ void ec_stat(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_STAT, 0, target, minimum, - ec_wind_stat, ec_manager_stat, callback, data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_STAT, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_stat, ec_manager_stat, + callback, data); if (fop == NULL) { goto out; } @@ -1810,8 +1815,9 @@ void ec_fstat(call_frame_t * frame, xlator_t * this, uintptr_t target, GF_VALIDATE_OR_GOTO(this->name, frame, out); GF_VALIDATE_OR_GOTO(this->name, this->private, out); - fop = ec_fop_data_allocate(frame, this, GF_FOP_FSTAT, 0, target, minimum, - ec_wind_fstat, ec_manager_stat, callback, data); + fop = ec_fop_data_allocate(frame, this, GF_FOP_FSTAT, EC_FLAG_LOCK_SHARED, + target, minimum, ec_wind_fstat, ec_manager_stat, + callback, data); if (fop == NULL) { goto out; } |