diff options
| author | vmallika <vmallika@redhat.com> | 2015-03-20 14:25:58 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-31 23:38:42 -0700 | 
| commit | 3d4203fe3acad47f0a29ebe094b3e427b13f363f (patch) | |
| tree | ae8b9ff2290cc57c75a8d273cd0fa2d908445933 | |
| parent | d6b6c32ac7f0a6b3d77009aec8bdd0cd289bd0ff (diff) | |
quota: enhancement of build ancestry
There can a small race window where marker accounting will
be missed.
Consider below example where NFS mount is used
and bricks are restarted and a write operation is performed
Currently:
T1: lookup from protocol server, quota will initiate ancestry build
T2: opendir FOP from ancestry build
T3: write FOP from client
T4: write_cbk
T5: marker initiate accounting.
    There will be a problem here and txn aborts,
    because build_ancestry is not complete
    and parent not found for an inode
T6: opendir_cbk
T7: initiate readdirp, posix builds ancestry in readdirp
With this patch, now calling readdirp on anonoymous fd 
during build ancestry:
T1: lookup from protocol server, quota will initiate ancestry build
T2: readirp FOP from ancestry build, posix builds ancestry in 
    readdirp 
T3: write FOP from client
T4: write_cbk
T5: marker initiate accounting. 
    No problem here as build ancestry was performed at T1
Change-Id: I2c262c86f34f6c574940a6b8772d94f2bade9465
BUG: 1184885
Signed-off-by: vmallika <vmallika@redhat.com>
Reviewed-on: http://review.gluster.org/9954
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
| -rw-r--r-- | xlators/features/quota/src/quota.c | 135 | 
1 files changed, 45 insertions, 90 deletions
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 2afe58f6210..3f94498f5ad 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -751,130 +751,85 @@ cleanup:          return 0;  } -int32_t -quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie, -                               xlator_t *this, int32_t op_ret, int32_t op_errno, -                               fd_t *fd, dict_t *xdata) -{ -        dict_t        *xdata_req = NULL; -        quota_local_t *local     = NULL; - -        if (op_ret < 0) { -                goto err; -        } - -        xdata_req = dict_new (); -        if (xdata_req == NULL) { -                op_ret = -ENOMEM; -                goto err; -        } - -        op_ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); -        if (op_ret < 0) { -                op_errno = -op_ret; -                goto err; -        } - -        op_ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); -        if (op_ret < 0) { -                op_errno = -op_ret; -                goto err; -        } - -        /* This would ask posix layer to construct dentry chain till root */ -        STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); - -        op_ret = 0; - -err: -        fd_unref (fd); - -        if (xdata_req) -                dict_unref (xdata_req); - -        if (op_ret < 0) { -                local = frame->local; -                frame->local = NULL; - -                local->ancestry_cbk (NULL, NULL, -1, op_errno, -                                     local->ancestry_data); -                quota_local_cleanup (this, local); -                STACK_DESTROY (frame->root); -        } - -        return 0; -} -  int  quota_build_ancestry (inode_t *inode, quota_ancestry_built_t ancestry_cbk,                        void *data)  { -        loc_t          loc       = {0, };          fd_t          *fd        = NULL;          quota_local_t *local     = NULL;          call_frame_t  *new_frame = NULL; -        int            op_errno  = EINVAL; +        int            op_errno  = ENOMEM; +        int            op_ret    = -1;          xlator_t      *this      = NULL; +        dict_t        *xdata_req = NULL;          this = THIS; -        loc.inode = inode_ref (inode); -        uuid_copy (loc.gfid, inode->gfid); +        xdata_req = dict_new (); +        if (xdata_req == NULL) +                goto err; -        fd = fd_create (inode, 0); +        fd = fd_anonymous (inode); +        if (fd == NULL) +                goto err;          new_frame = create_frame (this, this->ctx->pool); -        if (new_frame == NULL) { -                op_errno = ENOMEM; +        if (new_frame == NULL)                  goto err; -        } - -        new_frame->root->uid = new_frame->root->gid = 0;          local = quota_local_new (); -        if (local == NULL) { -                op_errno = ENOMEM; +        if (local == NULL)                  goto err; -        } +        new_frame->root->uid = new_frame->root->gid = 0;          new_frame->local = local;          local->ancestry_cbk = ancestry_cbk;          local->ancestry_data = data;          local->loc.inode = inode_ref (inode); -        if (IA_ISDIR (inode->ia_type)) { -                STACK_WIND (new_frame, quota_build_ancestry_open_cbk, -                            FIRST_CHILD(this), -                            FIRST_CHILD(this)->fops->opendir, &loc, fd, -                            NULL); -        } else { -                STACK_WIND (new_frame, quota_build_ancestry_open_cbk, -                            FIRST_CHILD(this), -                            FIRST_CHILD(this)->fops->open, &loc, 0, fd, -                            NULL); +        op_ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); +        if (op_ret < 0) { +                op_errno = -op_ret; +                goto err;          } -        loc_wipe (&loc); -        return 0; +        op_ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); +        if (op_ret < 0) { +                op_errno = -op_ret; +                goto err; +        } + +        /* This would ask posix layer to construct dentry chain till root +         * We don't need to do a opendir, we can use the anonymous fd +         * here for  the readidrp. +         * avoiding opendir also reduces the window size where another FOP +         * can be executed before completion of build ancestry +         */ +        STACK_WIND (new_frame, quota_build_ancestry_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); + +        op_ret = 0;  err: -        ancestry_cbk (NULL, NULL, -1, op_errno, data); +        if (fd) +                fd_unref (fd); -        fd_unref (fd); +        if (xdata_req) +                dict_unref (xdata_req); -        local = new_frame->local; -        new_frame->local = NULL; +        if (op_ret < 0) { +                ancestry_cbk (NULL, NULL, -1, op_errno, data); -        if (local != NULL) { -                quota_local_cleanup (this, local); -        } +                if (new_frame) { +                        local = new_frame->local; +                        new_frame->local = NULL; +                        STACK_DESTROY (new_frame->root); +                } -        if (new_frame != NULL) { -                STACK_DESTROY (new_frame->root); +                if (local) +                        quota_local_cleanup (this, local);          } -        loc_wipe (&loc);          return 0;  }  | 
