diff options
author | Raghavendra G <raghavendra@gluster.com> | 2009-11-30 15:48:38 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2009-12-01 05:45:52 -0800 |
commit | d7e0bf1757e5fae23bce3d09cd0d9fbbd822a067 (patch) | |
tree | c6235c2473b96d54472f07d6cd615e353109f239 | |
parent | 47a8f97b729dd6bb2a2dabd3ed250ebbdf8be7fc (diff) |
performance/stat-prefetch: make lookup to wait for the completion of another lookup on same path if one is in progress.
- If current lookup (2) does not wait for completion of the lookup (1) which
is in progress, there can be a race condition where (2) completes ahead of
(1) and resuming all the waiting operations in the queue. When (1) returns,
the original operation (eg., stat, chmod etc) might've already unwound and
hence the frame would've been destroyed.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 221 (stat prefetch implementation)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=221
-rw-r--r-- | xlators/performance/stat-prefetch/src/stat-prefetch.c | 120 |
1 files changed, 106 insertions, 14 deletions
diff --git a/xlators/performance/stat-prefetch/src/stat-prefetch.c b/xlators/performance/stat-prefetch/src/stat-prefetch.c index e20b9964520..a1322ee4759 100644 --- a/xlators/performance/stat-prefetch/src/stat-prefetch.c +++ b/xlators/performance/stat-prefetch/src/stat-prefetch.c @@ -787,6 +787,77 @@ sp_is_empty (dict_t *this, char *key, data_t *value, void *data) } } + +int32_t +sp_lookup_helper (call_frame_t *frame,xlator_t *this, loc_t *loc, + dict_t *xattr_req) +{ + uint64_t value = 0; + sp_inode_ctx_t *inode_ctx = NULL; + int32_t ret = 0, op_ret = -1, op_errno = -1; + call_stub_t *stub = NULL; + char can_wind = 0; + + ret = inode_ctx_get (loc->inode, this, &value); + if (ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, "context not set in inode " + "(%p)", loc->inode); + op_errno = EINVAL; + goto unwind; + } + + inode_ctx = (sp_inode_ctx_t *)(long) value; + GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, inode_ctx, unwind, op_errno, + EINVAL); + + stub = fop_lookup_stub (frame, sp_lookup_helper, loc, + xattr_req); + GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, stub, unwind, + op_errno, ENOMEM); + + LOCK (&inode_ctx->lock); + { + op_ret = inode_ctx->op_ret; + op_errno = inode_ctx->op_errno; + if (op_ret == 0) { + if (!inode_ctx->lookup_in_progress) { + inode_ctx->lookup_in_progress = 1; + can_wind = 1; + } else { + list_add_tail (&stub->list, + &inode_ctx->waiting_ops); + stub = NULL; + } + } + } + UNLOCK (&inode_ctx->lock); + + if (op_ret == -1) { + goto unwind; + } + + if (can_wind) { + STACK_WIND (frame, sp_lookup_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, loc, + xattr_req); + } + + if (stub != NULL) { + call_stub_destroy (stub); + } + + return 0; + +unwind: + SP_STACK_UNWIND (lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL); + if (stub != NULL) { + call_stub_destroy (stub); + } + + return 0; +} + + /* * TODO: implement sending lookups for every fop done on this path. As of now * lookup on the path is sent only for the first fop on this path. @@ -797,12 +868,13 @@ sp_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req) gf_dirent_t *dirent = NULL; char entry_cached = 0; uint64_t value = 0; - char xattr_req_empty = 1; + char xattr_req_empty = 1, can_wind = 0; sp_cache_t *cache = NULL; struct stat postparent = {0, }, buf = {0, }; int32_t ret = -1, op_ret = -1, op_errno = EINVAL; sp_inode_ctx_t *inode_ctx = NULL, *parent_inode_ctx = NULL; sp_local_t *local = NULL; + call_stub_t *stub = NULL; if (loc == NULL || loc->inode == NULL) { goto unwind; @@ -880,7 +952,7 @@ sp_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req) } } -wind: +wind: if (entry_cached) { if (cache) { cache->hits++; @@ -892,19 +964,15 @@ wind: sp_cache_unref (cache); } - LOCK (&inode_ctx->lock); - { - if (!(inode_ctx->looked_up - || inode_ctx->lookup_in_progress)) { - inode_ctx->lookup_in_progress = 1; - } - } - UNLOCK (&inode_ctx->lock); + stub = fop_lookup_stub (frame, sp_lookup_helper, loc, + xattr_req); + GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, stub, unwind, + op_errno, ENOMEM); local = CALLOC (1, sizeof (*local)); GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, local, unwind, op_errno, ENOMEM); - + frame->local = local; ret = loc_copy (&local->loc, loc); @@ -917,9 +985,33 @@ wind: local->is_lookup = 1; - STACK_WIND (frame, sp_lookup_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->lookup, loc, xattr_req); - + LOCK (&inode_ctx->lock); + { + if (inode_ctx->lookup_in_progress) { + list_add_tail (&stub->list, + &inode_ctx->waiting_ops); + stub = NULL; + } else { + can_wind = 1; + } + + if (!(inode_ctx->looked_up + || inode_ctx->lookup_in_progress)) { + inode_ctx->lookup_in_progress = 1; + } + } + UNLOCK (&inode_ctx->lock); + + if (can_wind) { + STACK_WIND (frame, sp_lookup_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, loc, + xattr_req); + } + + if (stub != NULL) { + call_stub_destroy (stub); + } + return 0; } |