diff options
| -rw-r--r-- | xlators/features/quota/src/quota.c | 1462 | ||||
| -rw-r--r-- | xlators/features/quota/src/quota.h | 50 | 
2 files changed, 954 insertions, 558 deletions
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 744748fd..9ad7f2b8 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -18,54 +18,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                     char *name, uuid_t par);  struct volume_options options[]; - -static int32_t -__quota_init_inode_ctx (inode_t *inode, int64_t hard_lim, int64_t soft_lim, -                        xlator_t *this, dict_t *dict, struct iatt *buf, -                        quota_inode_ctx_t **context) -{ -        int32_t            ret  = -1; -        int64_t           *size = 0; -        quota_inode_ctx_t *ctx  = NULL; - -        if (inode == NULL) { -                goto out; -        } - -        QUOTA_ALLOC_OR_GOTO (ctx, quota_inode_ctx_t, out); - -        ctx->hard_lim = hard_lim; -        ctx->soft_lim = soft_lim; - -        if (buf) -                ctx->buf = *buf; - -        LOCK_INIT(&ctx->lock); - -        if (context != NULL) { -                *context = ctx; -        } - -        INIT_LIST_HEAD (&ctx->parents); - -        if (dict != NULL) { -                ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); -                if (ret == 0) { -                        ctx->size = ntoh64 (*size); -                        gettimeofday (&ctx->tv, NULL); -                } -        } - -        ret = __inode_ctx_put (inode, this, (uint64_t )(long)ctx); -        if (ret == -1) { -                gf_log (this->name, GF_LOG_WARNING, -                        "cannot set quota context in inode (gfid:%s)", -                        uuid_utoa (inode->gfid)); -        } -out: -        return ret; -} -  int  quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)  { @@ -230,18 +182,131 @@ out:          return;  } + +int32_t +quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, dict_t *dict, +                    dict_t *xdata) +{ +        quota_local_t     *local          = NULL; +        uint32_t           validate_count = 0, link_count = 0; +        int32_t            ret            = 0; +        quota_inode_ctx_t *ctx            = NULL; +        int64_t           *size           = 0; +        uint64_t           value          = 0; +        call_stub_t       *stub           = NULL; + +        local = frame->local; + +        if (op_ret < 0) { +                goto unwind; +        } + +        GF_ASSERT (local); +        GF_ASSERT (frame); +        GF_VALIDATE_OR_GOTO_WITH_ERROR ("quota", this, unwind, op_errno, +                                        EINVAL); +        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, dict, unwind, op_errno, +                                        EINVAL); + +        ret = inode_ctx_get (local->validate_loc.inode, this, &value); + +        ctx = (quota_inode_ctx_t *)(unsigned long)value; +        if ((ret == -1) || (ctx == NULL)) { +                gf_log (this->name, GF_LOG_WARNING, +                        "quota context is not present in inode (gfid:%s)", +                        uuid_utoa (local->validate_loc.inode->gfid)); +                op_errno = EINVAL; +                goto unwind; +        } + +        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "size key not present in dict"); +                op_errno = EINVAL; +                goto unwind; +        } + +        local->just_validated = 1; /* so that we don't go into infinite +                                    * loop of validation and checking +                                    * limit when timeout is zero. +                                    */ +        LOCK (&ctx->lock); +        { +                ctx->size = ntoh64 (*size); +                gettimeofday (&ctx->tv, NULL); +        } +        UNLOCK (&ctx->lock); + +        quota_check_limit (frame, local->validate_loc.inode, this, NULL, NULL); +        return 0; + +unwind: +        LOCK (&local->lock); +        { +                local->op_ret = -1; +                local->op_errno = op_errno; + +                validate_count = --local->validate_count; +                link_count = local->link_count; + +                if ((validate_count == 0) && (link_count == 0)) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        } + +        return 0; +} + + +static inline uint64_t +quota_time_elapsed (struct timeval *now, struct timeval *then) +{ +        return (now->tv_sec - then->tv_sec); +} + + +int32_t +quota_timeout (struct timeval *tv, int32_t timeout) +{ +        struct timeval now       = {0,}; +        int32_t        timed_out = 0; + +        gettimeofday (&now, NULL); + +        if (quota_time_elapsed (&now, tv) >= timeout) { +                timed_out = 1; +        } + +        return timed_out; +} + +  int32_t  quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                     char *name, uuid_t par)  { +        int32_t               ret            = -1;          inode_t              *_inode         = NULL, *parent = NULL;          quota_inode_ctx_t    *ctx            = NULL; +        quota_priv_t         *priv           = NULL;          quota_local_t        *local          = NULL; -        char                  need_unwind = 0; +        char                  need_validate  = 0, need_unwind = 0;          int64_t               delta          = 0; +        call_stub_t          *stub           = NULL; +        int32_t               validate_count = 0, link_count = 0;          uint64_t              value          = 0; +        char                  just_validated = 0;          uuid_t                trav_uuid      = {0,}; +          GF_VALIDATE_OR_GOTO ("quota", this, out);          GF_VALIDATE_OR_GOTO (this->name, frame, out);          GF_VALIDATE_OR_GOTO (this->name, inode, out); @@ -251,11 +316,25 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,          delta = local->delta; +        GF_VALIDATE_OR_GOTO (this->name, local->stub, out); + +        priv = this->private; +          inode_ctx_get (inode, this, &value);          ctx = (quota_inode_ctx_t *)(unsigned long)value;          _inode = inode_ref (inode); +        LOCK (&local->lock); +        { +                just_validated = local->just_validated; +                local->just_validated = 0; + +                if (just_validated) { +                        local->validate_count--; +                } +        } +        UNLOCK (&local->lock);          if ( par != NULL ) {                  uuid_copy (trav_uuid, par); @@ -265,9 +344,13 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  if (ctx != NULL) {                          LOCK (&ctx->lock);                          { -                                if (ctx->hard_lim >= 0) { -                                        if ((ctx->size + delta) -                                                   >= ctx->hard_lim) { +                                if (ctx->limit >= 0) { +                                        if (!just_validated +                                            && quota_timeout (&ctx->tv, +                                                              priv->timeout)) { +                                                need_validate = 1; +                                        } else if ((ctx->size + delta) +                                                   >= ctx->limit) {                                                  local->op_ret = -1;                                                  local->op_errno = EDQUOT;                                                  need_unwind = 1; @@ -276,6 +359,10 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                          }                          UNLOCK (&ctx->lock); +                        if (need_validate) { +                                goto validate; +                        } +                          if (need_unwind) {                                  break;                          } @@ -301,6 +388,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  inode_unref (_inode);                  _inode = parent; +                just_validated = 0;                  if (_inode == NULL) {                          break; @@ -311,54 +399,115 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  ctx = (quota_inode_ctx_t *)(unsigned long)value;          } while (1); -        inode_unref (_inode); +        ret = 0; + +        if (_inode != NULL) { +                inode_unref (_inode); +        } + +        LOCK (&local->lock); +        { +                validate_count = local->validate_count; +                link_count = local->link_count; +                if ((validate_count == 0) && (link_count == 0)) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        }  out: -        return local->op_ret; +        return ret; + +validate: +        LOCK (&local->lock); +        { +                loc_wipe (&local->validate_loc); + +                if (just_validated) { +                        local->validate_count--; +                } + +                local->validate_count++; +                ret = quota_inode_loc_fill (_inode, &local->validate_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "cannot fill loc for inode (gfid:%s), hence " +                                "aborting quota-checks and continuing with fop", +                                uuid_utoa (_inode->gfid)); +                        local->validate_count--; +                } +        } +        UNLOCK (&local->lock); + +        if (ret < 0) { +                goto loc_fill_failed; +        } + +        STACK_WIND (frame, quota_validate_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->getxattr, &local->validate_loc, +                    QUOTA_SIZE_KEY, NULL); + +loc_fill_failed: +        inode_unref (_inode); +        return 0;  } -int32_t -quota_get_limit_value (inode_t *inode, xlator_t *this, int64_t *n) +static int32_t +__quota_init_inode_ctx (inode_t *inode, int64_t limit, xlator_t *this, +                        dict_t *dict, struct iatt *buf, +                        quota_inode_ctx_t **context)  { -        int32_t       ret        = 0; -        char         *path       = NULL; -        limits_t     *limit_node = NULL; -        quota_priv_t *priv       = NULL; +        int32_t            ret  = -1; +        int64_t           *size = 0; +        quota_inode_ctx_t *ctx  = NULL; -        if (inode == NULL || n == NULL) { -                ret = -1; +        if (inode == NULL) {                  goto out;          } -        *n = 0; +        QUOTA_ALLOC_OR_GOTO (ctx, quota_inode_ctx_t, out); -        ret = inode_path (inode, NULL, &path); -        if (ret < 0) { -                ret = -1; -                goto out; +        ctx->limit = limit; +        if (buf) +                ctx->buf = *buf; + +        LOCK_INIT(&ctx->lock); + +        if (context != NULL) { +                *context = ctx;          } -        priv = this->private; +        INIT_LIST_HEAD (&ctx->parents); -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -                if (strcmp (limit_node->path, path) == 0) { -                        *n = limit_node->hard_lim; -                        break; +        if (dict != NULL) { +                ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); +                if (ret == 0) { +                        ctx->size = ntoh64 (*size); +                        gettimeofday (&ctx->tv, NULL);                  }          } +        ret = __inode_ctx_put (inode, this, (uint64_t )(long)ctx); +        if (ret == -1) { +                gf_log (this->name, GF_LOG_WARNING, +                        "cannot set quota context in inode (gfid:%s)", +                        uuid_utoa (inode->gfid)); +        }  out: -        GF_FREE (path); -          return ret;  }  static int32_t -quota_inode_ctx_get (inode_t *inode, int64_t hard_lim, int64_t soft_lim, -                     xlator_t *this, dict_t *dict, struct iatt *buf, -                     quota_inode_ctx_t **ctx, char create_if_absent) +quota_inode_ctx_get (inode_t *inode, int64_t limit, xlator_t *this, +                     dict_t *dict, struct iatt *buf, quota_inode_ctx_t **ctx, +                     char create_if_absent)  {          int32_t  ret = 0;          uint64_t ctx_int; @@ -370,8 +519,8 @@ quota_inode_ctx_get (inode_t *inode, int64_t hard_lim, int64_t soft_lim,                  if ((ret == 0) && (ctx != NULL)) {                          *ctx = (quota_inode_ctx_t *) (unsigned long)ctx_int;                  } else if (create_if_absent) { -                        ret = __quota_init_inode_ctx (inode, hard_lim, soft_lim, -                                                      this, dict, buf, ctx); +                        ret = __quota_init_inode_ctx (inode, limit, this, dict, +                                                      buf, ctx);                  }          }          UNLOCK (&inode->lock); @@ -390,21 +539,20 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_local_t     *local      = NULL;          quota_inode_ctx_t *ctx        = NULL;          quota_dentry_t    *dentry     = NULL; +        int64_t           *size       = 0;          uint64_t           value      = 0;          limits_t          *limit_node = NULL;          quota_priv_t      *priv       = NULL; -        int64_t           *size       = NULL;          local = frame->local; -          priv = this->private;          inode_ctx_get (inode, this, &value);          ctx = (quota_inode_ctx_t *)(unsigned long)value;          if ((op_ret < 0) || (local == NULL) -            || (((ctx == NULL) || (ctx->hard_lim == local->hard_lim)) -                && (local->hard_lim < 0) && !((IA_ISREG (buf->ia_type)) +            || (((ctx == NULL) || (ctx->limit == local->limit)) +                && (local->limit < 0) && !((IA_ISREG (buf->ia_type))                                             || (IA_ISLNK (buf->ia_type))))) {                  goto unwind;          } @@ -421,8 +569,8 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          UNLOCK (&priv->lock); -        ret = quota_inode_ctx_get (local->loc.inode, local->hard_lim, -                                   local->soft_lim, this, dict, buf, &ctx, 1); +        ret = quota_inode_ctx_get (local->loc.inode, local->limit, this, dict, +                                   buf, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", @@ -434,17 +582,21 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          LOCK (&ctx->lock);          { -                ctx->hard_lim = local->hard_lim; -                ctx->soft_lim = local->soft_lim; -                ctx->buf = *buf; +                if (dict != NULL) { +                        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, +                                            (void **) &size); +                        if (ret == 0) { +                                ctx->size = ntoh64 (*size); +                                gettimeofday (&ctx->tv, NULL); +                        } +                } -                /* will be useful for quotad to determine whether quota xlator's -                   context is maintaining the correct size. -                */ -                size = GF_CALLOC (1, sizeof (*size), gf_quota_mt_int64_t); -                *size = hton64 (ctx->size); -                ret = dict_set_bin (dict, "trusted.limit.set", size, 8); +                if (local->limit != ctx->limit) { +                        ctx->limit = local->limit; +                } + +                ctx->buf = *buf;                  if (!(IA_ISREG (buf->ia_type) || IA_ISLNK (buf->ia_type))) {                          goto unlock; @@ -494,23 +646,21 @@ int32_t  quota_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc,                dict_t *xattr_req)  { -        quota_priv_t       *priv        = NULL;          int32_t             ret         = -1; +        int64_t             limit       = -1;          limits_t           *limit_node  = NULL; +        gf_boolean_t        dict_newed  = _gf_false; +        quota_priv_t       *priv        = NULL;          quota_local_t      *local       = NULL; -        int64_t             hard_lim    = -1; -        int64_t             soft_lim    = -1;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, lookup, loc, +                                xattr_req);          list_for_each_entry (limit_node, &priv->limit_head, limit_list) {                  if (strcmp (limit_node->path, loc->path) == 0) { -                        hard_lim = limit_node->hard_lim; -                        soft_lim = limit_node->soft_lim; -                        break; +                        limit = limit_node->value;                  }          } @@ -526,18 +676,25 @@ quota_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc,          frame->local = local; -        local->hard_lim = hard_lim; -        local->soft_lim = soft_lim; +        local->limit = limit; -        if (hard_lim < 0) { +        if (limit < 0) {                  goto wind;          } +        if (xattr_req == NULL) { +                xattr_req  = dict_new (); +                dict_newed = _gf_true; +        } + +        ret = dict_set_uint64 (xattr_req, QUOTA_SIZE_KEY, 0); +        if (ret < 0) { +                goto err; +        } +  wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_lookup_cbk: default_lookup_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, -                    xattr_req); +        STACK_WIND (frame, quota_lookup_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->lookup, loc, xattr_req);          ret = 0; @@ -547,6 +704,10 @@ err:                                      NULL, NULL, NULL, NULL);          } +        if (dict_newed == _gf_true) { +                dict_unref (xattr_req); +        } +          return 0;  } @@ -574,12 +735,10 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,          }          do { -                if ((ctx != NULL) && (ctx->hard_lim >= 0)) { +                if ((ctx != NULL) && (ctx->limit >= 0)) {                          LOCK (&ctx->lock);                          {                                  ctx->size += delta; -                                if (ctx->size < 0) -                                        ctx->size = 0;                          }                          UNLOCK (&ctx->lock);                  } @@ -608,8 +767,6 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,                          break;                  } -                value = 0; -                ctx = NULL;                  inode_ctx_get (_inode, this, &value);                  ctx = (quota_inode_ctx_t *)(unsigned long)value;          } while (1); @@ -661,8 +818,8 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          list_for_each_entry (dentry, &ctx->parents, next) {                  delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; -                quota_update_size (this, local->loc.inode, dentry->name, -                                   dentry->par, delta); +                quota_update_size (this, local->loc.inode, +                                   dentry->name, dentry->par, delta);          }  out: @@ -674,21 +831,53 @@ out:  int32_t +quota_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, +                     struct iovec *vector, int32_t count, off_t off, +                     uint32_t flags, struct iobref *iobref, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        if (local->op_ret == -1) { +                op_errno = local->op_errno; +                goto unwind; +        } + +        STACK_WIND (frame, quota_writev_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->writev, fd, vector, count, off, +                    flags, iobref, xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL); +        return 0; +} + + +int32_t  quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,                struct iovec *vector, int32_t count, off_t off,                uint32_t flags, struct iobref *iobref, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t            ret     = -1, op_errno = EINVAL;          int32_t            parents = 0;          uint64_t           size    = 0;          quota_local_t     *local   = NULL;          quota_inode_ctx_t *ctx     = NULL; +        quota_priv_t      *priv    = NULL; +        call_stub_t       *stub    = NULL;          quota_dentry_t    *dentry  = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, writev, fd, vector, +                                count, off, flags, iobref, xdata);          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -702,8 +891,7 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,          frame->local = local;          local->loc.inode = inode_ref (fd->inode); -        ret = quota_inode_ctx_get (fd->inode, -1, -1, this, NULL, NULL, &ctx, -                                   0); +        ret = quota_inode_ctx_get (fd->inode, -1, this, NULL, NULL, &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING,                          "quota context not set in inode (gfid:%s)", @@ -711,6 +899,13 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,                  goto unwind;          } +        stub = fop_writev_stub (frame, quota_writev_helper, fd, vector, count, +                                off, flags, iobref, xdata); +        if (stub == NULL) { +                op_errno = ENOMEM; +                goto unwind; +        } +          priv = this->private;          GF_VALIDATE_OR_GOTO (this->name, priv, unwind); @@ -724,22 +919,32 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,          UNLOCK (&ctx->lock);          local->delta = size; +        local->stub = stub;          local->link_count = parents;          list_for_each_entry (dentry, &ctx->parents, next) {                  ret = quota_check_limit (frame, fd->inode, this, dentry->name,                                           dentry->par);                  if (ret == -1) { -                        op_errno = EDQUOT; -                        goto unwind; +                        break;                  }          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_writev_cbk: default_writev_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, -                    vector, count, off, flags, iobref, xdata); +        stub = NULL; + +        LOCK (&local->lock); +        { +                local->link_count = 0; +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        }          return 0; @@ -762,16 +967,48 @@ quota_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  int32_t +quota_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    mode_t mode, mode_t umask, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        op_errno = local->op_errno; + +        if (local->op_ret == -1) { +                goto unwind; +        } + +        STACK_WIND (frame, quota_mkdir_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (mkdir, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL); +        return 0; +} + + +int32_t  quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,               mode_t umask, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t        ret            = 0, op_errno = 0;          quota_local_t *local          = NULL; +        call_stub_t   *stub           = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, mkdir, loc, mode, +                                umask, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -790,25 +1027,39 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,                  goto err;          } +        stub = fop_mkdir_stub (frame, quota_mkdir_helper, loc, mode, umask, +                               xdata); +        if (stub == NULL) { +                op_errno = ENOMEM; +                goto err; +        } + +        local->stub = stub;          local->delta = 0; -        ret = quota_check_limit (frame, loc->parent, this, NULL, NULL); -        if (ret == -1) { -                op_errno = EDQUOT; -                goto err; +        quota_check_limit (frame, loc->parent, this, NULL, NULL); + +        stub = NULL; + +        LOCK (&local->lock); +        { +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } + +                local->link_count = 0;          } +        UNLOCK (&local->lock); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_mkdir_cbk: default_mkdir_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, -                    mode, umask, xdata); +        if (stub != NULL) { +                call_resume (stub); +        }          return 0;  err:          QUOTA_STACK_UNWIND (mkdir, frame, -1, op_errno, NULL, NULL, NULL,                              NULL, NULL); -          return 0;  } @@ -829,7 +1080,7 @@ quota_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = quota_inode_ctx_get (inode, -1, -1, this, NULL, buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, -1, this, NULL, buf, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", @@ -866,21 +1117,52 @@ unwind:  int32_t +quota_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                     int32_t flags, mode_t mode, mode_t umask, fd_t *fd, +                     dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        if (local->op_ret == -1) { +                op_errno = local->op_errno; +                goto unwind; +        } + +        STACK_WIND (frame, quota_create_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, +                    fd, xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (create, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL, NULL); +        return 0; +} + + +int32_t  quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,                mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t            ret            = -1;          quota_local_t     *local          = NULL; -        int32_t            op_errno       = 0; +        call_stub_t       *stub           = NULL; +        quota_priv_t      *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, create, loc, flags, +                                mode, umask, fd, xdata);          local = quota_local_new ();          if (local == NULL) { -                op_errno = ENOMEM;                  goto err;          } @@ -889,29 +1171,41 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,          ret = loc_copy (&local->loc, loc);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); -                op_errno = ENOMEM;                  goto err;          } +        stub = fop_create_stub (frame, quota_create_helper, loc, flags, mode, +                                umask, fd, xdata); +        if (stub == NULL) { +                goto err; +        } + +        local->link_count = 1; +        local->stub = stub;          local->delta = 0; -        ret = quota_check_limit (frame, loc->parent, this, NULL, NULL); -        if (ret == -1) { -                op_errno = EDQUOT; -                goto err; +        quota_check_limit (frame, loc->parent, this, NULL, NULL); + +        stub = NULL; + +        LOCK (&local->lock); +        { +                local->link_count = 0; +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                }          } +        UNLOCK (&local->lock); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_create_cbk: default_create_cbk, -                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, -                    flags, mode, umask, fd, xdata); +        if (stub != NULL) { +                call_resume (stub); +        }          return 0;  err: -        QUOTA_STACK_UNWIND (create, frame, -1, op_errno, NULL, NULL, NULL, -                            NULL, NULL, NULL); - +        QUOTA_STACK_UNWIND (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, +                            NULL, NULL);          return 0;  } @@ -924,8 +1218,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL;          uint64_t           value = 0; -        quota_dentry_t    *dentry = NULL; -        quota_dentry_t    *old_dentry = NULL;          if (op_ret < 0) {                  goto out; @@ -947,21 +1239,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                             local->loc.parent->gfid,                             (-(ctx->buf.ia_blocks * 512))); -        LOCK (&ctx->lock); -        { -                list_for_each_entry (dentry, &ctx->parents, next) { -                        if ((strcmp (dentry->name, local->loc.name) == 0) && -                            (uuid_compare (local->loc.parent->gfid, -                                           dentry->par) == 0)) { -                                old_dentry = dentry; -                                break; -                        } -                } -                if (old_dentry) -                        __quota_dentry_free (old_dentry); -        } -        UNLOCK (&ctx->lock); -  out:          QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent,                              postparent, xdata); @@ -973,13 +1250,14 @@ int32_t  quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,                dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t        ret = 0;          quota_local_t *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, unlink, loc, xflag, +                                xdata);          local = quota_local_new ();          if (local == NULL) { @@ -994,11 +1272,8 @@ quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,                  goto err;          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_unlink_cbk: default_unlink_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, -                    xflag, xdata); +        STACK_WIND (frame, quota_unlink_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);          ret = 0; @@ -1032,7 +1307,7 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.parent, NULL, NULL,                             (buf->ia_blocks * 512)); -        ret = quota_inode_ctx_get (inode, -1, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (inode, -1, this, NULL, NULL, &ctx, 0);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot find quota "                          "context in %s (gfid:%s)", local->loc.path, @@ -1087,18 +1362,49 @@ out:  int32_t +quota_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, +                   loc_t *newloc, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        op_errno = local->op_errno; + +        if (local->op_ret == -1) { +                goto unwind; +        } + +        STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (link, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL); +        return 0; +} + + +int32_t  quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,              dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t            ret   = -1, op_errno = ENOMEM;          quota_local_t     *local = NULL; +        call_stub_t       *stub  = NULL;          quota_inode_ctx_t *ctx = NULL; +        quota_priv_t      *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, link, oldloc, +                                newloc, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1113,8 +1419,16 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,                  goto err;          } -        ret = quota_inode_ctx_get (oldloc->inode, -1, -1, this, NULL, NULL, -                                   &ctx, 0); +        stub = fop_link_stub (frame, quota_link_helper, oldloc, newloc, xdata); +        if (stub == NULL) { +                goto err; +        } + +        local->link_count = 1; +        local->stub = stub; + +        ret = quota_inode_ctx_get (oldloc->inode, -1, this, NULL, NULL, &ctx, +                                   0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING,                          "quota context not set in inode (gfid:%s)", @@ -1125,17 +1439,24 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,          local->delta = ctx->buf.ia_blocks * 512; -        ret = quota_check_limit (frame, newloc->parent, this, NULL, NULL); -        if (ret == -1) { -                op_errno = EDQUOT; -                goto err; +        quota_check_limit (frame, newloc->parent, this, NULL, NULL); + +        stub = NULL; + +        LOCK (&local->lock); +        { +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } + +                local->link_count = 0;          } +        UNLOCK (&local->lock); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_link_cbk: default_link_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, -                    newloc, xdata); +        if (stub != NULL) { +                call_resume (stub); +        }          ret = 0;  err: @@ -1156,11 +1477,11 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                    dict_t *xdata)  {          int32_t               ret              = -1; -        int64_t               size             = 0;          quota_local_t        *local            = NULL;          quota_inode_ctx_t    *ctx              = NULL;          quota_dentry_t       *old_dentry       = NULL, *dentry = NULL;          char                  new_dentry_found = 0; +        int64_t               size             = 0;          if (op_ret < 0) {                  goto out; @@ -1180,10 +1501,8 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          if (local->oldloc.parent != local->newloc.parent) { -                quota_update_size (this, local->oldloc.parent, NULL, NULL, -                                   (-size)); -                quota_update_size (this, local->newloc.parent, NULL, NULL, -                                   size); +                quota_update_size (this, local->oldloc.parent, NULL, NULL, (-size)); +                quota_update_size (this, local->newloc.parent, NULL, NULL, size);          }          if (!(IA_ISREG (local->oldloc.inode->ia_type) @@ -1191,8 +1510,8 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        ret = quota_inode_ctx_get (local->oldloc.inode, -1, -1, this, NULL, -                                   NULL, &ctx, 0); +        ret = quota_inode_ctx_get (local->oldloc.inode, -1, this, NULL, NULL, +                                   &ctx, 0);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "quota context not"                          "set in inode(gfid:%s)", @@ -1244,8 +1563,7 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          if (dentry == NULL) {                                  gf_log (this->name, GF_LOG_WARNING,                                          "cannot create a new dentry (name:%s) " -                                        "for inode(gfid:%s)", -                                        local->newloc.name, +                                        "for inode(gfid:%s)", local->newloc.name,                                          uuid_utoa (local->newloc.inode->gfid));                                  op_ret = -1;                                  op_errno = ENOMEM; @@ -1267,17 +1585,49 @@ out:  int32_t +quota_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, +                     loc_t *newloc, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        op_errno = local->op_errno; + +        if (local->op_ret == -1) { +                goto unwind; +        } + +        STACK_WIND (frame, quota_rename_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL, NULL); +        return 0; +} + + +int32_t  quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                loc_t *newloc, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t            ret            = -1, op_errno = ENOMEM;          quota_local_t     *local          = NULL; +        call_stub_t       *stub           = NULL;          quota_inode_ctx_t *ctx            = NULL; +        quota_priv_t      *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, rename, oldloc, +                                newloc, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1298,10 +1648,19 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                  goto err;          } +        stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc, +                                xdata); +        if (stub == NULL) { +                goto err; +        } + +        local->link_count = 1; +        local->stub = stub; +          if (IA_ISREG (oldloc->inode->ia_type)              || IA_ISLNK (oldloc->inode->ia_type)) { -                ret = quota_inode_ctx_get (oldloc->inode, -1, -1, this, NULL, -                                           NULL, &ctx, 0); +                ret = quota_inode_ctx_get (oldloc->inode, -1, this, NULL, NULL, +                                           &ctx, 0);                  if (ctx == NULL) {                          gf_log (this->name, GF_LOG_WARNING,                                  "quota context not set in inode (gfid:%s)", @@ -1315,17 +1674,24 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                  local->delta = 0;          } -        ret = quota_check_limit (frame, newloc->parent, this, NULL, NULL); -        if (ret == -1) { -                op_errno = EDQUOT; -                goto err; +        quota_check_limit (frame, newloc->parent, this, NULL, NULL); + +        stub = NULL; + +        LOCK (&local->lock); +        { +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } + +                local->link_count = 0;          } +        UNLOCK (&local->lock); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_rename_cbk: default_rename_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, -                    newloc, xdata); +        if (stub != NULL) { +                call_resume (stub); +        }          ret = 0;  err: @@ -1358,7 +1724,7 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.parent, NULL, NULL, size); -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 1);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1393,18 +1759,49 @@ out:  int +quota_symlink_helper (call_frame_t *frame, xlator_t *this, const char *linkpath, +                      loc_t *loc, mode_t umask, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        if (local->op_ret == -1) { +                op_errno = local->op_errno; +                goto unwind; +        } + +        STACK_WIND (frame, quota_symlink_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, +                    xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (symlink, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL); +        return 0; +} + + +int  quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,                 loc_t *loc, mode_t umask, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t          ret      = -1;          int32_t          op_errno = ENOMEM;          quota_local_t   *local    = NULL; +        call_stub_t     *stub     = NULL; +        quota_priv_t    *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, symlink, linkpath, +                                loc, umask, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1419,19 +1816,36 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,                  goto err;          } -        local->delta = strlen (linkpath); +        local->link_count = 1; -        ret = quota_check_limit (frame, loc->parent, this, NULL, NULL); -        if (ret == -1) { -                op_errno = EDQUOT; +        stub = fop_symlink_stub (frame, quota_symlink_helper, linkpath, loc, +                                 umask, xdata); +        if (stub == NULL) {                  goto err;          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, -                    linkpath, loc, umask, xdata); +        local->stub = stub; +        local->delta = strlen (linkpath); + +        quota_check_limit (frame, loc->parent, this, NULL, NULL); + +        stub = NULL; + +        LOCK (&local->lock); +        { +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } + +                local->link_count = 0; +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        } +          return 0;  err: @@ -1448,8 +1862,8 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                      struct iatt *postbuf, dict_t *xdata)  {          quota_local_t     *local = NULL; -        quota_inode_ctx_t *ctx   = NULL;          int64_t            delta = 0; +        quota_inode_ctx_t *ctx   = NULL;          if (op_ret < 0) {                  goto out; @@ -1465,7 +1879,7 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.inode, NULL, NULL, delta); -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1491,14 +1905,14 @@ int32_t  quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,                  dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t          ret   = -1;          quota_local_t   *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, truncate, loc, +                                offset, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1513,11 +1927,8 @@ quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,                  goto err;          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_truncate_cbk: default_truncate_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, -                    offset, xdata); +        STACK_WIND (frame, quota_truncate_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);          return 0;  err: @@ -1533,8 +1944,8 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                       struct iatt *postbuf, dict_t *xdata)  {          quota_local_t     *local = NULL; -        quota_inode_ctx_t *ctx   = NULL;          int64_t            delta = 0; +        quota_inode_ctx_t *ctx   = NULL;          if (op_ret < 0) {                  goto out; @@ -1550,7 +1961,7 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.inode, NULL, NULL, delta); -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1576,12 +1987,13 @@ int32_t  quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,                   dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t   *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, ftruncate, fd, +                                offset, xdata);          local = quota_local_new ();          if (local == NULL) @@ -1591,11 +2003,8 @@ quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,          local->loc.inode = inode_ref (fd->inode); -wind: -        STACK_WIND (frame, priv->is_quota_on? -                    quota_ftruncate_cbk: default_ftruncate_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, -                    offset, xdata); +        STACK_WIND (frame, quota_ftruncate_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);          return 0;  err: @@ -1614,23 +2023,14 @@ quota_send_dir_limit_to_cli (call_frame_t *frame, xlator_t *this,          dict_t            *dict              = NULL;          quota_inode_ctx_t *ctx               = NULL;          uint64_t           value             = 0; -        quota_priv_t      *priv              = NULL; - -        priv = this->private; -        if (!priv->is_quota_on) { -                snprintf (dir_limit, 1024, "Quota is disabled please turn on"); -                goto dict_set; -        }          ret = inode_ctx_get (inode, this, &value);          if (ret < 0)                  goto out;          ctx = (quota_inode_ctx_t *)(unsigned long)value; -        snprintf (dir_limit, 1024, "%"PRId64",%"PRId64, ctx->size, -                  ctx->hard_lim); +        snprintf (dir_limit, 1024, "%"PRId64",%"PRId64, ctx->size, ctx->limit); -dict_set:          dict = dict_new ();          if (dict == NULL) {                  ret = -1; @@ -1693,8 +2093,7 @@ quota_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc,  int32_t  quota_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                int32_t op_ret, int32_t op_errno, struct iatt *buf, -                dict_t *xdata) +                int32_t op_ret, int32_t op_errno, struct iatt *buf, dict_t *xdata)  {          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL; @@ -1709,7 +2108,7 @@ quota_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_DEBUG, @@ -1734,14 +2133,13 @@ out:  int32_t  quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, stat, loc, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1755,10 +2153,8 @@ quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)                  goto unwind;          } -wind: -        STACK_WIND (frame, priv->is_quota_on? quota_stat_cbk: default_stat_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, -                    xdata); +        STACK_WIND (frame, quota_stat_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->stat, loc, xdata);          return 0;  unwind: @@ -1785,7 +2181,7 @@ quota_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1810,12 +2206,12 @@ out:  int32_t  quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, fstat, fd, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1826,11 +2222,8 @@ quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)          local->loc.inode = inode_ref (fd->inode); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_fstat_cbk: default_fstat_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, -                    xdata); +        STACK_WIND (frame, quota_fstat_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fstat, fd, xdata);          return 0;  unwind: @@ -1857,7 +2250,7 @@ quota_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1873,8 +2266,7 @@ quota_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          UNLOCK (&ctx->lock);  out: -        QUOTA_STACK_UNWIND (readlink, frame, op_ret, op_errno, path, buf, -                            xdata); +        QUOTA_STACK_UNWIND (readlink, frame, op_ret, op_errno, path, buf, xdata);          return 0;  } @@ -1883,13 +2275,14 @@ int32_t  quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,                  dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, readlink, loc, +                                size, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1904,11 +2297,8 @@ quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,                  goto unwind;          } -wind: -        STACK_WIND (frame, priv->is_quota_on? -                    quota_readlink_cbk: default_readlink_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink, loc, -                    size, xdata); +        STACK_WIND (frame, quota_readlink_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readlink, loc, size, xdata);          return 0;  unwind: @@ -1936,7 +2326,7 @@ quota_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -1962,12 +2352,13 @@ int32_t  quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,               off_t offset, uint32_t flags, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, readv, fd, size, +                                offset, flags, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -1978,16 +2369,13 @@ quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,          local->loc.inode = inode_ref (fd->inode); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_readv_cbk: default_readv_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, -                    size, offset, flags, xdata); +        STACK_WIND (frame, quota_readv_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, +                    xdata);          return 0;  unwind: -        QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, -                            NULL); +        QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, NULL);          return 0;  } @@ -2010,7 +2398,7 @@ quota_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -2036,12 +2424,13 @@ int32_t  quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,               dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, fsync, fd, flags, +                                xdata);          local = quota_local_new ();          if (local == NULL) { @@ -2052,11 +2441,8 @@ quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,          frame->local = local; -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_fsync_cbk: default_fsync_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, -                    flags, xdata); +        STACK_WIND (frame, quota_fsync_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fsync, fd, flags, xdata);          return 0;  unwind: @@ -2084,7 +2470,7 @@ quota_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_DEBUG, @@ -2111,13 +2497,14 @@ int32_t  quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                 struct iatt *stbuf, int32_t valid, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, setattr, loc, +                                stbuf, valid, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -2132,11 +2519,9 @@ quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_setattr_cbk: default_setattr_cbk, -                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, -                    stbuf, valid, xdata); +        STACK_WIND (frame, quota_setattr_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, +                    xdata);          return 0;  unwind: @@ -2163,7 +2548,7 @@ quota_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, -1, this, NULL, NULL, +        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL,                               &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING, @@ -2189,13 +2574,13 @@ int32_t  quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                  struct iatt *stbuf, int32_t valid, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); - +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, fsetattr, fd, +                                stbuf, valid, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -2206,11 +2591,9 @@ quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,          local->loc.inode = inode_ref (fd->inode); -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_fsetattr_cbk: default_fsetattr_cbk, -                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, -                    stbuf, valid, xdata); +        STACK_WIND (frame, quota_fsetattr_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, +                    xdata);          return 0;  unwind: @@ -2235,7 +2618,7 @@ quota_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = quota_inode_ctx_get (inode, -1, -1, this, NULL, buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, -1, this, NULL, buf, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode (gfid:%s)", uuid_utoa (inode->gfid)); @@ -2271,17 +2654,49 @@ unwind:  int +quota_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        if (local->op_ret == -1) { +                op_errno = local->op_errno; +                goto unwind; +        } + +        STACK_WIND (frame, quota_mknod_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, +                    xdata); + +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (mknod, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL); +        return 0; +} + + +int  quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,               dev_t rdev, mode_t umask, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t            ret            = -1;          quota_local_t     *local          = NULL; -        int32_t            op_errno       = 0; +        call_stub_t       *stub           = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, mknod, loc, mode, +                                rdev, umask, xdata);          local = quota_local_new ();          if (local == NULL) { @@ -2296,24 +2711,37 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,                  goto err;          } -        local->delta = 0; - -        ret = quota_check_limit (frame, loc->parent, this, NULL, NULL); -        if (ret == -1) { -                op_errno = EDQUOT; +        stub = fop_mknod_stub (frame, quota_mknod_helper, loc, mode, rdev, +                               umask, xdata); +        if (stub == NULL) {                  goto err;          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, -                    mode, rdev, umask, xdata); +        local->link_count = 1; +        local->stub = stub; +        local->delta = 0; + +        quota_check_limit (frame, loc->parent, this, NULL, NULL); + +        stub = NULL; + +        LOCK (&local->lock); +        { +                local->link_count = 0; +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); +        if (stub != NULL) { +                call_resume (stub); +        }          return 0;  err: -        QUOTA_STACK_UNWIND (mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL, +        QUOTA_STACK_UNWIND (mknod, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,                              NULL);          return 0; @@ -2331,17 +2759,14 @@ int  quota_setxattr (call_frame_t *frame, xlator_t *this,                  loc_t *loc, dict_t *dict, int flags, dict_t *xdata)  { +        int             op_errno = EINVAL; +        int             op_ret   = -1;          quota_priv_t       *priv        = NULL; -        int                      op_errno       = EINVAL; -        int                      op_ret         = -1; -        int64_t                 *size           = 0; -        uint64_t                 value          = 0; -        quota_inode_ctx_t       *ctx            = NULL; -        int                      ret            = -1;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, setxattr, loc, +                                dict, flags, xdata);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err); @@ -2350,36 +2775,8 @@ quota_setxattr (call_frame_t *frame, xlator_t *this,          GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict,                                     op_errno, err); -        ret = dict_get_bin (dict, QUOTA_UPDATE_USAGE_KEY, (void **) &size); -        if (0 == ret) { - -                inode_ctx_get (loc->inode, this, &value); -                ctx = (quota_inode_ctx_t *)(unsigned long) value; -                if (NULL == ctx) { -                        gf_log (this->name, GF_LOG_ERROR, "Couldn't get the " -                                "context for %s. Usage may cross the limit.", -                                loc->path); -                        op_ret = -1; -                        goto err; -                } - -                LOCK (&ctx->lock); -                { -                        ctx->size = ntoh64 (*size); -                        if (ctx->size < 0) -                                ctx->size = 0; -                } -                UNLOCK (&ctx->lock); - -                QUOTA_STACK_UNWIND (setxattr, frame, ret, 0, xdata); -                return 0; -        } - -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_setxattr_cbk: default_setxattr_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, loc, -                    dict, flags, xdata); +        STACK_WIND (frame, quota_setxattr_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);          return 0;  err:          QUOTA_STACK_UNWIND (setxattr, frame, op_ret, op_errno, NULL); @@ -2398,13 +2795,14 @@ int  quota_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                   dict_t *dict, int flags, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t         op_ret   = -1;          int32_t         op_errno = EINVAL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, fsetxattr, fd, +                                dict, flags, xdata);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err); @@ -2413,11 +2811,8 @@ quota_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd,          GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict,                                     op_errno, err); -wind: -        STACK_WIND (frame, priv->is_quota_on? -                    quota_fsetxattr_cbk: default_fsetxattr_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, fd, -                    dict, flags, xdata); +        STACK_WIND (frame, quota_fsetxattr_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);          return 0;   err:          QUOTA_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, NULL); @@ -2437,12 +2832,13 @@ int  quota_removexattr (call_frame_t *frame, xlator_t *this,                     loc_t *loc, const char *name, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t         op_errno = EINVAL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, removexattr, loc, +                                name, xdata);          VALIDATE_OR_GOTO (this, err); @@ -2452,11 +2848,8 @@ quota_removexattr (call_frame_t *frame, xlator_t *this,          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (loc, err); -wind: -        STACK_WIND (frame, priv->is_quota_on? -                    quota_removexattr_cbk: default_removexattr_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, -                    loc, name, xdata); +        STACK_WIND (frame, quota_removexattr_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);          return 0;  err:          QUOTA_STACK_UNWIND (removexattr, frame, -1,  op_errno, NULL); @@ -2476,13 +2869,14 @@ int  quota_fremovexattr (call_frame_t *frame, xlator_t *this,                      fd_t *fd, const char *name, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;          int32_t         op_ret   = -1;          int32_t         op_errno = EINVAL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, fremovexattr, fd, +                                name, xdata);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err); @@ -2491,11 +2885,8 @@ quota_fremovexattr (call_frame_t *frame, xlator_t *this,          GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*",                                   name, op_errno, err); -wind: -        STACK_WIND (frame, priv->is_quota_on? -                    quota_fremovexattr_cbk: default_fremovexattr_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, -                    fd, name, xdata); +        STACK_WIND (frame, quota_fremovexattr_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);          return 0;   err:          QUOTA_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, NULL); @@ -2544,32 +2935,33 @@ quota_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  		goto unwind;  	}          ctx = (quota_inode_ctx_t *)(unsigned long)value; +  	usage = (ctx->size) / buf->f_bsize;          priv = this->private;          list_for_each_entry (limit_node, &priv->limit_head, limit_list) {  		/* Notice that this only works for volume-level quota. */                  if (strcmp (limit_node->path, "/") == 0) { -                        blocks = limit_node->hard_lim / buf->f_bsize; +                        blocks = limit_node->value / buf->f_bsize;                          if (usage > blocks) {                                  break;                          } -			buf->f_blocks = blocks; -			avail = buf->f_blocks - usage; -			if (buf->f_bfree > avail) { -				buf->f_bfree = avail; -			} -			/* -			 * We have to assume that the total assigned quota -			 * won't cause us to dip into the reserved space, -			 * because dealing with the overcommitted cases is -			 * just too hairy (especially when different bricks -			 * might be using different reserved percentages and -			 * such). -			 */ -			buf->f_bavail = buf->f_bfree; -			break; +                        buf->f_blocks = blocks; +                        avail = buf->f_blocks - usage; +                        if (buf->f_bfree > avail) { +                                buf->f_bfree = avail; +                        } +                        /* +                         * We have to assume that the total assigned quota +                         * won't cause us to dip into the reserved space, +                         * because dealing with the overcommitted cases is +                         * just too hairy (especially when different bricks +                         * might be using different reserved percentages and +                         * such). +                         */ +                        buf->f_bavail = buf->f_bfree; +                        break;                  }          } @@ -2585,12 +2977,14 @@ unwind:  int32_t  quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)  { -        quota_priv_t       *priv        = NULL;  	inode_t *root_inode = NULL; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, statfs, +                                loc, xdata); +  	if (priv->consider_statfs && loc->inode) {  		root_inode = loc->inode->table->root; @@ -2613,7 +3007,6 @@ quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)                  if (priv->consider_statfs)                          gf_log(this->name,GF_LOG_WARNING,                                 "missing inode, cannot adjust for quota"); -wind:  		STACK_WIND (frame, default_statfs_cbk, FIRST_CHILD(this),  			    FIRST_CHILD(this)->fops->statfs, loc, xdata);  	} @@ -2644,25 +3037,22 @@ int  quota_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,                  off_t offset, dict_t *dict)  { -        quota_priv_t       *priv        = NULL;          int ret = 0; +        quota_priv_t       *priv        = NULL;          priv = this->private; -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, readdirp, fd, size, +                                offset, dict);          if (dict) {                  ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0); -                if (ret < 0) { +                if (ret < 0)                          goto err; -                }          } -wind: -        STACK_WIND (frame, -                    priv->is_quota_on? quota_readdirp_cbk: default_readdirp_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, -                    size, offset, dict); +        STACK_WIND (frame, quota_readdirp_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict);          return 0;  err:          STACK_UNWIND_STRICT (readdirp, frame, -1, EINVAL, NULL, NULL); @@ -2723,6 +3113,34 @@ out:  }  int32_t +quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, +		       int32_t mode, off_t offset, size_t len, dict_t *xdata) +{ +        quota_local_t *local    = NULL; +        int32_t        op_errno = EINVAL; + +        local = frame->local; +        if (local == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "local is NULL"); +                goto unwind; +        } + +        if (local->op_ret == -1) { +                op_errno = local->op_errno; +                goto unwind; +        } + +        STACK_WIND (frame, quota_fallocate_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len, +		    xdata); +        return 0; + +unwind: +        QUOTA_STACK_UNWIND (fallocate, frame, -1, op_errno, NULL, NULL, NULL); +        return 0; +} + +int32_t  quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,  		off_t offset, size_t len, dict_t *xdata)  { @@ -2731,12 +3149,13 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          quota_local_t     *local   = NULL;          quota_inode_ctx_t *ctx     = NULL;          quota_priv_t      *priv    = NULL; +        call_stub_t       *stub    = NULL;          quota_dentry_t    *dentry  = NULL;          priv = this->private; -        GF_VALIDATE_OR_GOTO (this->name, priv, unwind); -        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +        QUOTA_WIND_IF_DISABLED (priv->quota_on, frame, this, fallocate, fd, +                                mode, offset, len, xdata);          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -2750,7 +3169,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          frame->local = local;          local->loc.inode = inode_ref (fd->inode); -        ret = quota_inode_ctx_get (fd->inode, -1, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (fd->inode, -1, this, NULL, NULL, &ctx, 0);          if (ctx == NULL) {                  gf_log (this->name, GF_LOG_WARNING,                          "quota context not set in inode (gfid:%s)", @@ -2758,6 +3177,15 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,                  goto unwind;          } +        stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, offset, len, +				  xdata); +        if (stub == NULL) { +                op_errno = ENOMEM; +                goto unwind; +        } + +        priv = this->private; +        GF_VALIDATE_OR_GOTO (this->name, priv, unwind);          LOCK (&ctx->lock);          { @@ -2773,22 +3201,33 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,  	 * in ENOSPC errors attempting to allocate an already allocated range.  	 */          local->delta = len; +        local->stub = stub;          local->link_count = parents;          list_for_each_entry (dentry, &ctx->parents, next) {                  ret = quota_check_limit (frame, fd->inode, this, dentry->name,                                           dentry->par);                  if (ret == -1) { -                        op_errno = EDQUOT; -                        goto unwind; +                        break;                  }          } -wind: -        STACK_WIND (frame, priv->is_quota_on? -                    quota_fallocate_cbk: default_fallocate_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fallocate, fd, -                    mode, offset, len, xdata); +        stub = NULL; + +        LOCK (&local->lock); +        { +                local->link_count = 0; +                if (local->validate_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        } +          return 0;  unwind: @@ -2820,16 +3259,11 @@ mem_acct_init (xlator_t *this)  int32_t  quota_forget (xlator_t *this, inode_t *inode)  { -        quota_priv_t         *priv    = NULL;          int32_t               ret     = 0;          uint64_t              ctx_int = 0;          quota_inode_ctx_t    *ctx     = NULL;          quota_dentry_t       *dentry  = NULL, *tmp; -        priv = this->private; -        if (!priv->is_quota_on) -                return 0; -          ret = inode_ctx_del (inode, this, &ctx_int);          if (ret < 0) { @@ -2864,93 +3298,68 @@ quota_parse_limits (quota_priv_t *priv, xlator_t *this, dict_t *xl_options,          char         *path      = NULL, *saveptr = NULL;          uint64_t      value     = 0;          limits_t     *quota_lim = NULL, *old = NULL; -        double        soft_l    = 0; -        char         *limit_dir = NULL; -        char         *saveptr_dir       = NULL; -        char         *path_str  = NULL; +        char         *last_colon= NULL;          ret = dict_get_str (xl_options, "limit-set", &str); -        if (ret) { -                gf_log (this->name, GF_LOG_INFO, "could not get the limits"); -                /* limit may not be set at all on the volume yet */ -                ret = 0; -                goto err; -        } - -        path_str = gf_strdup (str); -        if (!path_str) -                goto err; - +        if (str) { +                path = strtok_r (str, ",", &saveptr); -        limit_dir = strtok_r (path_str, ",", &saveptr); +                while (path) { +                        last_colon = strrchr (path, ':'); +                        *last_colon = '\0'; +                        str_val = last_colon + 1; -        while (limit_dir) { -                QUOTA_ALLOC_OR_GOTO (quota_lim, limits_t, err); -                saveptr_dir = NULL; +                        ret = gf_string2bytesize (str_val, &value); +                        if (ret != 0) +                                goto err; -                path = strtok_r (limit_dir, ":", &saveptr_dir); +                        QUOTA_ALLOC_OR_GOTO (quota_lim, limits_t, err); -                str_val = strtok_r (NULL, ":", &saveptr_dir); +                        quota_lim->path = path; -                ret = gf_string2bytesize (str_val, &value); -                if (ret != 0) -                        goto err; - -                quota_lim->hard_lim = value; - -                str_val = strtok_r (NULL, ",", &saveptr_dir); +                        quota_lim->value = value; -                soft_l = priv->default_soft_lim; -                if (str_val) { -                        ret = gf_string2percent (str_val, &soft_l); -                        if (ret) -                                gf_log (this->name, GF_LOG_WARNING, -                                        "Failed to convert str to " -                                        "percent. Using default soft " -                                        "limit"); -                } - -                quota_lim->soft_lim = soft_l; - -                quota_lim->path = gf_strdup (path); - -                gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, -                        quota_lim->path, quota_lim->hard_lim); - -                if (old_list != NULL) { -                        list_for_each_entry (old, old_list, -                                             limit_list) { -                                if (strcmp (old->path, quota_lim->path) == 0) { -                                        uuid_copy (quota_lim->gfid, -                                                   old->gfid); -                                        break; +                        gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, +                                quota_lim->path, quota_lim->value); + +                        if (old_list != NULL) { +                                list_for_each_entry (old, old_list, +                                                     limit_list) { +                                        if (strcmp (old->path, quota_lim->path) +                                            == 0) { +                                                uuid_copy (quota_lim->gfid, +                                                           old->gfid); +                                                break; +                                        }                                  }                          } -                } -                LOCK (&priv->lock); -                { -                        list_add_tail ("a_lim->limit_list, -                                       &priv->limit_head); -                } -                UNLOCK (&priv->lock); +                        LOCK (&priv->lock); +                        { +                                list_add_tail ("a_lim->limit_list, +                                               &priv->limit_head); +                        } +                        UNLOCK (&priv->lock); -                limit_dir = strtok_r (NULL, ",", &saveptr); +                        path = strtok_r (NULL, ",", &saveptr); +                } +        } else { +                gf_log (this->name, GF_LOG_INFO, +                        "no \"limit-set\" option provided");          }          LOCK (&priv->lock);          {                  list_for_each_entry (quota_lim, &priv->limit_head, limit_list) {                          gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, -                                quota_lim->path, quota_lim->hard_lim); +                                quota_lim->path, quota_lim->value);                  }          }          UNLOCK (&priv->lock);          ret = 0;  err: -        GF_FREE (path_str);          return ret;  } @@ -2979,7 +3388,6 @@ init (xlator_t *this)          INIT_LIST_HEAD (&priv->limit_head);          LOCK_INIT (&priv->lock); -          this->private = priv;          ret = quota_parse_limits (priv, this, this->options, NULL); @@ -2988,10 +3396,9 @@ init (xlator_t *this)                  goto err;          } +        GF_OPTION_INIT ("timeout", priv->timeout, int64, err);          GF_OPTION_INIT ("deem-statfs", priv->consider_statfs, bool, err); -        GF_OPTION_INIT ("server-quota", priv->is_quota_on, bool, err); -        GF_OPTION_INIT ("default-soft-limit", priv->default_soft_lim, percent, -                        err); +        GF_OPTION_INIT ("server-quota", priv->quota_on, bool, err);          this->local_pool = mem_pool_new (quota_local_t, 64);          if (!this->local_pool) { @@ -3017,8 +3424,8 @@ __quota_reconfigure_inode_ctx (xlator_t *this, inode_t *inode, limits_t *limit)          GF_VALIDATE_OR_GOTO (this->name, inode, out);          GF_VALIDATE_OR_GOTO (this->name, limit, out); -        ret = quota_inode_ctx_get (inode, limit->hard_lim, limit->soft_lim, -                                   this, NULL, NULL, &ctx, 1); +        ret = quota_inode_ctx_get (inode, limit->value, this, NULL, NULL, &ctx, +                                   1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", @@ -3028,8 +3435,7 @@ __quota_reconfigure_inode_ctx (xlator_t *this, inode_t *inode, limits_t *limit)          LOCK (&ctx->lock);          { -                ctx->hard_lim = limit->hard_lim; -                ctx->soft_lim = limit->soft_lim; +                ctx->limit = limit->value;          }          UNLOCK (&ctx->lock); @@ -3073,13 +3479,7 @@ reconfigure (xlator_t *this, dict_t *options)          char              found = 0;          priv = this->private; - -        GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, -                          out); -        GF_OPTION_RECONF ("server-quota", priv->is_quota_on, options, bool, -                          out); -        GF_OPTION_RECONF ("default-soft-limit", priv->default_soft_lim, -                          options, percent, out); +        GF_VALIDATE_OR_GOTO (this->name, priv, out);          INIT_LIST_HEAD (&head); @@ -3117,7 +3517,7 @@ reconfigure (xlator_t *this, dict_t *options)                          }                          if (!found) { -                                limit->hard_lim = -1; +                                limit->value = -1;                                  __quota_reconfigure (this, top->itable, limit);                          } @@ -3127,7 +3527,10 @@ reconfigure (xlator_t *this, dict_t *options)          }          UNLOCK (&priv->lock); - +        GF_OPTION_RECONF ("timeout", priv->timeout, options, int64, out); +        GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, +                          out); +        GF_OPTION_RECONF ("server-quota", priv->quota_on, options, bool, out);          ret = 0;  out: @@ -3169,7 +3572,6 @@ struct xlator_fops fops = {          .removexattr  = quota_removexattr,          .fremovexattr = quota_fremovexattr,          .readdirp     = quota_readdirp, -	.fallocate    = quota_fallocate,  };  struct xlator_cbks cbks = { @@ -3178,6 +3580,14 @@ struct xlator_cbks cbks = {  struct volume_options options[] = {          {.key = {"limit-set"}}, +        {.key = {"timeout"}, +         .type = GF_OPTION_TYPE_SIZET, +         .min = 0, +         .max = 60, +         .default_value = "0", +         .description = "quota caches the directory sizes on client. Timeout " +                        "indicates the timeout for the cache to be revalidated." +        },          {.key = {"deem-statfs"},           .type = GF_OPTION_TYPE_BOOL,           .default_value = "off", @@ -3191,19 +3601,5 @@ struct volume_options options[] = {           .description = "Skip the quota if xlator if the feature is not "                          "turned on. This is not a user exposed option."          }, -        {.key = {"default-soft-limit"}, -         .type = GF_OPTION_TYPE_PERCENT, -         .default_value = "90%", -         .min = 0, -         .max = LONG_MAX, -        }, -        {.key = {"timeout"}, -         .type = GF_OPTION_TYPE_SIZET, -         .min = 0, -         .max = 60, -         .default_value = "0", -         .description = "quota caches the directory sizes on client. Timeout " -                        "indicates the timeout for the cache to be revalidated." -        },          {.key = {NULL}}  }; diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index de9f6f16..2ba339aa 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -27,9 +27,15 @@  #define READDIR_BUF             4096  #define QUOTA_UPDATE_USAGE_KEY  "quota-update-usage" -#define WIND_IF_QUOTAOFF(is_quota_on, label)     \ -        if (!is_quota_on)                       \ -                goto label; +#define QUOTA_WIND_IF_DISABLED(quota_on, frame, this, fop, params ...)  \ +        do {                                                            \ +                if (!quota_on) {                                        \ +                        STACK_WIND_TAIL (frame, FIRST_CHILD(this),      \ +                                         FIRST_CHILD(this)->fops->fop,  \ +                                         params);                       \ +                        return 0;                                       \ +                }                                                       \ +        } while (0)  #define DID_REACH_LIMIT(lim, prev_size, cur_size)               \          ((cur_size) >= (lim) && (prev_size) < (lim)) @@ -104,8 +110,6 @@                          goto label;                             \          } while (0) - -  struct quota_dentry {          char            *name;          uuid_t           par; @@ -115,8 +119,7 @@ typedef struct quota_dentry quota_dentry_t;  struct quota_inode_ctx {          int64_t          size; -        int64_t          hard_lim; -        int64_t          soft_lim; +        int64_t          limit;          struct iatt      buf;          struct list_head parents;          struct timeval   tv; @@ -136,15 +139,13 @@ struct quota_local {          int32_t      op_ret;          int32_t      op_errno;          int64_t      size; -        int64_t      hard_lim; -        int64_t      soft_lim; +        int64_t      limit;          char         just_validated;          inode_t     *inode;          call_stub_t *stub;  };  typedef struct quota_local quota_local_t; -  struct qd_vols_conf {          char                    *name;          inode_table_t           *itable; @@ -153,35 +154,34 @@ struct qd_vols_conf {          double                   default_soft_lim;          gf_lock_t                lock;          loc_t                    root_loc; -	uint32_t		soft_timeout; -	uint32_t		hard_timeout; -	struct list_head	limit_head; -	call_frame_t		*frame; +        uint32_t                 soft_timeout; +        uint32_t                 hard_timeout; +        struct list_head         limit_head; +        call_frame_t            *frame;  };  typedef struct qd_vols_conf qd_vols_conf_t; -  struct quota_priv { -        int64_t                 timeout; -        double                  default_soft_lim; -        gf_boolean_t            is_quota_on; -        gf_boolean_t            consider_statfs; -        struct list_head        limit_head; -        qd_vols_conf_t        **qd_vols_conf; -        gf_lock_t               lock; +        int64_t                   timeout; +        double                    default_soft_lim; +        gf_boolean_t              quota_on; +        gf_boolean_t              consider_statfs; +        struct list_head          limit_head; +        qd_vols_conf_t          **qd_vols_conf; +        gf_lock_t                 lock;  };  typedef struct quota_priv quota_priv_t; -  struct limits {          struct list_head  limit_list;          char             *path; +        int64_t           value;          uuid_t            gfid;          int64_t           prev_size;          struct timeval    prev_log_tv;          int64_t           hard_lim;          int64_t           soft_lim; -	struct timeval	  expire; -	uint32_t	  timeout; +        struct timeval    expire; +        uint32_t          timeout;  };  typedef struct limits     limits_t;  | 
