diff options
| author | Raghavendra G <rgowdapp@redhat.com> | 2013-09-16 17:46:50 +0530 | 
|---|---|---|
| committer | Anand Avati <avati@redhat.com> | 2013-11-26 10:24:02 -0800 | 
| commit | ab3ab1978a4768e9eed8e23b47e72b25046e607a (patch) | |
| tree | f4e8d1a5aecd24d45a9db531d658a947cd94261d /xlators | |
| parent | 460ce40d3e2069bf6262dccea6f5ae2fac60d90f (diff) | |
features/quota: Improvements to quota
* Two stages of quota enforcement is done:
  Soft and hard quota Upon reaching soft quota limit on the directory
  it logs/alerts in the quota daemon log (ie DEFAULT_LOG_DIR/quotad.log)
  and no more writes allowed after hard
  quota limit. After reaching the soft-limit the daemon alerts the
  user/admin repeatively for every 'alert-time', which is
  configurable.
* Quota enforcer is moved to server-side.
  It  takes care of enforcing quota. Since enforcer doesn't have the
  cluster view, it relies on another service called
  quota-aggregator. Aggregator, on query can return the size of a
  directory based on the cluster view.
  Enforcer is always loaded in the server graph and is by passed if
  the feature is not enabled.
  Options specific to enforcer:
  server-quota - Specifies whether the feature is on/off. It is used
  to by pass the quota if turned off.
  deem-statfs - If set to on, it takes quota limits into consideration
  while estimating fs size. (df command). The algorithm followed is,
  i.   Adjust statvfs based on limit configured on root.
  ii.  If limit is set on the inode passed, use size/limits on that inode to
       populate statvfs. Otherwise, use size/limits configured on root.
  iii. Upon statvfs, update the ctx->size on the inode.
  iv.  Don't let DHT aggregate, instead take the maximum of the usages from the
       subvols of the DHT, since each of it contains the complete information.
  Enforcer also makes use of gfid-to-path conversion functionality to
  work correctly when a client like nfs predominently relies on
  nameless lookups.
* Quota Aggregator acts as a thin client to provide cluster view
  Its a lightweight *gluster client* process with no mount point,
  started upon enabling quota or restarting the volume. This is a
  single process run on each brick, which can answer queries on all
  volumes in the cluster. Its volfile stored in
  GLUSTERD_DEFAULT_WORKING_DIR/quotad/quotad.vol.
Credits:
Raghavendra Bhat        <rabhat@redhat.com>
Varun Shastry           <vshastry@redhat.com>
Shishir Gowda           <sgowda@redhat.com>
Kruthika Dhananjay      <kdhananj@redhat.com>
Brian Foster            <bfoster@redhat.com>
Krishnan Parthasarathi  <kparthas@redhat.com>
Change-Id: Id1cb25b414951da34c665a55f77385d482e0f9de
BUG: 969461
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-on: http://review.gluster.org/5952
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'xlators')
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 40 | ||||
| -rw-r--r-- | xlators/features/quota/src/Makefile.am | 15 | ||||
| -rw-r--r-- | xlators/features/quota/src/quota-enforcer-client.c | 364 | ||||
| -rw-r--r-- | xlators/features/quota/src/quota-mem-types.h | 3 | ||||
| -rw-r--r-- | xlators/features/quota/src/quota.c | 2607 | ||||
| -rw-r--r-- | xlators/features/quota/src/quota.h | 135 | ||||
| -rw-r--r-- | xlators/features/quota/src/quotad-aggregator.c | 423 | ||||
| -rw-r--r-- | xlators/features/quota/src/quotad-aggregator.h | 37 | ||||
| -rw-r--r-- | xlators/features/quota/src/quotad-helpers.c | 113 | ||||
| -rw-r--r-- | xlators/features/quota/src/quotad-helpers.h | 24 | ||||
| -rw-r--r-- | xlators/features/quota/src/quotad.c | 210 | ||||
| -rw-r--r-- | xlators/lib/src/libxlator.h | 1 | 
12 files changed, 2984 insertions, 988 deletions
| diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 8f61339e692..e320107a832 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -2248,6 +2248,18 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,                  return 0;          } +        if (key && !strcmp (GF_XATTR_QUOTA_LIMIT_LIST, key)) { +                /* quota hardlimit and aggregated size of a directory is stored +                 * in inode contexts of each brick. Hence its good enough that +                 * we send getxattr for this key to any brick. +                 */ +                local->call_cnt = 1; +                subvol = dht_first_up_subvol (this); +                STACK_WIND (frame, dht_getxattr_cbk, subvol, +                            subvol->fops->getxattr, loc, key, xdata); +                return 0; +        } +          if (key && *conf->vol_uuid) {                  if ((match_uuid_local (key, conf->vol_uuid) == 0) &&                      (GF_CLIENT_PID_GSYNCD == frame->root->pid)) { @@ -2861,11 +2873,16 @@ int  dht_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  int op_ret, int op_errno, struct statvfs *statvfs, dict_t *xdata)  { -        dht_local_t *local         = NULL; -        int          this_call_cnt = 0; -        int          bsize         = 0; -        int          frsize        = 0; +        dht_local_t *local              = NULL; +        int          this_call_cnt      = 0; +        int          bsize              = 0; +        int          frsize             = 0; +        int8_t       quota_deem_statfs  = 0; +        GF_UNUSED int     ret           = 0; +        unsigned long     new_usage     = 0; +        unsigned long     cur_usage     = 0; +        ret = dict_get_int8 (xdata, "quota-deem-statfs", "a_deem_statfs);          local = frame->local; @@ -2875,8 +2892,22 @@ dht_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          local->op_errno = op_errno;                          goto unlock;                  } +                if (!statvfs) { +                        op_errno = EINVAL; +                        local->op_ret = -1; +                        goto unlock; +                }                  local->op_ret = 0; +                if (quota_deem_statfs) { +                        new_usage = statvfs->f_blocks - statvfs->f_bfree; +                        cur_usage = local->statvfs.f_blocks - local->statvfs.f_bfree; +                        /* We take the maximux of the usage from the subvols */ +                        if (new_usage >= cur_usage) +                                local->statvfs = *statvfs; +                        goto unlock; +                } +                  if (local->statvfs.f_bsize != 0) {                          bsize = max(local->statvfs.f_bsize, statvfs->f_bsize);                          frsize = max(local->statvfs.f_frsize, statvfs->f_frsize); @@ -2897,6 +2928,7 @@ dht_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  local->statvfs.f_flag     = statvfs->f_flag;                  local->statvfs.f_namemax  = statvfs->f_namemax; +          }  unlock:          UNLOCK (&frame->lock); diff --git a/xlators/features/quota/src/Makefile.am b/xlators/features/quota/src/Makefile.am index 9546f427629..7165adc59ef 100644 --- a/xlators/features/quota/src/Makefile.am +++ b/xlators/features/quota/src/Makefile.am @@ -1,17 +1,22 @@ -xlator_LTLIBRARIES = quota.la +xlator_LTLIBRARIES = quota.la quotad.la  xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features  quota_la_LDFLAGS = -module -avoid-version +quotad_la_LDFLAGS = -module -avoid-version -quota_la_SOURCES = quota.c +quota_la_SOURCES = quota.c quota-enforcer-client.c  quota_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la -noinst_HEADERS = quota-mem-types.h quota.h +quotad_la_SOURCES = quotad.c quotad-helpers.c quotad-aggregator.c +quotad_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = quota-mem-types.h quota.h quotad-aggregator.h quotad-helpers.h  AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ -	-I$(top_srcdir)/xlators/cluster/dht/src +	-I$(top_srcdir)/xlators/cluster/dht/src -I$(top_srcdir)/rpc/xdr/src/ \ +	-I$(top_srcdir)/rpc/rpc-lib/src  AM_CFLAGS = -Wall $(GF_CFLAGS) -CLEANFILES =  +CLEANFILES = diff --git a/xlators/features/quota/src/quota-enforcer-client.c b/xlators/features/quota/src/quota-enforcer-client.c new file mode 100644 index 00000000000..bfea5e42014 --- /dev/null +++ b/xlators/features/quota/src/quota-enforcer-client.c @@ -0,0 +1,364 @@ +/* +   Copyright (c) 2010-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ +#include <stdio.h> +#include <string.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/resource.h> +#include <sys/file.h> +#include <netdb.h> +#include <signal.h> +#include <libgen.h> + +#include <sys/utsname.h> + +#include <stdint.h> +#include <pthread.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <time.h> +#include <semaphore.h> +#include <errno.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_MALLOC_H +#include <malloc.h> +#endif + +#ifdef HAVE_MALLOC_STATS +#ifdef DEBUG +#include <mcheck.h> +#endif +#endif + +#include "quota.h" + +extern struct rpc_clnt_program quota_enforcer_clnt; + +int32_t +quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, inode_t *inode, +                    struct iatt *buf, dict_t *xdata, struct iatt *postparent); + +int +quota_enforcer_submit_request (void *req, call_frame_t *frame, +                               rpc_clnt_prog_t *prog, +                               int procnum, struct iobref *iobref, +                               xlator_t *this, fop_cbk_fn_t cbkfn, +                               xdrproc_t xdrproc) +{ +        int           ret        = -1; +        int           count      = 0; +        struct iovec  iov        = {0, }; +        struct iobuf *iobuf      = NULL; +        char          new_iobref = 0; +        ssize_t       xdr_size   = 0; +        quota_priv_t *priv       = NULL; + +        GF_ASSERT (this); + +        priv = this->private; + +        if (req) { +                xdr_size = xdr_sizeof (xdrproc, req); +                iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size); +                if (!iobuf) { +                        goto out; +                } + +                if (!iobref) { +                        iobref = iobref_new (); +                        if (!iobref) { +                                goto out; +                        } + +                        new_iobref = 1; +                } + +                iobref_add (iobref, iobuf); + +                iov.iov_base = iobuf->ptr; +                iov.iov_len  = iobuf_size (iobuf); + +                /* Create the xdr payload */ +                ret = xdr_serialize_generic (iov, req, xdrproc); +                if (ret == -1) { +                        goto out; +                } +                iov.iov_len = ret; +                count = 1; +        } + +        /* Send the msg */ +        ret = rpc_clnt_submit (priv->rpc_clnt, prog, procnum, cbkfn, +                               &iov, count, +                               NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL); +        ret = 0; + +out: +        if (new_iobref) +                iobref_unref (iobref); +        if (iobuf) +                iobuf_unref (iobuf); + +        return ret; +} + +int +quota_enforcer_lookup_cbk (struct rpc_req *req, struct iovec *iov, +                           int count, void *myframe) +{ +        quota_local_t    *local      = NULL; +        call_frame_t     *frame      = NULL; +        int               ret        = 0; +        gfs3_lookup_rsp   rsp        = {0,}; +        struct iatt       stbuf      = {0,}; +        struct iatt       postparent = {0,}; +        int               op_errno   = EINVAL; +        dict_t           *xdata      = NULL; +        inode_t          *inode      = NULL; +        xlator_t         *this       = NULL; + +        this = THIS; + +        frame = myframe; +        local = frame->local; +        inode = local->validate_loc.inode; + +        if (-1 == req->rpc_status) { +                rsp.op_ret   = -1; +                op_errno = ENOTCONN; +                goto out; +        } + +        ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_lookup_rsp); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed"); +                rsp.op_ret   = -1; +                op_errno = EINVAL; +                goto out; +        } + +        op_errno = gf_error_to_errno (rsp.op_errno); +        gf_stat_to_iatt (&rsp.postparent, &postparent); + +        if (rsp.op_ret == -1) +                goto out; + +        rsp.op_ret = -1; +        gf_stat_to_iatt (&rsp.stat, &stbuf); + +        GF_PROTOCOL_DICT_UNSERIALIZE (frame->this, xdata, (rsp.xdata.xdata_val), +                                      (rsp.xdata.xdata_len), rsp.op_ret, +                                      op_errno, out); + +        if ((!uuid_is_null (inode->gfid)) +            && (uuid_compare (stbuf.ia_gfid, inode->gfid) != 0)) { +                gf_log (frame->this->name, GF_LOG_DEBUG, +                        "gfid changed for %s", local->validate_loc.path); +                rsp.op_ret = -1; +                op_errno = ESTALE; +                goto out; +        } + +        rsp.op_ret = 0; + +out: +        rsp.op_errno = op_errno; +        if (rsp.op_ret == -1) { +                /* any error other than ENOENT */ +                if (rsp.op_errno != ENOENT) +                        gf_log (this->name, GF_LOG_WARNING, +                                "remote operation failed: %s. Path: %s (%s)", +                                strerror (rsp.op_errno), +                                local->validate_loc.path, +                                loc_gfid_utoa (&local->validate_loc)); +                else +                        gf_log (this->name, GF_LOG_TRACE, +                                "not found on remote node"); + +        } + +        local->validate_cbk (frame, NULL, this, rsp.op_ret, rsp.op_errno, inode, +                             &stbuf, xdata, &postparent); + +        if (xdata) +                dict_unref (xdata); + +        free (rsp.xdata.xdata_val); + +        return 0; +} + +int +quota_enforcer_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, +                       dict_t *xdata, fop_lookup_cbk_t validate_cbk) +{ +        quota_local_t          *local      = NULL; +        gfs3_lookup_req         req        = {{0,},}; +        int                     ret        = 0; +        int                     op_errno   = ESTALE; +        quota_priv_t           *priv       = NULL; + +        if (!frame || !this || !loc) +                goto unwind; + +        local = frame->local; +        local->validate_cbk = validate_cbk; + +        priv = this->private; + +        if (!(loc && loc->inode)) +                goto unwind; + +        if (!uuid_is_null (loc->inode->gfid)) +                memcpy (req.gfid, loc->inode->gfid, 16); +        else +                memcpy (req.gfid, loc->gfid, 16); + +        if (xdata) { +                GF_PROTOCOL_DICT_SERIALIZE (this, xdata, +                                            (&req.xdata.xdata_val), +                                            req.xdata.xdata_len, +                                            op_errno, unwind); +        } + +        if (loc->name) +                req.bname = (char *)loc->name; +        else +                req.bname = ""; + +        ret = quota_enforcer_submit_request (&req, frame, +                                             priv->quota_enforcer, +                                             GF_AGGREGATOR_LOOKUP, +                                             NULL, this, +                                             quota_enforcer_lookup_cbk, +                                             (xdrproc_t)xdr_gfs3_lookup_req); + +        if (ret) { +                gf_log (this->name, GF_LOG_WARNING, "failed to send the fop"); +        } + +        GF_FREE (req.xdata.xdata_val); + +        return 0; + +unwind: +        validate_cbk (frame, NULL, this, -1, op_errno, NULL, NULL, NULL, NULL); + +        GF_FREE (req.xdata.xdata_val); + +        return 0; +} + +int +quota_enforcer_notify (struct rpc_clnt *rpc, void *mydata, +                       rpc_clnt_event_t event, void *data) +{ +        xlator_t                *this = NULL; +        int                     ret = 0; + +        this = mydata; + +        switch (event) { +        case RPC_CLNT_CONNECT: +        { +                gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_CONNECT"); +                break; +        } + +        case RPC_CLNT_DISCONNECT: +        { +                gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_DISCONNECT"); +                break; +        } + +        default: +                gf_log (this->name, GF_LOG_TRACE, +                        "got some other RPC event %d", event); +                ret = 0; +                break; +        } + +        return ret; +} + +//Returns a started rpc_clnt. Creates a new rpc_clnt if quota_priv doesn't have +//one already +struct rpc_clnt * +quota_enforcer_init (xlator_t *this, dict_t *options) +{ +        struct rpc_clnt *rpc  = NULL; +        quota_priv_t    *priv = NULL; +        int              ret  = -1; + +        priv = this->private; +        if (priv->rpc_clnt) { +                gf_log (this->name, GF_LOG_TRACE, "quota enforcer clnt already " +                        "inited"); +                //Turns out to be a NOP if the clnt is already connected. +                rpc_clnt_start (priv->rpc_clnt); +                return priv->rpc_clnt; +        } +        priv->quota_enforcer = "a_enforcer_clnt; + +        ret = dict_set_str (options, "transport.address-family", "unix"); +        if (ret) +                goto out; + +        ret = dict_set_str (options, "transport-type", "socket"); +        if (ret) +                goto out; + +        ret = dict_set_str (options, "transport.socket.connect-path", +                            "/tmp/quotad.socket"); +        if (ret) +                goto out; + +        rpc = rpc_clnt_new (options, this->ctx, this->name, 16); +        if (!rpc) { +                ret = -1; +                goto out; +        } + +        ret = rpc_clnt_register_notify (rpc, quota_enforcer_notify, this); +        if (ret) { +                gf_log ("cli", GF_LOG_ERROR, "failed to register notify"); +                goto out; +        } + +        rpc_clnt_start (rpc); +out: +        if (ret) { +                if (rpc) +                        rpc_clnt_unref (rpc); +                rpc = NULL; +        } + +        return rpc; +} + +struct rpc_clnt_procedure quota_enforcer_actors[GF_AGGREGATOR_MAXVALUE] = { +        [GF_AGGREGATOR_NULL]     = {"NULL", NULL}, +        [GF_AGGREGATOR_LOOKUP]   = {"LOOKUP", NULL}, +}; + +struct rpc_clnt_program quota_enforcer_clnt = { +        .progname  = "Quota enforcer", +        .prognum   = GLUSTER_AGGREGATOR_PROGRAM, +        .progver   = GLUSTER_AGGREGATOR_VERSION, +        .numproc   = GF_AGGREGATOR_MAXVALUE, +        .proctable = quota_enforcer_actors, +}; diff --git a/xlators/features/quota/src/quota-mem-types.h b/xlators/features/quota/src/quota-mem-types.h index 3082865da29..97d9165681f 100644 --- a/xlators/features/quota/src/quota-mem-types.h +++ b/xlators/features/quota/src/quota-mem-types.h @@ -21,6 +21,9 @@ enum gf_quota_mem_types_ {          gf_quota_mt_int32_t,          gf_quota_mt_limits_t,          gf_quota_mt_quota_dentry_t, +        gf_quota_mt_quota_limits_level_t, +        gf_quota_mt_qd_vols_conf_t, +        gf_quota_mt_aggregator_state_t,          gf_quota_mt_end  };  #endif diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 345d44c5272..a50fb1184a9 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -12,44 +12,100 @@  #include "quota.h"  #include "common-utils.h"  #include "defaults.h" +#include "statedump.h"  int32_t  quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                     char *name, uuid_t par); + +int +quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, +                     loc_t *loc, struct iatt *buf, int32_t *op_errno); +  struct volume_options options[]; +static int32_t +__quota_init_inode_ctx (inode_t *inode, xlator_t *this, +                        quota_inode_ctx_t **context) +{ +        int32_t            ret  = -1; +        quota_inode_ctx_t *ctx  = NULL; + +        if (inode == NULL) { +                goto out; +        } + +        QUOTA_ALLOC_OR_GOTO (ctx, quota_inode_ctx_t, out); + +        LOCK_INIT(&ctx->lock); + +        if (context != NULL) { +                *context = ctx; +        } + +        INIT_LIST_HEAD (&ctx->parents); + +        ret = __inode_ctx_put (inode, this, (uint64_t )(long)ctx); +        if (ret == -1) { +                gf_log (this->name, GF_LOG_WARNING, +                        "cannot set quota context in inode (gfid:%s)", +                        uuid_utoa (inode->gfid)); +        } +out: +        return ret; +} + + +static int32_t +quota_inode_ctx_get (inode_t *inode, xlator_t *this, +                     quota_inode_ctx_t **ctx, char create_if_absent) +{ +        int32_t  ret = 0; +        uint64_t ctx_int; + +        LOCK (&inode->lock); +        { +                ret = __inode_ctx_get (inode, this, &ctx_int); + +                if ((ret == 0) && (ctx != NULL)) { +                        *ctx = (quota_inode_ctx_t *) (unsigned long)ctx_int; +                } else if (create_if_absent) { +                        ret = __quota_init_inode_ctx (inode, this, ctx); +                } +        } +        UNLOCK (&inode->lock); + +        return ret; +} +  int  quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)  {          int ret = -1; -        if (!loc) { +        if (!loc || (inode == NULL))                  return ret; -        }          if (inode) {                  loc->inode = inode_ref (inode); +                uuid_copy (loc->gfid, inode->gfid);          }          if (parent) {                  loc->parent = inode_ref (parent);          } -        loc->path = gf_strdup (path); -        if (!loc->path) { -                goto loc_wipe; -        } +        if (path != NULL) { +                loc->path = gf_strdup (path); -        loc->name = strrchr (loc->path, '/'); -        if (loc->name) { -                loc->name++; -        } else { -                goto loc_wipe; +                loc->name = strrchr (loc->path, '/'); +                if (loc->name) { +                        loc->name++; +                }          }          ret = 0; -loc_wipe:          if (ret < 0) {                  loc_wipe (loc);          } @@ -82,7 +138,6 @@ quota_inode_loc_fill (inode_t *inode, loc_t *loc)                  gf_log (this->name, GF_LOG_DEBUG,                          "cannot find parent for inode (gfid:%s)",                          uuid_utoa (inode->gfid)); -                goto err;          }  ignore_parent: @@ -91,7 +146,6 @@ ignore_parent:                  gf_log (this->name, GF_LOG_DEBUG,                          "cannot construct path for inode (gfid:%s)",                          uuid_utoa (inode->gfid)); -                goto err;          }          ret = quota_loc_fill (loc, inode, parent, resolvedpath); @@ -161,7 +215,9 @@ __quota_dentry_new (quota_inode_ctx_t *ctx, char *name, uuid_t par)          uuid_copy (dentry->par, par); -        list_add_tail (&dentry->next, &ctx->parents); +        if (ctx != NULL) +                list_add_tail (&dentry->next, &ctx->parents); +  err:          return dentry;  } @@ -182,19 +238,66 @@ out:          return;  } +inline void +quota_resume_fop_if_validation_done (quota_local_t *local) +{ +        call_stub_t *stub       = NULL; +        int          link_count = -1; + +        if (local == NULL) +                goto out; + +        LOCK (&local->lock); +        { +                link_count = local->link_count; +                if (link_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        } +out: +        return; +} + +inline void +quota_handle_validate_error (quota_local_t *local, int32_t op_ret, +                             int32_t op_errno) +{ +        if (local == NULL) +                goto out; + +        LOCK (&local->lock); +        { +                if (op_ret < 0) { +                        local->op_ret = op_ret; +                        local->op_errno = op_errno; +                } + +                /* we abort checking limits on this path to root */ +                local->link_count--; +        } +        UNLOCK (&local->lock); + +        quota_resume_fop_if_validation_done (local); +out: +        return; +}  int32_t  quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                    int32_t op_ret, int32_t op_errno, dict_t *dict, -                    dict_t *xdata) +                    int32_t op_ret, int32_t op_errno, inode_t *inode, +                    struct iatt *buf, dict_t *xdata, struct iatt *postparent)  { -        quota_local_t     *local          = NULL; -        uint32_t           validate_count = 0, link_count = 0; -        int32_t            ret            = 0; -        quota_inode_ctx_t *ctx            = NULL; -        int64_t           *size           = 0; -        uint64_t           value          = 0; -        call_stub_t       *stub           = NULL; +        quota_local_t     *local      = NULL; +        int32_t            ret        = 0; +        quota_inode_ctx_t *ctx        = NULL; +        int64_t           *size       = 0; +        uint64_t           value      = 0;          local = frame->local; @@ -206,7 +309,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO_WITH_ERROR ("quota", this, unwind, op_errno,                                          EINVAL); -        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, dict, unwind, op_errno, +        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, xdata, unwind, op_errno,                                          EINVAL);          ret = inode_ctx_get (local->validate_loc.inode, this, &value); @@ -220,7 +323,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); +        ret = dict_get_bin (xdata, QUOTA_SIZE_KEY, (void **) &size);          if (ret < 0) {                  gf_log (this->name, GF_LOG_WARNING,                          "size key not present in dict"); @@ -243,25 +346,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  unwind: -        LOCK (&local->lock); -        { -                local->op_ret = -1; -                local->op_errno = op_errno; - -                validate_count = --local->validate_count; -                link_count = local->link_count; - -                if ((validate_count == 0) && (link_count == 0)) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - +        quota_handle_validate_error (local, op_ret, op_errno);          return 0;  } @@ -288,34 +373,318 @@ quota_timeout (struct timeval *tv, int32_t timeout)          return timed_out;  } +int32_t +quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                          int32_t op_ret, int32_t op_errno, +                          gf_dirent_t *entries, dict_t *xdata) +{ +        inode_t           *parent             = NULL; +        gf_dirent_t       *entry              = NULL; +        loc_t              loc                = {0, }; +        quota_dentry_t    *dentry             = NULL, *tmp = NULL; +        quota_inode_ctx_t *ctx                = NULL; +        struct list_head   parents            = {0, }; +        quota_local_t     *local              = NULL; +        call_frame_t      *continuation_frame = NULL; + +        INIT_LIST_HEAD (&parents); + +        continuation_frame = frame->local; +        frame->local = NULL; + +        local = continuation_frame->local; + +        if (op_ret < 0) +                goto err; + +        parent = inode_parent (local->validate_loc.inode, 0, NULL); +        if (parent == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "parent is NULL"); +                op_errno = EINVAL; +                goto err; +        } + +        if ((op_ret > 0) && (entries != NULL)) { +                list_for_each_entry (entry, &entries->list, list) { +                        if (__is_root_gfid (entry->inode->gfid)) { +                                /* The list contains a sub-list for each +                                 * possible path to the target inode. Each +                                 * sub-list starts with the root entry of the +                                 * tree and is followed by the child entries +                                 * for a particular path to the target entry. +                                 * The root entry is an implied sub-list +                                 * delimiter, as it denotes we have started +                                 * processing a new path. Reset the parent +                                 * pointer and continue +                                 */ + +                                parent = NULL; +                        } + +                        uuid_copy (loc.gfid, entry->d_stat.ia_gfid); + +                        loc.inode = inode_ref (entry->inode); +                        loc.parent = inode_ref (parent); +                        loc.name = entry->d_name; + +                        quota_fill_inodectx (this, entry->inode, entry->dict, +                                             &loc, &entry->d_stat, &op_errno); + +                        parent = entry->inode; + +                        loc_wipe (&loc); +                } +        } + +        quota_inode_ctx_get (local->validate_loc.inode, this, &ctx, 0); + +        local->link_count = 0; + +        if (ctx != NULL) { +                LOCK (&ctx->lock); +                { +                        list_for_each_entry (dentry, &ctx->parents, next) { +                                tmp = __quota_dentry_new (NULL, dentry->name, +                                                          dentry->par); +                                list_add_tail (&tmp->next, &parents); +                                local->link_count++; +                        } +                } +                UNLOCK (&ctx->lock); +        } + +        if (local->link_count != 0) { +                list_for_each_entry_safe (dentry, tmp, &parents, next) { +                        quota_check_limit (continuation_frame, +                                           local->validate_loc.inode, +                                           this, dentry->name, dentry->par); +                        __quota_dentry_free (dentry); +                } +        } else { +                local->link_count = 1; +                quota_check_limit (continuation_frame, parent, this, NULL, +                                   NULL); +        } + +        STACK_DESTROY (frame->root); +        return 0; + +err: +        STACK_DESTROY (frame->root); + +        quota_handle_validate_error (local, -1, op_errno); +        return 0; +}  int32_t -quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, -                   char *name, uuid_t par) +quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie, +                               xlator_t *this, int32_t op_ret, int32_t op_errno, +                               fd_t *fd, dict_t *xdata)  { -        int32_t               ret            = -1; -        inode_t              *_inode         = NULL, *parent = NULL; -        quota_inode_ctx_t    *ctx            = NULL; -        quota_priv_t         *priv           = NULL; -        quota_local_t        *local          = NULL; -        char                  need_validate  = 0, need_unwind = 0; -        int64_t               delta          = 0; -        call_stub_t          *stub           = NULL; -        int32_t               validate_count = 0, link_count = 0; -        uint64_t              value          = 0; -        char                  just_validated = 0; -        uuid_t                trav_uuid      = {0,}; +        int            ret                = -1; +        dict_t        *xdata_req          = NULL; +        quota_local_t *local              = NULL; +        call_frame_t  *continuation_frame = NULL; + +        xdata_req = dict_new (); +        if (xdata_req == NULL) { +                ret = -ENOMEM; +                goto err; +        } -        GF_VALIDATE_OR_GOTO ("quota", this, out); -        GF_VALIDATE_OR_GOTO (this->name, frame, out); -        GF_VALIDATE_OR_GOTO (this->name, inode, out); +        ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); +        if (ret < 0) +                goto err; + +        ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); +        if (ret < 0) +                goto err; + +        /* This would ask posix layer to construct dentry chain till root */ +        STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); + +        ret = 0; + +err: +        fd_unref (fd); + +        dict_unref (xdata_req); + +        if (ret < 0) { +                continuation_frame = frame->local; +                frame->local = NULL; + +                STACK_DESTROY (frame->root); + +                local = continuation_frame->local; +                quota_handle_validate_error (local, -1, op_errno); +        } + +        return ret; +} + +int +quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) +{ +        loc_t          loc       = {0, }; +        fd_t          *fd        = NULL; +        quota_local_t *local     = NULL; +        call_frame_t  *new_frame = NULL; +        int            ret       = -1; + +        loc.inode = inode_ref (inode); +        uuid_copy (loc.gfid, inode->gfid); + +        gf_log (this->name, GF_LOG_WARNING, "building ancestry");          local = frame->local; -        GF_VALIDATE_OR_GOTO (this->name, local, out); + +        LOCK (&local->lock); +        { +                loc_wipe (&local->validate_loc); + +                ret = quota_inode_loc_fill (inode, &local->validate_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "cannot fill loc for inode (gfid:%s), hence " +                                "aborting quota-checks and continuing with fop", +                                uuid_utoa (inode->gfid)); +                } +        } +        UNLOCK (&local->lock); + +        fd = fd_create (inode, 0); + +        new_frame = copy_frame (frame); +        new_frame->root->uid = new_frame->root->gid = 0; + +        new_frame->local = frame; + +        if (IA_ISDIR (inode->ia_type)) { +                STACK_WIND (new_frame, quota_build_ancestry_open_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->opendir, &loc, fd, +                            NULL); +        } else { +                STACK_WIND (new_frame, quota_build_ancestry_open_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->open, &loc, 0, fd, +                            NULL); +        } + +        loc_wipe (&loc); + +        return 0; +} + +int +quota_validate (call_frame_t *frame, inode_t *inode, xlator_t *this, +                fop_lookup_cbk_t cbk_fn) +{ +        quota_local_t     *local = NULL; +        int                ret   = 0; +        dict_t            *xdata = NULL; +        quota_priv_t      *priv  = NULL; + +        local = frame->local; +        priv = this->private; + +        LOCK (&local->lock); +        { +                loc_wipe (&local->validate_loc); + +                ret = quota_inode_loc_fill (inode, &local->validate_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "cannot fill loc for inode (gfid:%s), hence " +                                "aborting quota-checks and continuing with fop", +                                uuid_utoa (inode->gfid)); +                } +        } +        UNLOCK (&local->lock); + +        if (ret < 0) { +                ret = -ENOMEM; +                goto err; +        } + +        xdata = dict_new (); +        if (xdata == NULL) { +                ret = -ENOMEM; +                goto err; +        } + +        ret = dict_set_int8 (xdata, QUOTA_SIZE_KEY, 1); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, "dict set failed"); +                ret = -ENOMEM; +                goto err; +        } + +        ret = dict_set_str (xdata, "volume-uuid", priv->volume_uuid); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, "dict set failed"); +                ret = -ENOMEM; +                goto err; +        } + +        ret = quota_enforcer_lookup (frame, this, &local->validate_loc, xdata, +                                     cbk_fn); +        if (ret < 0) { +                ret = -ENOTCONN; +                goto err; +        } + +        ret = 0; +err: +        return ret; +} + +int32_t +quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, +                   char *name, uuid_t par) +{ +        int32_t            ret                 = -1, op_errno = EINVAL; +        inode_t           *_inode              = NULL, *parent = NULL; +        quota_inode_ctx_t *ctx                 = NULL; +        quota_priv_t      *priv                = NULL; +        quota_local_t     *local               = NULL; +        char               need_validate       = 0; +        gf_boolean_t       hard_limit_exceeded = 0; +        int64_t            delta               = 0, wouldbe_size = 0; +        uint64_t           value               = 0; +        char               just_validated      = 0; +        uuid_t             trav_uuid           = {0,}; +        uint32_t           timeout             = 0; + +        GF_VALIDATE_OR_GOTO ("quota", this, err); +        GF_VALIDATE_OR_GOTO (this->name, frame, err); +        GF_VALIDATE_OR_GOTO (this->name, inode, err); + +        local  = frame->local; +        GF_VALIDATE_OR_GOTO (this->name, local, err);          delta = local->delta; -        GF_VALIDATE_OR_GOTO (this->name, local->stub, out); +        GF_VALIDATE_OR_GOTO (this->name, local->stub, err); +        /* Allow all the trusted clients +         * Don't block the gluster internal processes like rebalance, gsyncd, +         * self heal etc from the disk quotas. +         * +         * Method: Allow all the clients with PID negative. This is by the +         * assumption that any kernel assigned pid doesn't have the negative +         * number. +         */ +        if (0 > frame->root->pid) { +                ret = 0; +		LOCK (&local->lock); +		{ +			--local->link_count; +		} +		UNLOCK (&local->lock); +                goto resume; +        }          priv = this->private; @@ -328,10 +697,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,          {                  just_validated = local->just_validated;                  local->just_validated = 0; - -                if (just_validated) { -                        local->validate_count--; -                }          }          UNLOCK (&local->lock); @@ -340,34 +705,56 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,          }          do { -                if (ctx != NULL) { +                if (ctx != NULL && (ctx->hard_lim > 0 || ctx->soft_lim > 0)) { +                        wouldbe_size = ctx->size + delta; +                          LOCK (&ctx->lock);                          { -                                if (ctx->limit >= 0) { -                                        if (!just_validated -                                            && quota_timeout (&ctx->tv, -                                                              priv->timeout)) { -                                                need_validate = 1; -                                        } else if ((ctx->size + delta) -                                                   >= ctx->limit) { -                                                local->op_ret = -1; -                                                local->op_errno = EDQUOT; -                                                need_unwind = 1; -                                        } +                                timeout = priv->soft_timeout; + +                                if ((ctx->soft_lim >= 0) +                                    && (wouldbe_size > ctx->soft_lim)) { +                                        timeout = priv->hard_timeout; +                                } + +                                if (!just_validated +                                    && quota_timeout (&ctx->tv, timeout)) { +                                        need_validate = 1; +                                } else if (wouldbe_size >= ctx->hard_lim) { +                                        hard_limit_exceeded = 1;                                  }                          }                          UNLOCK (&ctx->lock); +                        /* We log usage only if quota limit is configured on +                           that inode. */ +                        quota_log_usage (this, ctx, _inode, delta); +                          if (need_validate) { -                                goto validate; -                        } +                                ret = quota_validate (frame, _inode, this, +                                                      quota_validate_cbk); +                                if (ret < 0) { +                                        op_errno = -ret; +                                        goto err; +                                } -                        if (need_unwind) {                                  break;                          } + +                        if (hard_limit_exceeded) { +                                op_errno = EDQUOT; +                                goto err; +                        } +                  }                  if (__is_root_gfid (_inode->gfid)) { +                        LOCK (&local->lock); +                        { +                                --local->link_count; +                        } +                        UNLOCK (&local->lock); +                          break;                  } @@ -379,10 +766,13 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  }                  if (parent == NULL) { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "cannot find parent for inode (gfid:%s), hence " -                                "aborting enforcing quota-limits and continuing" -                                " with the fop", uuid_utoa (_inode->gfid)); +                        ret = quota_build_ancestry (frame, _inode, this); +                        if (ret < 0) { +                                op_errno = -ret; +                                goto err; +                        } + +                        break;                  }                  inode_unref (_inode); @@ -398,240 +788,94 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  ctx = (quota_inode_ctx_t *)(unsigned long)value;          } while (1); -        ret = 0; -          if (_inode != NULL) {                  inode_unref (_inode); +                _inode = NULL;          } -        LOCK (&local->lock); -        { -                validate_count = local->validate_count; -                link_count = local->link_count; -                if ((validate_count == 0) && (link_count == 0)) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - -out: -        return ret; - -validate: -        LOCK (&local->lock); -        { -                loc_wipe (&local->validate_loc); - -                if (just_validated) { -                        local->validate_count--; -                } - -                local->validate_count++; -                ret = quota_inode_loc_fill (_inode, &local->validate_loc); -                if (ret < 0) { -                        gf_log (this->name, GF_LOG_WARNING, -                                "cannot fill loc for inode (gfid:%s), hence " -                                "aborting quota-checks and continuing with fop", -                                uuid_utoa (_inode->gfid)); -                        local->validate_count--; -                } -        } -        UNLOCK (&local->lock); - -        if (ret < 0) { -                goto loc_fill_failed; -        } +resume: +        quota_resume_fop_if_validation_done (local); +        return 0; -        STACK_WIND (frame, quota_validate_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->getxattr, &local->validate_loc, -                    QUOTA_SIZE_KEY, NULL); +err: +        quota_handle_validate_error (local, -1, op_errno); -loc_fill_failed:          inode_unref (_inode);          return 0;  } - -int32_t -quota_get_limit_value (inode_t *inode, xlator_t *this, int64_t *n) +inline int +quota_get_limits (xlator_t *this, dict_t *dict, int64_t *hard_lim, +                  int64_t *soft_lim)  { -        int32_t       ret        = 0; -        char         *path       = NULL; -        limits_t     *limit_node = NULL; -        quota_priv_t *priv       = NULL; - -        if (inode == NULL || n == NULL) { -                ret = -1; -                goto out; -        } - -        *n = 0; +        quota_limit_t *limit            = NULL; +        quota_priv_t  *priv             = NULL; +        int64_t        soft_lim_percent = 0, *ptr = NULL; +        int            ret              = 0; -        ret = inode_path (inode, NULL, &path); -        if (ret < 0) { -                ret = -1; +        if ((this == NULL) || (dict == NULL) || (hard_lim == NULL) +            || (soft_lim == NULL))                  goto out; -        }          priv = this->private; -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -                if (strcmp (limit_node->path, path) == 0) { -                        *n = limit_node->value; -                        break; -                } -        } +        ret = dict_get_bin (dict, QUOTA_LIMIT_KEY, (void **) &ptr); +        limit = (quota_limit_t *)ptr; -out: -        GF_FREE (path); - -        return ret; -} - - -static int32_t -__quota_init_inode_ctx (inode_t *inode, int64_t limit, xlator_t *this, -                        dict_t *dict, struct iatt *buf, -                        quota_inode_ctx_t **context) -{ -        int32_t            ret  = -1; -        int64_t           *size = 0; -        quota_inode_ctx_t *ctx  = NULL; - -        if (inode == NULL) { -                goto out; +        if (limit) { +                *hard_lim = ntoh64 (limit->hard_lim); +                soft_lim_percent = ntoh64 (limit->soft_lim_percent);          } -        QUOTA_ALLOC_OR_GOTO (ctx, quota_inode_ctx_t, out); - -        ctx->limit = limit; -        if (buf) -                ctx->buf = *buf; - -        LOCK_INIT(&ctx->lock); - -        if (context != NULL) { -                *context = ctx; +        if (soft_lim_percent < 0) { +                soft_lim_percent = priv->default_soft_lim;          } -        INIT_LIST_HEAD (&ctx->parents); - -        if (dict != NULL) { -                ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); -                if (ret == 0) { -                        ctx->size = ntoh64 (*size); -                        gettimeofday (&ctx->tv, NULL); -                } +        if ((*hard_lim > 0) && (soft_lim_percent > 0)) { +                *soft_lim = (soft_lim_percent * (*hard_lim))/100;          } -        ret = __inode_ctx_put (inode, this, (uint64_t )(long)ctx); -        if (ret == -1) { -                gf_log (this->name, GF_LOG_WARNING, -                        "cannot set quota context in inode (gfid:%s)", -                        uuid_utoa (inode->gfid)); -        }  out: -        return ret; -} - - -static int32_t -quota_inode_ctx_get (inode_t *inode, int64_t limit, xlator_t *this, -                     dict_t *dict, struct iatt *buf, quota_inode_ctx_t **ctx, -                     char create_if_absent) -{ -        int32_t  ret = 0; -        uint64_t ctx_int; - -        LOCK (&inode->lock); -        { -                ret = __inode_ctx_get (inode, this, &ctx_int); - -                if ((ret == 0) && (ctx != NULL)) { -                        *ctx = (quota_inode_ctx_t *) (unsigned long)ctx_int; -                } else if (create_if_absent) { -                        ret = __quota_init_inode_ctx (inode, limit, this, dict, -                                                      buf, ctx); -                } -        } -        UNLOCK (&inode->lock); - -        return ret; +        return 0;  } - -int32_t -quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                  int32_t op_ret, int32_t op_errno, inode_t *inode, -                  struct iatt *buf, dict_t *dict, struct iatt *postparent) +int +quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, +                     loc_t *loc, struct iatt *buf, int32_t *op_errno)  { -        int32_t            ret        = -1; -        char               found      = 0; -        quota_local_t     *local      = NULL; -        quota_inode_ctx_t *ctx        = NULL; -        quota_dentry_t    *dentry     = NULL; -        int64_t           *size       = 0; -        uint64_t           value      = 0; -        limits_t          *limit_node = NULL; -        quota_priv_t      *priv       = NULL; +        int32_t            ret      = -1; +        char               found    = 0; +        quota_inode_ctx_t *ctx      = NULL; +        quota_dentry_t    *dentry   = NULL; +        uint64_t           value    = 0; +        int64_t            hard_lim = -1, soft_lim = -1; -        local = frame->local; - -        priv = this->private; +        quota_get_limits (this, dict, &hard_lim, &soft_lim);          inode_ctx_get (inode, this, &value);          ctx = (quota_inode_ctx_t *)(unsigned long)value; -        if ((op_ret < 0) || (local == NULL) -            || (((ctx == NULL) || (ctx->limit == local->limit)) -                && (local->limit < 0) && !((IA_ISREG (buf->ia_type)) -                                           || (IA_ISLNK (buf->ia_type))))) { -                goto unwind; -        } - -        LOCK (&priv->lock); -        { -                list_for_each_entry (limit_node, &priv->limit_head, -                                     limit_list) { -                        if (strcmp (local->loc.path, limit_node->path) == 0) { -                                uuid_copy (limit_node->gfid, buf->ia_gfid); -                                break; -                        } -                } +        if ((((ctx == NULL) || (ctx->hard_lim == hard_lim)) +             && (hard_lim < 0) && !((IA_ISREG (buf->ia_type)) +                                    || (IA_ISLNK (buf->ia_type))))) { +                ret = 0; +                goto out;          } -        UNLOCK (&priv->lock); -        ret = quota_inode_ctx_get (local->loc.inode, local->limit, this, dict, -                                   buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, this, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); -                op_ret = -1; -                op_errno = ENOMEM; -                goto unwind; +                        uuid_utoa (inode->gfid)); +                ret = -1; +                *op_errno = ENOMEM; +                goto out;          }          LOCK (&ctx->lock);          { - -                if (dict != NULL) { -                        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, -                                            (void **) &size); -                        if (ret == 0) { -                                ctx->size = ntoh64 (*size); -                                gettimeofday (&ctx->tv, NULL); -                        } -                } - -                if (local->limit != ctx->limit) { -                        ctx->limit = local->limit; -                } +                ctx->hard_lim = hard_lim; +                ctx->soft_lim = soft_lim;                  ctx->buf = *buf; @@ -639,12 +883,12 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          goto unlock;                  } -                if (local->loc.name == NULL) +                if (loc->name == NULL)                          goto unlock;                  list_for_each_entry (dentry, &ctx->parents, next) { -                        if ((strcmp (dentry->name, local->loc.name) == 0) && -                            (uuid_compare (local->loc.parent->gfid, +                        if ((strcmp (dentry->name, loc->name) == 0) && +                            (uuid_compare (loc->parent->gfid,                                             dentry->par) == 0)) {                                  found = 1;                                  break; @@ -653,18 +897,18 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  if (!found) {                          dentry = __quota_dentry_new (ctx, -                                                     (char *)local->loc.name, -                                                     local->loc.parent->gfid); +                                                     (char *)loc->name, +                                                     loc->parent->gfid);                          if (dentry == NULL) {                                  /* -                                gf_log (this->name, GF_LOG_WARNING, +                                  gf_log (this->name, GF_LOG_WARNING,                                          "cannot create a new dentry (par:%"                                          PRId64", name:%s) for inode(ino:%"                                          PRId64", gfid:%s)",                                          uuid_utoa (local->loc.inode->gfid));                                  */ -                                op_ret = -1; -                                op_errno = ENOMEM; +                                ret = -1; +                                *op_errno = ENOMEM;                                  goto unlock;                          }                  } @@ -672,6 +916,25 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  unlock:          UNLOCK (&ctx->lock); +out: +        return ret; +} + +int32_t +quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                  int32_t op_ret, int32_t op_errno, inode_t *inode, +                  struct iatt *buf, dict_t *dict, struct iatt *postparent) +{ +        quota_local_t *local = NULL; + +        if (op_ret < 0) +                goto unwind; + +        local = frame->local; + +        op_ret = quota_fill_inodectx (this, inode, dict, &local->loc, buf, +                                      &op_errno); +  unwind:          QUOTA_STACK_UNWIND (lookup, frame, op_ret, op_errno, inode, buf,                              dict, postparent); @@ -683,65 +946,53 @@ int32_t  quota_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc,                dict_t *xattr_req)  { -        int32_t             ret         = -1; -        int64_t             limit       = -1; -        limits_t           *limit_node  = NULL; -        gf_boolean_t        dict_newed  = _gf_false; -        quota_priv_t       *priv        = NULL; -        quota_local_t      *local       = NULL; +        quota_priv_t  *priv             = NULL; +        int32_t        ret              = -1; +        quota_local_t *local            = NULL;          priv = this->private; -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -                if (strcmp (limit_node->path, loc->path) == 0) { -                        limit = limit_node->value; -                } -        } +        xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new(); +        if (!xattr_req) +                goto err; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) {                  goto err;          } -        ret = loc_copy (&local->loc, loc); -        if (ret == -1) { -                goto err; -        } -          frame->local = local; +        loc_copy (&local->loc, loc); -        local->limit = limit; - -        if (limit < 0) { -                goto wind; -        } - -        if (xattr_req == NULL) { -                xattr_req  = dict_new (); -                dict_newed = _gf_true; -        } - -        ret = dict_set_uint64 (xattr_req, QUOTA_SIZE_KEY, 0); +        ret = dict_set_int8 (xattr_req, QUOTA_LIMIT_KEY, 1);          if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "dict set of key for hard-limit failed");                  goto err;          }  wind: -        STACK_WIND (frame, quota_lookup_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->lookup, loc, xattr_req); +        /* TODO: check with vshastry@redhat.com to cleanup the ugliness of +         * checking priv->is_quota_on here by using STACK_WIND_TAIL macro +         */ +        STACK_WIND (frame, +                    priv->is_quota_on ? quota_lookup_cbk : default_lookup_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, +                    xattr_req);          ret = 0;  err: +        if (xattr_req) +                dict_unref (xattr_req); +          if (ret < 0) {                  QUOTA_STACK_UNWIND (lookup, frame, -1, ENOMEM,                                      NULL, NULL, NULL, NULL);          } -        if (dict_newed == _gf_true) { -                dict_unref (xattr_req); -        } -          return 0;  } @@ -769,10 +1020,13 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,          }          do { -                if ((ctx != NULL) && (ctx->limit >= 0)) { +                if ((ctx != NULL) && (ctx->hard_lim >= 0)) { +                        quota_log_usage (this, ctx, _inode, delta);                          LOCK (&ctx->lock);                          {                                  ctx->size += delta; +                                if (ctx->size < 0) +                                        ctx->size = 0;                          }                          UNLOCK (&ctx->lock);                  } @@ -783,6 +1037,7 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,                  parent = inode_parent (_inode, trav_uuid, name);                  if (parent == NULL) { +                        /* TODO: build ancestry and continue updating size */                          gf_log (this->name, GF_LOG_DEBUG,                                  "cannot find parent for inode (gfid:%s), hence "                                  "aborting size updation of parents", @@ -801,6 +1056,8 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,                          break;                  } +                value = 0; +                ctx = NULL;                  inode_ctx_get (_inode, this, &value);                  ctx = (quota_inode_ctx_t *)(unsigned long)value;          } while (1); @@ -819,8 +1076,9 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          uint64_t                 ctx_int        = 0;          quota_inode_ctx_t       *ctx            = NULL;          quota_local_t           *local          = NULL; -        quota_dentry_t          *dentry         = NULL; +        quota_dentry_t          *dentry         = NULL, *tmp = NULL;          int64_t                  delta          = 0; +        struct list_head         head           = {0, };          local = frame->local; @@ -828,6 +1086,8 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } +        INIT_LIST_HEAD (&head); +          ret = inode_ctx_get (local->loc.inode, this, &ctx_int);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, @@ -847,13 +1107,23 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          LOCK (&ctx->lock);          {                  ctx->buf = *postbuf; + +                list_for_each_entry (dentry, &ctx->parents, next) { +                        tmp = __quota_dentry_new (NULL, dentry->name, +                                                  dentry->par); +                        list_add_tail (&tmp->next, &head); +                } +          }          UNLOCK (&ctx->lock); -        list_for_each_entry (dentry, &ctx->parents, next) { -                delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; -                quota_update_size (this, local->loc.inode, -                                   dentry->name, dentry->par, delta); +        if (postbuf->ia_blocks != prebuf->ia_blocks) +                delta = local->delta; + +        list_for_each_entry_safe (dentry, tmp, &head, next) { +                quota_update_size (this, local->loc.inode, dentry->name, +                                   dentry->par, delta); +                __quota_dentry_free (dentry);          }  out: @@ -871,6 +1141,9 @@ quota_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; + +        priv = this->private;          local = frame->local;          if (local == NULL) { @@ -883,9 +1156,10 @@ quota_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd,                  goto unwind;          } -        STACK_WIND (frame, quota_writev_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->writev, fd, vector, count, off, -                    flags, iobref, xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_writev_cbk: default_writev_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, +                    vector, count, off, flags, iobref, xdata);          return 0;  unwind: @@ -899,14 +1173,21 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,                struct iovec *vector, int32_t count, off_t off,                uint32_t flags, struct iobref *iobref, dict_t *xdata)  { +        quota_priv_t      *priv    = NULL;          int32_t            ret     = -1, op_errno = EINVAL;          int32_t            parents = 0;          uint64_t           size    = 0;          quota_local_t     *local   = NULL;          quota_inode_ctx_t *ctx     = NULL; -        quota_priv_t      *priv    = NULL; +        quota_dentry_t    *dentry  = NULL, *tmp = NULL;          call_stub_t       *stub    = NULL; -        quota_dentry_t    *dentry  = NULL; +        struct list_head   head    = {0, }; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +        INIT_LIST_HEAD (&head);          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -920,12 +1201,13 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,          frame->local = local;          local->loc.inode = inode_ref (fd->inode); -        ret = quota_inode_ctx_get (fd->inode, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (fd->inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (fd->inode->gfid)); -                goto unwind;          }          stub = fop_writev_stub (frame, quota_writev_helper, fd, vector, count, @@ -939,47 +1221,46 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,          GF_VALIDATE_OR_GOTO (this->name, priv, unwind);          size = iov_length (vector, count); -        LOCK (&ctx->lock); -        { -                list_for_each_entry (dentry, &ctx->parents, next) { -                        parents++; +        if (ctx != NULL) { +                LOCK (&ctx->lock); +                { +                        list_for_each_entry (dentry, &ctx->parents, next) { +                                tmp = __quota_dentry_new (NULL, dentry->name, +                                                          dentry->par); +                                list_add_tail (&tmp->next, &head); +                                parents++; +                        }                  } +                UNLOCK (&ctx->lock);          } -        UNLOCK (&ctx->lock);          local->delta = size; -        local->stub = stub; -        local->link_count = parents; -        list_for_each_entry (dentry, &ctx->parents, next) { -                ret = quota_check_limit (frame, fd->inode, this, dentry->name, -                                         dentry->par); -                if (ret == -1) { -                        break; -                } -        } - -        stub = NULL; +        local->link_count = parents; +        local->stub = stub; -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; +        if (parents == 0) { +                local->link_count = 1; +                quota_check_limit (frame, fd->inode, this, NULL, NULL); +        } else { +                list_for_each_entry_safe (dentry, tmp, &head, next) { +                        quota_check_limit (frame, fd->inode, this, dentry->name, +                                           dentry->par); +                        __quota_dentry_free (dentry);                  }          } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        }          return 0;  unwind:          QUOTA_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, default_writev_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->writev, fd, +                    vector, count, off, flags, iobref, xdata); +        return 0;  } @@ -1014,8 +1295,11 @@ quota_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -        STACK_WIND (frame, quota_mkdir_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); +        STACK_WIND (frame, +                    quota_mkdir_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, +                    mode, umask, xdata); +          return 0;  unwind: @@ -1029,9 +1313,14 @@ int32_t  quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,               mode_t umask, dict_t *xdata)  { -        int32_t        ret            = 0, op_errno = 0; -        quota_local_t *local          = NULL; -        call_stub_t   *stub           = NULL; +        quota_priv_t  *priv  = NULL; +        int32_t        ret   = 0, op_errno = 0; +        quota_local_t *local = NULL; +        call_stub_t   *stub  = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -1041,8 +1330,6 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,          frame->local = local; -        local->link_count = 1; -          ret = loc_copy (&local->loc, loc);          if (ret) {                  op_errno = ENOMEM; @@ -1059,32 +1346,24 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,          local->stub = stub;          local->delta = 0; +        local->link_count = 1;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0; +  err:          QUOTA_STACK_UNWIND (mkdir, frame, -1, op_errno, NULL, NULL, NULL,                              NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_mkdir_cbk: default_mkdir_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, +                    mode, umask, xdata); + +        return 0;  } @@ -1104,7 +1383,7 @@ quota_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = quota_inode_ctx_get (inode, -1, this, NULL, buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, this, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", @@ -1147,8 +1426,12 @@ quota_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local; + +        priv = this->private; +          if (local == NULL) {                  gf_log (this->name, GF_LOG_WARNING, "local is NULL");                  goto unwind; @@ -1159,9 +1442,11 @@ quota_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -        STACK_WIND (frame, quota_create_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, -                    fd, xdata); + +        STACK_WIND (frame, +                    priv->is_quota_on? quota_create_cbk: default_create_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, +                    flags, mode, umask, fd, xdata);          return 0;  unwind: @@ -1175,12 +1460,19 @@ int32_t  quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,                mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)  { -        int32_t            ret            = -1; -        quota_local_t     *local          = NULL; -        call_stub_t       *stub           = NULL; +        quota_priv_t  *priv     = NULL; +        int32_t        ret      = -1; +        quota_local_t *local    = NULL; +        int32_t        op_errno = 0; +        call_stub_t   *stub     = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { +                op_errno = ENOMEM;                  goto err;          } @@ -1189,6 +1481,7 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,          ret = loc_copy (&local->loc, loc);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); +                op_errno = ENOMEM;                  goto err;          } @@ -1203,29 +1496,19 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,          local->delta = 0;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0;  err: -        QUOTA_STACK_UNWIND (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, -                            NULL, NULL); +        QUOTA_STACK_UNWIND (create, frame, -1, op_errno, NULL, NULL, NULL, +                            NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_create_cbk: default_create_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, +                    flags, mode, umask, fd, xdata); +        return 0;  } @@ -1237,6 +1520,8 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL;          uint64_t           value = 0; +        quota_dentry_t    *dentry = NULL; +        quota_dentry_t    *old_dentry = NULL;          if (op_ret < 0) {                  goto out; @@ -1254,9 +1539,26 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_update_size (this, local->loc.inode, (char *)local->loc.name, -                           local->loc.parent->gfid, -                           (-(ctx->buf.ia_blocks * 512))); +        if (!local->skip_check) +                quota_update_size (this, local->loc.inode, +                                   (char *)local->loc.name, +                                   local->loc.parent->gfid, +                                   (-(ctx->buf.ia_blocks * 512))); + +        LOCK (&ctx->lock); +        { +                list_for_each_entry (dentry, &ctx->parents, next) { +                        if ((strcmp (dentry->name, local->loc.name) == 0) && +                            (uuid_compare (local->loc.parent->gfid, +                                           dentry->par) == 0)) { +                                old_dentry = dentry; +                                break; +                        } +                } +                if (old_dentry) +                        __quota_dentry_free (old_dentry); +        } +        UNLOCK (&ctx->lock);  out:          QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent, @@ -1269,9 +1571,14 @@ int32_t  quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,                dict_t *xdata)  { -        int32_t        ret = 0; +        quota_priv_t       *priv        = NULL; +        int32_t        ret = -1;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto err; @@ -1279,14 +1586,21 @@ quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,          frame->local = local; +        if (xdata && dict_get (xdata, GLUSTERFS_INTERNAL_FOP_KEY)) { +                local->skip_check = _gf_true; +        } +          ret = loc_copy (&local->loc, loc);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed");                  goto err;          } -        STACK_WIND (frame, quota_unlink_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_unlink_cbk: default_unlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, +                    xflag, xdata);          ret = 0; @@ -1317,16 +1631,19 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          local = (quota_local_t *) frame->local; +        if (local->skip_check) +                goto out; +          quota_update_size (this, local->loc.parent, NULL, NULL,                             (buf->ia_blocks * 512)); -        ret = quota_inode_ctx_get (inode, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (inode, this, &ctx, 0);          if ((ret == -1) || (ctx == NULL)) { -                gf_log (this->name, GF_LOG_WARNING, "cannot find quota " -                        "context in %s (gfid:%s)", local->loc.path, +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (inode->gfid)); -                op_ret = -1; -                op_errno = EINVAL;                  goto out;          } @@ -1380,6 +1697,9 @@ quota_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; + +        priv = this->private;          local = frame->local;          if (local == NULL) { @@ -1393,8 +1713,9 @@ quota_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                  goto unwind;          } -        STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); +        STACK_WIND (frame, priv->is_quota_on? quota_link_cbk: default_link_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, +                    newloc, xdata);          return 0;  unwind: @@ -1408,10 +1729,24 @@ int32_t  quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,              dict_t *xdata)  { +        quota_priv_t      *priv  = NULL;          int32_t            ret   = -1, op_errno = ENOMEM;          quota_local_t     *local = NULL; +        quota_inode_ctx_t *ctx   = NULL;          call_stub_t       *stub  = NULL; -        quota_inode_ctx_t *ctx = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +        quota_inode_ctx_get (oldloc->inode, this, &ctx, 0); +        if (ctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error", +                        uuid_utoa (oldloc->inode->gfid)); +        }          local = quota_local_new ();          if (local == NULL) { @@ -1420,6 +1755,11 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,          frame->local = (void *) local; +        if (xdata && dict_get (xdata, GLUSTERFS_INTERNAL_FOP_KEY)) { +                local->skip_check = _gf_true; +                goto wind; +        } +          ret = loc_copy (&local->loc, newloc);          if (ret == -1) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); @@ -1433,46 +1773,22 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,          local->link_count = 1;          local->stub = stub; - -        ret = quota_inode_ctx_get (oldloc->inode, -1, this, NULL, NULL, &ctx, -                                   0); -        if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        oldloc->inode ? uuid_utoa (oldloc->inode->gfid) : "0"); -                op_errno = EINVAL; -                goto err; -        } - -        local->delta = ctx->buf.ia_blocks * 512; +        local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0;          quota_check_limit (frame, newloc->parent, this, NULL, NULL); +        return 0; -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - -        ret = 0;  err: -        if (ret < 0) { -                QUOTA_STACK_UNWIND (link, frame, -1, op_errno, NULL, NULL, -                                    NULL, NULL, NULL); -        } +        QUOTA_STACK_UNWIND (link, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, default_link_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, +                    newloc, xdata); +        return 0;  } @@ -1484,11 +1800,11 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                    dict_t *xdata)  {          int32_t               ret              = -1; +        int64_t               size             = 0;          quota_local_t        *local            = NULL;          quota_inode_ctx_t    *ctx              = NULL;          quota_dentry_t       *old_dentry       = NULL, *dentry = NULL;          char                  new_dentry_found = 0; -        int64_t               size             = 0;          if (op_ret < 0) {                  goto out; @@ -1508,8 +1824,10 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          if (local->oldloc.parent != local->newloc.parent) { -                quota_update_size (this, local->oldloc.parent, NULL, NULL, (-size)); -                quota_update_size (this, local->newloc.parent, NULL, NULL, size); +                quota_update_size (this, local->oldloc.parent, NULL, NULL, +                                   (-size)); +                quota_update_size (this, local->newloc.parent, NULL, NULL, +                                   size);          }          if (!(IA_ISREG (local->oldloc.inode->ia_type) @@ -1517,14 +1835,14 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        ret = quota_inode_ctx_get (local->oldloc.inode, -1, this, NULL, NULL, -                                   &ctx, 0); +        ret = quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0);          if ((ret == -1) || (ctx == NULL)) { -                gf_log (this->name, GF_LOG_WARNING, "quota context not" -                        "set in inode(gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->oldloc.inode->gfid)); -                op_ret = -1; -                op_errno = EINVAL; +                  goto out;          } @@ -1570,7 +1888,8 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          if (dentry == NULL) {                                  gf_log (this->name, GF_LOG_WARNING,                                          "cannot create a new dentry (name:%s) " -                                        "for inode(gfid:%s)", local->newloc.name, +                                        "for inode(gfid:%s)", +                                        local->newloc.name,                                          uuid_utoa (local->newloc.inode->gfid));                                  op_ret = -1;                                  op_errno = ENOMEM; @@ -1597,6 +1916,9 @@ quota_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; + +        priv = this->private;          local = frame->local;          if (local == NULL) { @@ -1610,8 +1932,11 @@ quota_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                  goto unwind;          } -        STACK_WIND (frame, quota_rename_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_rename_cbk: default_rename_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, +                    newloc, xdata); +          return 0;  unwind: @@ -1625,10 +1950,15 @@ int32_t  quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                loc_t *newloc, dict_t *xdata)  { -        int32_t            ret            = -1, op_errno = ENOMEM; -        quota_local_t     *local          = NULL; -        call_stub_t       *stub           = NULL; -        quota_inode_ctx_t *ctx            = NULL; +        quota_priv_t      *priv  = NULL; +        int32_t            ret   = -1, op_errno = ENOMEM; +        quota_local_t     *local = NULL; +        quota_inode_ctx_t *ctx   = NULL; +        call_stub_t       *stub  = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -1660,46 +1990,34 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,          if (IA_ISREG (oldloc->inode->ia_type)              || IA_ISLNK (oldloc->inode->ia_type)) { -                ret = quota_inode_ctx_get (oldloc->inode, -1, this, NULL, NULL, -                                           &ctx, 0); +                ret = quota_inode_ctx_get (oldloc->inode, this, &ctx, 0);                  if (ctx == NULL) {                          gf_log (this->name, GF_LOG_WARNING, -                                "quota context not set in inode (gfid:%s)", +                                "quota context not set in inode (gfid:%s), " +                                "considering file size as zero while enforcing " +                                "quota on new ancestry",                                  oldloc->inode ? uuid_utoa (oldloc->inode->gfid)                                  : "0"); -                        op_errno = EINVAL; -                        goto err; +                        local->delta = 0; +                } else { +                        local->delta = ctx->buf.ia_blocks * 512;                  } -                local->delta = ctx->buf.ia_blocks * 512;          } else {                  local->delta = 0;          }          quota_check_limit (frame, newloc->parent, this, NULL, NULL); +        return 0; -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - -        ret = 0;  err: -        if (ret == -1) { -                QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL, -                                    NULL, NULL, NULL, NULL, NULL); -        } +        QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL, +                            NULL, NULL, NULL, NULL, NULL); +        return 0; + +wind: +        STACK_WIND (frame, default_rename_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, +                    newloc, xdata);          return 0;  } @@ -1725,12 +2043,14 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.parent, NULL, NULL, size); -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 1); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 1);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid)); +                  goto out;          } @@ -1765,6 +2085,7 @@ quota_symlink_helper (call_frame_t *frame, xlator_t *this, const char *linkpath,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local;          if (local == NULL) { @@ -1772,14 +2093,17 @@ quota_symlink_helper (call_frame_t *frame, xlator_t *this, const char *linkpath,                  goto unwind;          } +        priv = this->private; +          if (local->op_ret == -1) {                  op_errno = local->op_errno;                  goto unwind;          } -        STACK_WIND (frame, quota_symlink_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, -                    xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, +                    linkpath, loc, umask, xdata);          return 0;  unwind: @@ -1793,10 +2117,15 @@ int  quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,                 loc_t *loc, mode_t umask, dict_t *xdata)  { -        int32_t          ret      = -1; -        int32_t          op_errno = ENOMEM; -        quota_local_t   *local    = NULL; -        call_stub_t     *stub     = NULL; +        quota_priv_t  *priv     = NULL; +        int32_t        ret      = -1; +        int32_t        op_errno = ENOMEM; +        quota_local_t *local    = NULL; +        call_stub_t   *stub     = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -1811,8 +2140,6 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,                  goto err;          } -        local->link_count = 1; -          stub = fop_symlink_stub (frame, quota_symlink_helper, linkpath, loc,                                   umask, xdata);          if (stub == NULL) { @@ -1821,26 +2148,9 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,          local->stub = stub;          local->delta = strlen (linkpath); +        local->link_count = 1;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0;  err: @@ -1848,6 +2158,13 @@ err:                              NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, +                    linkpath, loc, umask, xdata); +        return 0;  } @@ -1857,8 +2174,8 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                      struct iatt *postbuf, dict_t *xdata)  {          quota_local_t     *local = NULL; -        int64_t            delta = 0;          quota_inode_ctx_t *ctx   = NULL; +        int64_t            delta = 0;          if (op_ret < 0) {                  goto out; @@ -1874,11 +2191,12 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.inode, NULL, NULL, delta); -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -1900,9 +2218,15 @@ int32_t  quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,                  dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          int32_t          ret   = -1;          quota_local_t   *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +          local = quota_local_new ();          if (local == NULL) {                  goto err; @@ -1916,8 +2240,11 @@ quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,                  goto err;          } -        STACK_WIND (frame, quota_truncate_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_truncate_cbk: default_truncate_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, +                    offset, xdata);          return 0;  err: @@ -1933,8 +2260,8 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                       struct iatt *postbuf, dict_t *xdata)  {          quota_local_t     *local = NULL; -        int64_t            delta = 0;          quota_inode_ctx_t *ctx   = NULL; +        int64_t            delta = 0;          if (op_ret < 0) {                  goto out; @@ -1950,11 +2277,12 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.inode, NULL, NULL, delta); -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -1976,8 +2304,13 @@ int32_t  quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,                   dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t   *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL)                  goto err; @@ -1986,8 +2319,11 @@ quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_ftruncate_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_ftruncate_cbk: default_ftruncate_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, +                    offset, xdata);          return 0;  err: @@ -2006,14 +2342,23 @@ quota_send_dir_limit_to_cli (call_frame_t *frame, xlator_t *this,          dict_t            *dict              = NULL;          quota_inode_ctx_t *ctx               = NULL;          uint64_t           value             = 0; +        quota_priv_t      *priv              = NULL; + +        priv = this->private; +        if (!priv->is_quota_on) { +                snprintf (dir_limit, 1024, "Quota is disabled please turn on"); +                goto dict_set; +        }          ret = inode_ctx_get (inode, this, &value);          if (ret < 0)                  goto out;          ctx = (quota_inode_ctx_t *)(unsigned long)value; -        snprintf (dir_limit, 1024, "%"PRId64",%"PRId64, ctx->size, ctx->limit); +        snprintf (dir_limit, 1024, "%"PRId64",%"PRId64, ctx->size, +                  ctx->hard_lim); +dict_set:          dict = dict_new ();          if (dict == NULL) {                  ret = -1; @@ -2024,7 +2369,7 @@ quota_send_dir_limit_to_cli (call_frame_t *frame, xlator_t *this,          if (ret < 0)                  goto out; -        gf_log (this->name, GF_LOG_INFO, "str = %s", dir_limit); +        gf_log (this->name, GF_LOG_DEBUG, "str = %s", dir_limit);          QUOTA_STACK_UNWIND (getxattr, frame, 0, 0, dict, NULL); @@ -2076,7 +2421,8 @@ quota_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc,  int32_t  quota_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                int32_t op_ret, int32_t op_errno, struct iatt *buf, dict_t *xdata) +                int32_t op_ret, int32_t op_errno, struct iatt *buf, +                dict_t *xdata)  {          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL; @@ -2091,12 +2437,17 @@ quota_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (buf->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "quota context is NULL on " +                                "inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2116,9 +2467,15 @@ out:  int32_t  quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2131,8 +2488,10 @@ quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)                  goto unwind;          } -        STACK_WIND (frame, quota_stat_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->stat, loc, xdata); +wind: +        STACK_WIND (frame, priv->is_quota_on? quota_stat_cbk: default_stat_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, +                    xdata);          return 0;  unwind: @@ -2159,12 +2518,17 @@ quota_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (buf->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "quota context is NULL on " +                                "inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2184,8 +2548,13 @@ out:  int32_t  quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2195,8 +2564,11 @@ quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_fstat_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fstat, fd, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_fstat_cbk: default_fstat_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, +                    xdata);          return 0;  unwind: @@ -2223,11 +2595,12 @@ quota_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -2239,7 +2612,8 @@ quota_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          UNLOCK (&ctx->lock);  out: -        QUOTA_STACK_UNWIND (readlink, frame, op_ret, op_errno, path, buf, xdata); +        QUOTA_STACK_UNWIND (readlink, frame, op_ret, op_errno, path, buf, +                            xdata);          return 0;  } @@ -2248,9 +2622,14 @@ int32_t  quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,                  dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2264,8 +2643,11 @@ quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,                  goto unwind;          } -        STACK_WIND (frame, quota_readlink_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->readlink, loc, size, xdata); +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_readlink_cbk: default_readlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink, loc, +                    size, xdata);          return 0;  unwind: @@ -2293,11 +2675,12 @@ quota_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -2319,8 +2702,13 @@ int32_t  quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,               off_t offset, uint32_t flags, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2330,13 +2718,16 @@ quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_readv_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, -                    xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_readv_cbk: default_readv_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, +                    size, offset, flags, xdata);          return 0;  unwind: -        QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, NULL); +        QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, +                            NULL);          return 0;  } @@ -2359,11 +2750,12 @@ quota_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -2385,8 +2777,13 @@ int32_t  quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,               dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2396,8 +2793,11 @@ quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,          frame->local = local; -        STACK_WIND (frame, quota_fsync_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fsync, fd, flags, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_fsync_cbk: default_fsync_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, +                    flags, xdata);          return 0;  unwind: @@ -2425,12 +2825,16 @@ quota_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (statpost->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, "quota context is " +                                "NULL on inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2452,9 +2856,14 @@ int32_t  quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                 struct iatt *stbuf, int32_t valid, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2468,8 +2877,11 @@ quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -        STACK_WIND (frame, quota_setattr_cbk, FIRST_CHILD (this), -                    FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_setattr_cbk: default_setattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, +                    stbuf, valid, xdata);          return 0;  unwind: @@ -2496,12 +2908,16 @@ quota_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (statpost->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, "quota context is " +                                "NULL on inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2522,8 +2938,14 @@ int32_t  quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                  struct iatt *stbuf, int32_t valid, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2533,8 +2955,11 @@ quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_fsetattr_cbk, FIRST_CHILD (this), -                    FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_fsetattr_cbk: default_fsetattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, +                    stbuf, valid, xdata);          return 0;  unwind: @@ -2559,7 +2984,7 @@ quota_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = quota_inode_ctx_get (inode, -1, this, NULL, buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, this, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode (gfid:%s)", uuid_utoa (inode->gfid)); @@ -2600,6 +3025,7 @@ quota_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local;          if (local == NULL) { @@ -2607,14 +3033,17 @@ quota_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } +        priv = this->private; +          if (local->op_ret == -1) {                  op_errno = local->op_errno;                  goto unwind;          } -        STACK_WIND (frame, quota_mknod_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, -                    xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, +                    mode, rdev, umask, xdata);          return 0; @@ -2629,9 +3058,14 @@ int  quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,               dev_t rdev, mode_t umask, dict_t *xdata)  { -        int32_t            ret            = -1; -        quota_local_t     *local          = NULL; -        call_stub_t       *stub           = NULL; +        quota_priv_t  *priv     = NULL; +        int32_t        ret      = -1; +        quota_local_t *local    = NULL; +        call_stub_t   *stub     = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -2657,35 +3091,49 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,          local->delta = 0;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0;  err:          QUOTA_STACK_UNWIND (mknod, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,                              NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, +                    mode, rdev, umask, xdata); + +        return 0; +  }  int  quota_setxattr_cbk (call_frame_t *frame, void *cookie,                      xlator_t *this, int op_ret, int op_errno, dict_t *xdata)  { +        quota_local_t     *local = NULL; +        quota_inode_ctx_t *ctx   = NULL; +        int                ret   = 0; + +        local = frame->local; +        if (!local) +                goto out; + +        ret = quota_inode_ctx_get (local->loc.inode, this, &ctx, 1); +        if ((ret < 0) || (ctx == NULL)) { +                op_errno = ENOMEM; +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->hard_lim = local->limit.hard_lim; +                ctx->soft_lim = local->limit.soft_lim_percent; +        } +        UNLOCK (&ctx->lock); + +out:          QUOTA_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata);          return 0;  } @@ -2694,20 +3142,70 @@ int  quota_setxattr (call_frame_t *frame, xlator_t *this,                  loc_t *loc, dict_t *dict, int flags, dict_t *xdata)  { -        int             op_errno = EINVAL; -        int             op_ret   = -1; +        quota_priv_t  *priv     = NULL; +        int            op_errno = EINVAL; +        int            op_ret   = -1; +        int64_t        hard_lim = -1, soft_lim = -1; +        quota_local_t *local    = NULL; +        char          *src      = NULL; +        char          *dst      = NULL; +        int            len      = 0; +        int            ret      = -1; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (loc, err); -        GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, -                                   op_errno, err); +        if (0 <= frame->root->pid) { +                ret = dict_get_ptr_and_len (dict, QUOTA_LIMIT_KEY, +                                            (void **)&src, &len); +                if (ret) { +                        gf_log (this->name, GF_LOG_DEBUG, "dict_get on %s " +                                "failed", QUOTA_LIMIT_KEY); +                } else { +                        dst = GF_CALLOC (len, sizeof (char), gf_common_mt_char); +                        if (dst) +                                memcpy (dst, src, len); +                } + +                GF_REMOVE_INTERNAL_XATTR ("trusted.glusterfs.quota*", +                                          dict); +                if (!ret && IA_ISDIR (loc->inode->ia_type) && dst) { +                        ret = dict_set_dynptr (dict, QUOTA_LIMIT_KEY, +                                               dst, len); +                        if (ret) +                                gf_log (this->name, GF_LOG_WARNING, "setting " +                                        "key %s failed", QUOTA_LIMIT_KEY); +                        else +                                dst = NULL; +                } +        } -        STACK_WIND (frame, quota_setxattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->setxattr, -                    loc, dict, flags, xdata); +        quota_get_limits (this, dict, &hard_lim, &soft_lim); + +        if (hard_lim > 0) { +                local = quota_local_new (); +                if (local == NULL) { +                        op_errno = ENOMEM; +                        goto err; +                } + +                frame->local = local; +                loc_copy (&local->loc, loc); + +                local->limit.hard_lim = hard_lim; +                local->limit.soft_lim_percent = soft_lim; +        } + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_setxattr_cbk: default_setxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, loc, +                    dict, flags, xdata);          return 0;  err:          QUOTA_STACK_UNWIND (setxattr, frame, op_ret, op_errno, NULL); @@ -2718,6 +3216,27 @@ int  quota_fsetxattr_cbk (call_frame_t *frame, void *cookie,                       xlator_t *this, int op_ret, int op_errno, dict_t *xdata)  { +        quota_inode_ctx_t *ctx   = NULL; +        quota_local_t     *local = NULL; + +        local = frame->local; +        if (!local) +                goto out; + +        op_ret = quota_inode_ctx_get (local->loc.inode, this, &ctx, 1); +        if ((op_ret < 0) || (ctx == NULL)) { +                op_errno = ENOMEM; +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->hard_lim = local->limit.hard_lim; +                ctx->soft_lim = local->limit.soft_lim_percent; +        } +        UNLOCK (&ctx->lock); + +out:          QUOTA_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata);          return 0;  } @@ -2726,20 +3245,40 @@ int  quota_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                   dict_t *dict, int flags, dict_t *xdata)  { -        int32_t         op_ret   = -1; -        int32_t         op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; +        int32_t        op_ret   = -1; +        int32_t        op_errno = EINVAL; +        quota_local_t *local    = NULL; +        int64_t        hard_lim = -1, soft_lim = -1; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (fd, err); -        GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, -                                   op_errno, err); +        if (0 <= frame->root->pid) +                GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, +                                            op_errno, err); -        STACK_WIND (frame, quota_fsetxattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fsetxattr, -                    fd, dict, flags, xdata); +        quota_get_limits (this, dict, &hard_lim, &soft_lim); + +        if (hard_lim > 0) { +                local = quota_local_new (); +                frame->local = local; +                local->loc.inode = inode_ref (fd->inode); + +                local->limit.hard_lim = hard_lim; +                local->limit.soft_lim_percent = soft_lim; +        } + +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fsetxattr_cbk: default_fsetxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, fd, +                    dict, flags, xdata);          return 0;   err:          QUOTA_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, NULL); @@ -2759,19 +3298,28 @@ int  quota_removexattr (call_frame_t *frame, xlator_t *this,                     loc_t *loc, const char *name, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          int32_t         op_errno = EINVAL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          VALIDATE_OR_GOTO (this, err); +        /* all quota xattrs can be cleaned up by doing setxattr on special key. +         * Hence its ok that we don't allow removexattr on quota keys here. +         */          GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*",                                   name, op_errno, err);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (loc, err); -        STACK_WIND (frame, quota_removexattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->removexattr, +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_removexattr_cbk: default_removexattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr,                      loc, name, xdata);          return 0;  err: @@ -2792,9 +3340,14 @@ int  quota_fremovexattr (call_frame_t *frame, xlator_t *this,                      fd_t *fd, const char *name, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          int32_t         op_ret   = -1;          int32_t         op_errno = EINVAL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (fd, err); @@ -2802,9 +3355,10 @@ quota_fremovexattr (call_frame_t *frame, xlator_t *this,          GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*",                                   name, op_errno, err); -        STACK_WIND (frame, quota_fremovexattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fremovexattr, +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fremovexattr_cbk: default_fremovexattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr,                      fd, name, xdata);          return 0;   err: @@ -2818,16 +3372,16 @@ quota_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                    int32_t op_ret, int32_t op_errno, struct statvfs *buf,                    dict_t *xdata)  { -	inode_t           *root_inode = NULL; -        quota_priv_t      *priv       = NULL; -        uint64_t           value      = 0; -        quota_inode_ctx_t *ctx        = NULL; -        limits_t          *limit_node = NULL; -	int64_t            usage      = -1; -	int64_t            avail      = -1; -        int64_t            blocks     = 0; +	inode_t           *inode  = NULL; +        uint64_t           value  = 0; +	int64_t            usage  = -1; +	int64_t            avail  = -1; +        int64_t            blocks = 0; +        quota_inode_ctx_t *ctx    = NULL; +        int                ret    = 0; +        gf_boolean_t       dict_created = _gf_false; -	root_inode = cookie; +        inode = cookie;          /* This fop will fail mostly in case of client disconnect's,           * which is already logged. Hence, not logging here */ @@ -2838,74 +3392,192 @@ quota_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  	 * cookie, and it would only do so if the value was non-NULL.  This  	 * check is therefore just routine defensive coding.  	 */ -	if (!root_inode) { +	if (!inode) {  		gf_log(this->name,GF_LOG_WARNING,  		       "null inode, cannot adjust for quota");  		goto unwind;  	} -	if (!root_inode->table || (root_inode != root_inode->table->root)) { -		gf_log(this->name,GF_LOG_WARNING, -		       "non-root inode, cannot adjust for quota"); -		goto unwind; -	} -        inode_ctx_get (root_inode, this, &value); +        inode_ctx_get (inode, this, &value);  	if (!value) {  		goto unwind;  	} + +        /* if limit is set on this inode, report statfs based on this inode +         * else report based on root. +         */          ctx = (quota_inode_ctx_t *)(unsigned long)value; +        if (ctx->hard_lim <= 0) { +                inode_ctx_get (inode->table->root, this, &value); +                ctx = (quota_inode_ctx_t *)(unsigned long) value; +                if (!ctx) +                        goto unwind; +        } +  	usage = (ctx->size) / buf->f_bsize; -        priv = this->private; -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -		/* Notice that this only works for volume-level quota. */ -                if (strcmp (limit_node->path, "/") == 0) { -                        blocks = limit_node->value / buf->f_bsize; -                        if (usage > blocks) { -                                break; -                        } +        if (ctx->hard_lim > 0) { +                blocks = ctx->hard_lim / buf->f_bsize; +                buf->f_blocks = blocks; -			buf->f_blocks = blocks; -			avail = buf->f_blocks - usage; -			if (buf->f_bfree > avail) { -				buf->f_bfree = avail; -			} -			/* -			 * We have to assume that the total assigned quota -			 * won't cause us to dip into the reserved space, -			 * because dealing with the overcommitted cases is -			 * just too hairy (especially when different bricks -			 * might be using different reserved percentages and -			 * such). -			 */ -			buf->f_bavail = buf->f_bfree; -			break; +                avail = buf->f_blocks - usage; +                avail = (avail >= 0) ? avail : 0; + +                if (buf->f_bfree > avail) { +                        buf->f_bfree = avail;                  } +                /* +                 * We have to assume that the total assigned quota +                 * won't cause us to dip into the reserved space, +                 * because dealing with the overcommitted cases is +                 * just too hairy (especially when different bricks +                 * might be using different reserved percentages and +                 * such). +                 */ +                buf->f_bavail = buf->f_bfree; +        } + +        if (!xdata) { +                xdata = dict_new (); +                if (!xdata) +                        goto unwind; +                dict_created = _gf_true;          } +        ret = dict_set_int8 (xdata, "quota-deem-statfs", 1); +        if (-1 == ret) +                gf_log (this->name, GF_LOG_ERROR, "Dict set failed, " +                        "deem-statfs option may have no effect"); +  unwind: -	if (root_inode) { -		inode_unref(root_inode); -	} -        STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, buf, xdata); +        QUOTA_STACK_UNWIND (statfs, frame, op_ret, op_errno, buf, xdata); + +        if (dict_created) +                dict_unref (xdata);          return 0;  }  int32_t +quota_statfs_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                     dict_t *xdata) +{ +        quota_local_t   *local          = NULL; +        int              op_errno       = EINVAL; + +        GF_VALIDATE_OR_GOTO ("quota", (local = frame->local), err); + +        if (-1 == local->op_ret) { +                op_errno = local->op_errno; +                goto err; +        } + +        STACK_WIND_COOKIE (frame, quota_statfs_cbk, loc->inode, +                           FIRST_CHILD(this), +                           FIRST_CHILD(this)->fops->statfs, loc, xdata); +        return 0; +err: +        QUOTA_STACK_UNWIND (statfs, frame, -1, op_errno, NULL, NULL); +        return 0; +} + +int32_t +quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                           int32_t op_ret, int32_t op_errno, inode_t *inode, +                           struct iatt *buf, dict_t *xdata, +                           struct iatt *postparent) +{ +        quota_local_t     *local      = NULL; +        int32_t            ret        = 0; +        quota_inode_ctx_t *ctx        = NULL; +        int64_t           *size       = 0; +        uint64_t           value      = 0; + +        local = frame->local; + +        if (op_ret < 0) +                goto resume; + +        GF_ASSERT (local); +        GF_ASSERT (frame); +        GF_VALIDATE_OR_GOTO_WITH_ERROR ("quota", this, resume, op_errno, +                                        EINVAL); +        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, xdata, resume, op_errno, +                                        EINVAL); + +        ret = inode_ctx_get (local->validate_loc.inode, this, &value); + +        ctx = (quota_inode_ctx_t *)(unsigned long)value; +        if ((ret == -1) || (ctx == NULL)) { +                gf_log (this->name, GF_LOG_WARNING, +                        "quota context is not present in inode (gfid:%s)", +                        uuid_utoa (local->validate_loc.inode->gfid)); +                op_errno = EINVAL; +                goto resume; +        } + +        ret = dict_get_bin (xdata, QUOTA_SIZE_KEY, (void **) &size); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "size key not present in dict"); +                op_errno = EINVAL; +                goto resume; +        } + +        LOCK (&ctx->lock); +        { +                ctx->size = ntoh64 (*size); +                gettimeofday (&ctx->tv, NULL); +        } +        UNLOCK (&ctx->lock); + +resume: +        --local->link_count; + +        quota_resume_fop_if_validation_done (local); +        return 0; +} + +int32_t  quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)  { -	inode_t *root_inode = NULL; -        quota_priv_t    *priv = NULL; +        quota_local_t   *local  = NULL; +        int              op_errno       = 0; +        call_stub_t     *stub           = NULL; +        quota_priv_t *priv  = NULL; +        int           ret       = 0;          priv = this->private; +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +  	if (priv->consider_statfs && loc->inode) { -		root_inode = loc->inode->table->root; -		inode_ref(root_inode); -		STACK_WIND_COOKIE (frame, quota_statfs_cbk, root_inode, -				   FIRST_CHILD(this), -				   FIRST_CHILD(this)->fops->statfs, loc, xdata); +                local = quota_local_new (); +                if (!local) { +                        op_errno = ENOMEM; +                        goto err; +                } +                frame->local = local; + +                local->inode = inode_ref (loc->inode); +                local->link_count = 1; + +                stub = fop_statfs_stub (frame, quota_statfs_helper, loc, xdata); +                if (!stub) { +                        op_errno = ENOMEM; +                        goto err; +                } + +                local->stub = stub; + +                ret = quota_validate (frame, local->inode, this, +                                      quota_statfs_validate_cbk); +                if (0 > ret) { +                        op_errno = -ret; +                        --local->link_count; +                } + +                quota_resume_fop_if_validation_done (local);  	}  	else {  		/* @@ -2921,51 +3593,112 @@ quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)                  if (priv->consider_statfs)                          gf_log(this->name,GF_LOG_WARNING,                                 "missing inode, cannot adjust for quota"); +wind:  		STACK_WIND (frame, default_statfs_cbk, FIRST_CHILD(this),  			    FIRST_CHILD(this)->fops->statfs, loc, xdata);  	}          return 0; -} +err: +        STACK_UNWIND_STRICT (statfs, frame, -1, op_errno, NULL, NULL); + +        if (local) +                quota_local_cleanup (this, local); +        return 0; +}  int  quota_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                      int op_ret, int op_errno, gf_dirent_t *entries,                      dict_t *xdata)  { -        gf_dirent_t *entry = NULL; +        gf_dirent_t   *entry = NULL; +        quota_local_t *local = NULL; +        loc_t          loc   = {0, };          if (op_ret <= 0)                  goto unwind; +        local = frame->local; +          list_for_each_entry (entry, &entries->list, list) { -                /* TODO: fill things */ +                if ((strcmp (entry->d_name, ".") == 0) +                    || (strcmp (entry->d_name, "..") == 0)) +                        continue; + +                uuid_copy (loc.gfid, entry->d_stat.ia_gfid); +                loc.inode = inode_ref (entry->inode); +                loc.parent = inode_ref (local->loc.inode); +                uuid_copy (loc.pargfid, loc.parent->gfid); +                loc.name = entry->d_name; + +                quota_fill_inodectx (this, entry->inode, entry->dict, +                                     &loc, &entry->d_stat, &op_errno); + +                loc_wipe (&loc);          }  unwind: -        STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries, xdata); +        QUOTA_STACK_UNWIND (readdirp, frame, op_ret, op_errno, entries, xdata);          return 0;  } +  int  quota_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,                  off_t offset, dict_t *dict)  { -        int ret = 0; +        quota_priv_t  *priv     = NULL; +        int            ret      = 0; +        gf_boolean_t   new_dict = _gf_false; +        quota_local_t *local    = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +        local = quota_local_new (); + +        if (local == NULL) { +                goto err; +        } + +        frame->local = local; + +        local->loc.inode = inode_ref (fd->inode); + +        if (dict == NULL) { +                dict = dict_new (); +                new_dict = _gf_true; +        }          if (dict) { -                ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0); +                ret = dict_set_int8 (dict, QUOTA_LIMIT_KEY, 1);                  if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "dict set of key for hard-limit failed");                          goto err;                  }          } -        STACK_WIND (frame, quota_readdirp_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, -                    fd, size, offset, dict); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_readdirp_cbk: default_readdirp_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, +                    size, offset, dict); + +        if (new_dict) { +                dict_unref (dict); +        } +          return 0;  err:          STACK_UNWIND_STRICT (readdirp, frame, -1, EINVAL, NULL, NULL); + +        if (new_dict) { +                dict_unref (dict); +        } +          return 0;  } @@ -3022,12 +3755,14 @@ out:          return 0;  } +  int32_t -quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, -		       int32_t mode, off_t offset, size_t len, dict_t *xdata) +quota_fallocate_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, +                        int32_t mode, off_t offset, size_t len, dict_t *xdata)  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local;          if (local == NULL) { @@ -3035,12 +3770,16 @@ quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd,                  goto unwind;          } +        priv = this->private; +          if (local->op_ret == -1) {                  op_errno = local->op_errno;                  goto unwind;          } -        STACK_WIND (frame, quota_fallocate_cbk, FIRST_CHILD(this), +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fallocate_cbk: default_fallocate_cbk, +                    FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len,  		    xdata);          return 0; @@ -3050,6 +3789,7 @@ unwind:          return 0;  } +  int32_t  quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,  		off_t offset, size_t len, dict_t *xdata) @@ -3059,8 +3799,13 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          quota_local_t     *local   = NULL;          quota_inode_ctx_t *ctx     = NULL;          quota_priv_t      *priv    = NULL; -        call_stub_t       *stub    = NULL;          quota_dentry_t    *dentry  = NULL; +        call_stub_t       *stub    = NULL; + +        priv = this->private; +        GF_VALIDATE_OR_GOTO (this->name, priv, unwind); + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -3074,16 +3819,17 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          frame->local = local;          local->loc.inode = inode_ref (fd->inode); -        ret = quota_inode_ctx_get (fd->inode, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (fd->inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (fd->inode->gfid)); -                goto unwind; +                gf_log (this->name, GF_LOG_DEBUG, "quota context is " +                        "NULL on inode (%s). " +                        "If quota is not enabled recently and crawler " +                        "has finished crawling, its an error", +                        uuid_utoa (local->loc.inode->gfid));          } -        stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, offset, len, -				  xdata);  +        stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, +                                  offset, len, xdata);          if (stub == NULL) {                  op_errno = ENOMEM;                  goto unwind; @@ -3092,13 +3838,15 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          priv = this->private;          GF_VALIDATE_OR_GOTO (this->name, priv, unwind); -        LOCK (&ctx->lock); -        { -                list_for_each_entry (dentry, &ctx->parents, next) { -                        parents++; +        if (ctx != NULL) { +                LOCK (&ctx->lock); +                { +                        list_for_each_entry (dentry, &ctx->parents, next) { +                                parents++; +                        }                  } +                UNLOCK (&ctx->lock);          } -        UNLOCK (&ctx->lock);  	/*  	 * Note that by using len as the delta we're assuming the range from @@ -3109,37 +3857,75 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          local->stub = stub;          local->link_count = parents; -        list_for_each_entry (dentry, &ctx->parents, next) { -                ret = quota_check_limit (frame, fd->inode, this, dentry->name, -                                         dentry->par); -                if (ret == -1) { -                        break; -                } -        } - -        stub = NULL; - -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; +        if (parents == 0) { +                local->link_count = 1; +                quota_check_limit (frame, fd->inode, this, NULL, NULL); +        } else { +                list_for_each_entry (dentry, &ctx->parents, next) { +                        quota_check_limit (frame, fd->inode, this, dentry->name, +                                           dentry->par);                  }          } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        }          return 0;  unwind:          QUOTA_STACK_UNWIND (fallocate, frame, -1, op_errno, NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fallocate_cbk: default_fallocate_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len, +		    xdata); +        return 0;  } +/* Logs if +*  i.   Usage crossed soft limit +*  ii.  Usage above soft limit and alert-time timed out +*/ +void +quota_log_usage (xlator_t *this, quota_inode_ctx_t *ctx, inode_t *inode, +                 int64_t delta) +{ +        struct timeval           cur_time       = {0,}; +        char                    *usage_str      = NULL; +        char                    *path           = NULL; +        int64_t                  cur_size       = 0; +        quota_priv_t            *priv           = NULL; + +        priv = this->private; +        cur_size = ctx->size + delta; +        if ((ctx->soft_lim <= 0) || (timerisset (&ctx->prev_log) && +                                     !quota_timeout (&ctx->prev_log, +                                                     priv->log_timeout))) { +                return; +        } + +        gettimeofday (&cur_time, NULL); +        ctx->prev_log = cur_time; + +        usage_str = gf_uint64_2human_readable (cur_size); +        inode_path (inode, NULL, &path); +        if (!path) +                path = uuid_utoa (inode->gfid); + +        /* Usage crossed/reached soft limit */ +        if (DID_REACH_LIMIT (ctx->soft_lim, ctx->size, cur_size)) { + +                gf_log (this->name, GF_LOG_ALERT, "Usage crossed " +                        "soft limit: %s used by %s", usage_str, path); +        } +        /* Usage is above soft limit */ +        else if (cur_size > ctx->soft_lim){ +                gf_log (this->name, GF_LOG_ALERT, "Usage is above " +                        "soft limit: %s used by %s", usage_str, path); +        } +        if (usage_str) +                GF_FREE (usage_str); +}  int32_t  mem_acct_init (xlator_t *this) @@ -3192,83 +3978,6 @@ quota_forget (xlator_t *this, inode_t *inode)          return 0;  } - -int -quota_parse_limits (quota_priv_t *priv, xlator_t *this, dict_t *xl_options, -                    struct list_head *old_list) -{ -        int32_t       ret       = -1; -        char         *str       = NULL; -        char         *str_val   = NULL; -        char         *path      = NULL, *saveptr = NULL; -        uint64_t      value     = 0; -        limits_t     *quota_lim = NULL, *old = NULL; -        char         *last_colon= NULL; - -        ret = dict_get_str (xl_options, "limit-set", &str); - -        if (str) { -                path = strtok_r (str, ",", &saveptr); - -                while (path) { -                        last_colon = strrchr (path, ':'); -                        *last_colon = '\0'; -                        str_val = last_colon + 1; - -                        ret = gf_string2bytesize (str_val, &value); -                        if (ret != 0) -                                goto err; - -                        QUOTA_ALLOC_OR_GOTO (quota_lim, limits_t, err); - -                        quota_lim->path = path; - -                        quota_lim->value = value; - -                        gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, -                                quota_lim->path, quota_lim->value); - -                        if (old_list != NULL) { -                                list_for_each_entry (old, old_list, -                                                     limit_list) { -                                        if (strcmp (old->path, quota_lim->path) -                                            == 0) { -                                                uuid_copy (quota_lim->gfid, -                                                           old->gfid); -                                                break; -                                        } -                                } -                        } - -                        LOCK (&priv->lock); -                        { -                                list_add_tail ("a_lim->limit_list, -                                               &priv->limit_head); -                        } -                        UNLOCK (&priv->lock); - -                        path = strtok_r (NULL, ",", &saveptr); -                } -        } else { -                gf_log (this->name, GF_LOG_INFO, -                        "no \"limit-set\" option provided"); -        } - -        LOCK (&priv->lock); -        { -                list_for_each_entry (quota_lim, &priv->limit_head, limit_list) { -                        gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, -                                quota_lim->path, quota_lim->value); -                } -        } -        UNLOCK (&priv->lock); - -        ret = 0; -err: -        return ret; -} - -  int32_t  init (xlator_t *this)  { @@ -3290,20 +3999,18 @@ init (xlator_t *this)          QUOTA_ALLOC_OR_GOTO (priv, quota_priv_t, err); -        INIT_LIST_HEAD (&priv->limit_head); -          LOCK_INIT (&priv->lock);          this->private = priv; -        ret = quota_parse_limits (priv, this, this->options, NULL); - -        if (ret) { -                goto err; -        } - -        GF_OPTION_INIT ("timeout", priv->timeout, int64, err);          GF_OPTION_INIT ("deem-statfs", priv->consider_statfs, bool, err); +        GF_OPTION_INIT ("server-quota", priv->is_quota_on, bool, err); +        GF_OPTION_INIT ("default-soft-limit", priv->default_soft_lim, percent, +                        err); +        GF_OPTION_INIT ("soft-timeout", priv->soft_timeout, time, err); +        GF_OPTION_INIT ("hard-timeout", priv->hard_timeout, time, err); +        GF_OPTION_INIT ("alert-time", priv->log_timeout, time, err); +        GF_OPTION_INIT ("volume-uuid", priv->volume_uuid, str, err);          this->local_pool = mem_pool_new (quota_local_t, 64);          if (!this->local_pool) { @@ -3313,134 +4020,100 @@ init (xlator_t *this)                  goto err;          } +        if (priv->is_quota_on) { +                priv->rpc_clnt = quota_enforcer_init (this, this->options); +                if (priv->rpc_clnt == NULL) { +                        ret = -1; +                        gf_log (this->name, GF_LOG_WARNING, +                                "quota enforcer rpc init failed"); +                        goto err; +                } +        } +          ret = 0;  err:          return ret;  } - -void -__quota_reconfigure_inode_ctx (xlator_t *this, inode_t *inode, limits_t *limit) -{ -        int                ret = -1; -        quota_inode_ctx_t *ctx = NULL; - -        GF_VALIDATE_OR_GOTO ("quota", this, out); -        GF_VALIDATE_OR_GOTO (this->name, inode, out); -        GF_VALIDATE_OR_GOTO (this->name, limit, out); - -        ret = quota_inode_ctx_get (inode, limit->value, this, NULL, NULL, &ctx, -                                   1); -        if ((ret == -1) || (ctx == NULL)) { -                gf_log (this->name, GF_LOG_WARNING, "cannot create quota " -                        "context in inode(gfid:%s)", -                        uuid_utoa (inode->gfid)); -                goto out; -        } - -        LOCK (&ctx->lock); -        { -                ctx->limit = limit->value; -        } -        UNLOCK (&ctx->lock); - -out: -        return; -} - - -void -__quota_reconfigure (xlator_t *this, inode_table_t *itable, limits_t *limit) -{ -        inode_t *inode = NULL; - -        if ((this == NULL) || (itable == NULL) || (limit == NULL)) { -                goto out; -        } - -        if (!uuid_is_null (limit->gfid)) { -                inode = inode_find (itable, limit->gfid); -        } else { -                inode = inode_resolve (itable, limit->path); -        } - -        if (inode != NULL) { -                __quota_reconfigure_inode_ctx (this, inode, limit); -        } - -out: -        return; -} - -  int  reconfigure (xlator_t *this, dict_t *options)  {          int32_t           ret   = -1;          quota_priv_t     *priv  = NULL; -        limits_t         *limit = NULL, *next = NULL, *new = NULL; -        struct list_head  head  = {0, }; -        xlator_t         *top   = NULL; -        char              found = 0;          priv = this->private; -        INIT_LIST_HEAD (&head); +        GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, +                          out); +        GF_OPTION_RECONF ("server-quota", priv->is_quota_on, options, bool, +                          out); +        GF_OPTION_RECONF ("default-soft-limit", priv->default_soft_lim, +                          options, percent, out); +        GF_OPTION_RECONF ("alert-time", priv->log_timeout, options, +                          time, out); +        GF_OPTION_RECONF ("soft-timeout", priv->soft_timeout, options, +                          time, out); +        GF_OPTION_RECONF ("hard-timeout", priv->hard_timeout, options, +                          time, out); + +        if (priv->is_quota_on) { +                priv->rpc_clnt = quota_enforcer_init (this, +                                                      this->options); +                if (priv->rpc_clnt == NULL) { +                        ret = -1; +                        gf_log (this->name, GF_LOG_WARNING, +                                "quota enforcer rpc init failed"); +                        goto out; +                } -        LOCK (&priv->lock); -        { -                list_splice_init (&priv->limit_head, &head); +        } else { +                if (priv->rpc_clnt) { +                        // Quotad is shutdown when there is no started volume +                        // which has quota enabled. So, we should disable the +                        // enforcer client when quota is disabled on a volume, +                        // to avoid spurious reconnect attempts to a service +                        // (quotad), that is known to be down. +                        rpc_clnt_disable (priv->rpc_clnt); +                }          } -        UNLOCK (&priv->lock); -        ret = quota_parse_limits (priv, this, options, &head); -        if (ret == -1) { -                gf_log ("quota", GF_LOG_WARNING, -                        "quota reconfigure failed, " -                        "new changes will not take effect"); -                goto out; -        } +        ret = 0; +out: +        return ret; +} -        LOCK (&priv->lock); -        { -                top = ((glusterfs_ctx_t *)this->ctx)->active->top; -                GF_ASSERT (top); +int32_t +quota_priv_dump (xlator_t *this) +{ +        quota_priv_t *priv = NULL; +        int32_t       ret  = -1; -                list_for_each_entry (limit, &priv->limit_head, limit_list) { -                        __quota_reconfigure (this, top->itable, limit); -                } -                list_for_each_entry_safe (limit, next, &head, limit_list) { -                        found = 0; -                        list_for_each_entry (new, &priv->limit_head, -                                             limit_list) { -                                if (strcmp (new->path, limit->path) == 0) { -                                        found = 1; -                                        break; -                                } -                        } +        GF_ASSERT (this); -                        if (!found) { -                                limit->value = -1; -                                __quota_reconfigure (this, top->itable, limit); -                        } +        priv = this->private; -                        list_del_init (&limit->limit_list); -                        GF_FREE (limit); -                } +        gf_proc_dump_add_section ("xlators.features.quota.priv", this->name); + +        ret = TRY_LOCK (&priv->lock); +        if (ret) +             goto out; +        else { +                gf_proc_dump_write("soft-timeout", "%d", priv->soft_timeout); +                gf_proc_dump_write("hard-timeout", "%d", priv->hard_timeout); +                gf_proc_dump_write("alert-time", "%d", priv->log_timeout); +                gf_proc_dump_write("quota-on", "%d", priv->is_quota_on); +                gf_proc_dump_write("statfs", "%d", priv->consider_statfs); +                gf_proc_dump_write("volume-uuid", "%s", priv->volume_uuid); +                gf_proc_dump_write("validation-count", "%ld", +                                    priv->validation_count);          }          UNLOCK (&priv->lock); -        GF_OPTION_RECONF ("timeout", priv->timeout, options, int64, out); -        GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, -                          out); - -        ret = 0;  out: -        return ret; +        return 0;  } -  void  fini (xlator_t *this)  { @@ -3482,16 +4155,11 @@ struct xlator_cbks cbks = {          .forget = quota_forget  }; +struct xlator_dumpops dumpops = { +        .priv    = quota_priv_dump, +};  struct volume_options options[] = {          {.key = {"limit-set"}}, -        {.key = {"timeout"}, -         .type = GF_OPTION_TYPE_SIZET, -         .min = 0, -         .max = 60, -         .default_value = "0", -         .description = "quota caches the directory sizes on client. Timeout " -                        "indicates the timeout for the cache to be revalidated." -        },          {.key = {"deem-statfs"},           .type = GF_OPTION_TYPE_BOOL,           .default_value = "off", @@ -3499,5 +4167,62 @@ struct volume_options options[] = {                          "consideration while estimating fs size. (df command)"                          " (Default is off)."          }, +        {.key = {"server-quota"}, +         .type = GF_OPTION_TYPE_BOOL, +         .default_value = "off", +         .description = "Skip the quota enforcement if the feature is" +                        " not turned on. This is not a user exposed option." +        }, +        {.key = {"default-soft-limit"}, +         .type = GF_OPTION_TYPE_PERCENT, +         .default_value = "80%", +         .min = 0, +         .max = LONG_MAX, +        }, +        {.key = {"soft-timeout"}, +         .type = GF_OPTION_TYPE_TIME, +         .min = 0, +         .max = 1800, +         .default_value = "60", +         .description = "quota caches the directory sizes on client. " +                        "soft-timeout indicates the timeout for the validity of" +                        " cache before soft-limit has been crossed." +        }, +        {.key = {"hard-timeout"}, +         .type = GF_OPTION_TYPE_TIME, +         .min = 0, +         .max = 60, +         .default_value = "5", +         .description = "quota caches the directory sizes on client. " +                        "hard-timeout indicates the timeout for the validity of" +                        " cache after soft-limit has been crossed." +        }, +        { .key   = {"username"}, +          .type  = GF_OPTION_TYPE_ANY, +        }, +        { .key   = {"password"}, +          .type  = GF_OPTION_TYPE_ANY, +        }, +        { .key   = {"transport-type"}, +          .value = {"tcp", "socket", "ib-verbs", "unix", "ib-sdp", +                    "tcp/client", "ib-verbs/client", "rdma"}, +          .type  = GF_OPTION_TYPE_STR, +        }, +        { .key   = {"remote-host"}, +          .type  = GF_OPTION_TYPE_INTERNET_ADDRESS, +        }, +        { .key   = {"remote-port"}, +          .type  = GF_OPTION_TYPE_INT, +        }, +        { .key  = {"volume-uuid"}, +          .type = GF_OPTION_TYPE_STR, +          .description = "uuid of the volume this brick is part of." +        }, +        { .key  = {"alert-time"}, +          .type = GF_OPTION_TYPE_TIME, +          .min = 0, +          .max = 7*86400, +          .default_value = "86400", +        },          {.key = {NULL}}  }; diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 84ecbb3084e..de522e6914f 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -12,20 +12,48 @@  #include "config.h"  #endif +#ifndef _QUOTA_H +#define _QUOTA_H +  #include "xlator.h"  #include "call-stub.h"  #include "defaults.h" -#include "byte-order.h"  #include "common-utils.h"  #include "quota-mem-types.h" +#include "glusterfs.h" +#include "compat.h" +#include "logging.h" +#include "dict.h" +#include "stack.h" +#include "common-utils.h" +#include "event.h" +#include "globals.h" +#include "rpcsvc.h" +#include "rpc-clnt.h" +#include "byte-order.h" +#include "glusterfs3-xdr.h" +#include "glusterfs3.h" +#include "xdr-generic.h" +#include "compat-errno.h" +#include "protocol-common.h" -#define QUOTA_XATTR_PREFIX      "trusted."  #define DIRTY                   "dirty"  #define SIZE                    "size"  #define CONTRIBUTION            "contri"  #define VAL_LENGTH              8  #define READDIR_BUF             4096 +#ifndef UUID_CANONICAL_FORM_LEN +#define UUID_CANONICAL_FORM_LEN 36 +#endif + +#define WIND_IF_QUOTAOFF(is_quota_on, label)     \ +        if (!is_quota_on)                       \ +                goto label; + +#define DID_REACH_LIMIT(lim, prev_size, cur_size)               \ +        ((cur_size) >= (lim) && (prev_size) < (lim)) +  #define QUOTA_SAFE_INCREMENT(lock, var)         \          do {                                    \                  LOCK (lock);                    \ @@ -46,7 +74,7 @@                                   gf_quota_mt_##type);   \                  if (!var) {                             \                          gf_log ("", GF_LOG_ERROR,       \ -                                "out of memory :(");    \ +                                "out of memory");    \                          ret = -1;                       \                          goto label;                     \                  }                                       \ @@ -74,11 +102,17 @@  #define GET_CONTRI_KEY(var, _vol_name, _gfid, _ret)             \          do {                                                    \                  char _gfid_unparsed[40];                        \ -                uuid_unparse (_gfid, _gfid_unparsed);           \ -                _ret = gf_asprintf (var, QUOTA_XATTR_PREFIX     \ -                                    "%s.%s." CONTRIBUTION,      \ -                                    _vol_name, _gfid_unparsed); \ -        } while (0) +                if (_gfid != NULL) {                            \ +                        uuid_unparse (_gfid, _gfid_unparsed);   \ +                        _ret = gf_asprintf (var, QUOTA_XATTR_PREFIX     \ +                                            "%s.%s." CONTRIBUTION,      \ +                                            _vol_name, _gfid_unparsed); \ +                } else {                                                \ +                        _ret = gf_asprintf (var, QUOTA_XATTR_PREFIX     \ +                                            "%s.." CONTRIBUTION,       \ +                                            _vol_name);             \ +                }                                                       \ +         } while (0)  #define GET_CONTRI_KEY_OR_GOTO(var, _vol_name, _gfid, label)    \ @@ -96,6 +130,8 @@                          goto label;                             \          } while (0) + +  struct quota_dentry {          char            *name;          uuid_t           par; @@ -105,47 +141,70 @@ typedef struct quota_dentry quota_dentry_t;  struct quota_inode_ctx {          int64_t          size; -        int64_t          limit; +        int64_t          hard_lim; +        int64_t          soft_lim;          struct iatt      buf;          struct list_head parents;          struct timeval   tv; +        struct timeval   prev_log;          gf_lock_t        lock;  };  typedef struct quota_inode_ctx quota_inode_ctx_t; +struct quota_limit { +        int64_t hard_lim; +        int64_t soft_lim_percent; +} __attribute__ ((packed)); +typedef struct quota_limit quota_limit_t; +  struct quota_local { -        gf_lock_t    lock; -        uint32_t     validate_count; -        uint32_t     link_count; -        loc_t        loc; -        loc_t        oldloc; -        loc_t        newloc; -        loc_t        validate_loc; -        int64_t      delta; -        int32_t      op_ret; -        int32_t      op_errno; -        int64_t      size; -        int64_t      limit; -        char         just_validated; -        inode_t     *inode; -        call_stub_t *stub; +        gf_lock_t           lock; +        uint32_t            validate_count; +        uint32_t            link_count; +        loc_t               loc; +        loc_t               oldloc; +        loc_t               newloc; +        loc_t               validate_loc; +        int64_t             delta; +        int32_t             op_ret; +        int32_t             op_errno; +        int64_t             size; +        gf_boolean_t        skip_check; +        char                just_validated; +        fop_lookup_cbk_t    validate_cbk; +        inode_t            *inode; +        call_stub_t        *stub; +        struct iobref      *iobref; +        quota_limit_t       limit;  }; -typedef struct quota_local quota_local_t; +typedef struct quota_local  quota_local_t;  struct quota_priv { -        int64_t           timeout; -        gf_boolean_t      consider_statfs; -        struct list_head  limit_head; -        gf_lock_t         lock; +        uint32_t                soft_timeout; +        uint32_t                hard_timeout; +        uint32_t               log_timeout; +        double                 default_soft_lim; +        gf_boolean_t           is_quota_on; +        gf_boolean_t           consider_statfs; +        gf_lock_t              lock; +        rpc_clnt_prog_t       *quota_enforcer; +        struct rpcsvc_program *quotad_aggregator; +        struct rpc_clnt       *rpc_clnt; +        rpcsvc_t              *rpcsvc; +        inode_table_t         *itable; +        char                  *volume_uuid; +        uint64_t               validation_count;  }; -typedef struct quota_priv quota_priv_t; +typedef struct quota_priv      quota_priv_t; -struct limits { -        struct list_head  limit_list; -        char             *path; -        int64_t           value; -        uuid_t            gfid; -}; -typedef struct limits     limits_t; +int +quota_enforcer_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, +                       dict_t *xdata, fop_lookup_cbk_t cbk); +struct rpc_clnt * +quota_enforcer_init (xlator_t *this, dict_t *options); -uint64_t cn = 1; +void +quota_log_usage (xlator_t *this, quota_inode_ctx_t *ctx, inode_t *inode, +                 int64_t delta); + +#endif diff --git a/xlators/features/quota/src/quotad-aggregator.c b/xlators/features/quota/src/quotad-aggregator.c new file mode 100644 index 00000000000..f3f65ca2a04 --- /dev/null +++ b/xlators/features/quota/src/quotad-aggregator.c @@ -0,0 +1,423 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "cli1-xdr.h" +#include "quota.h" +#include "quotad-helpers.h" +#include "quotad-aggregator.h" + +struct rpcsvc_program quotad_aggregator_prog; + +struct iobuf * +quotad_serialize_reply (rpcsvc_request_t *req, void *arg, struct iovec *outmsg, +                        xdrproc_t xdrproc) +{ +        struct iobuf *iob      = NULL; +        ssize_t       retlen   = 0; +        ssize_t       xdr_size = 0; + +        GF_VALIDATE_OR_GOTO ("server", req, ret); + +        /* First, get the io buffer into which the reply in arg will +         * be serialized. +         */ +        if (arg && xdrproc) { +                xdr_size = xdr_sizeof (xdrproc, arg); +                iob = iobuf_get2 (req->svc->ctx->iobuf_pool, xdr_size); +                if (!iob) { +                        gf_log_callingfn (THIS->name, GF_LOG_ERROR, +                                          "Failed to get iobuf"); +                        goto ret; +                }; + +                iobuf_to_iovec (iob, outmsg); +                /* Use the given serializer to translate the give C structure in arg +                 * to XDR format which will be written into the buffer in outmsg. +                 */ +                /* retlen is used to received the error since size_t is unsigned and we +                 * need -1 for error notification during encoding. +                 */ + +                retlen = xdr_serialize_generic (*outmsg, arg, xdrproc); +                if (retlen == -1) { +                        /* Failed to Encode 'GlusterFS' msg in RPC is not exactly +                           failure of RPC return values.. client should get +                           notified about this, so there are no missing frames */ +                        gf_log_callingfn ("", GF_LOG_ERROR, "Failed to encode message"); +                        req->rpc_err = GARBAGE_ARGS; +                        retlen = 0; +                } +        } +        outmsg->iov_len = retlen; +ret: +        if (retlen == -1) { +                iobuf_unref (iob); +                iob = NULL; +        } + +        return iob; +} + +int +quotad_aggregator_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, +                                void *arg, struct iovec *payload, +                                int payloadcount, struct iobref *iobref, +                                xdrproc_t xdrproc) +{ +        struct iobuf              *iob        = NULL; +        int                        ret        = -1; +        struct iovec               rsp        = {0,}; +        quotad_aggregator_state_t *state      = NULL; +        char                       new_iobref = 0; + +        GF_VALIDATE_OR_GOTO ("server", req, ret); + +        if (frame) { +                state = frame->root->state; +                frame->local = NULL; +        } + +        if (!iobref) { +                iobref = iobref_new (); +                if (!iobref) { +                        goto ret; +                } + +                new_iobref = 1; +        } + +        iob = quotad_serialize_reply (req, arg, &rsp, xdrproc); +        if (!iob) { +                gf_log ("", GF_LOG_ERROR, "Failed to serialize reply"); +                goto ret; +        } + +        iobref_add (iobref, iob); + +        ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount, +                                     iobref); + +        iobuf_unref (iob); + +        ret = 0; +ret: +        if (state) { +                quotad_aggregator_free_state (state); +        } + +        if (frame) { +                if (frame->root->client) +                        gf_client_unref (frame->root->client); + +                STACK_DESTROY (frame->root); +        } + +        if (new_iobref) { +                iobref_unref (iobref); +        } + +        return ret; +} + +int +quotad_aggregator_getlimit_cbk (xlator_t *this, call_frame_t *frame, +                                void *lookup_rsp) +{ +        gfs3_lookup_rsp            *rsp   = lookup_rsp; +        gf_cli_rsp                 cli_rsp = {0,}; +        dict_t                     *xdata = NULL; +        int                         ret = -1; + +        GF_PROTOCOL_DICT_UNSERIALIZE (frame->this, xdata, +                                      (rsp->xdata.xdata_val), +                                      (rsp->xdata.xdata_len), rsp->op_ret, +                                      rsp->op_errno, out); + +        ret = 0; +out: +        rsp->op_ret = ret; +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "failed to unserialize " +                        "nameless lookup rsp"); +                goto reply; +        } +        cli_rsp.op_ret = rsp->op_ret; +        cli_rsp.op_errno = rsp->op_errno; +        cli_rsp.op_errstr = ""; +        if (xdata) { +                GF_PROTOCOL_DICT_SERIALIZE (frame->this, xdata, +                                            (&cli_rsp.dict.dict_val), +                                            (cli_rsp.dict.dict_len), +                                            cli_rsp.op_errno, reply); +        } + +reply: +        quotad_aggregator_submit_reply (frame, frame->local, (void*)&cli_rsp, NULL, 0, +                                        NULL, (xdrproc_t)xdr_gf_cli_rsp); + +        dict_unref (xdata); +        GF_FREE (cli_rsp.dict.dict_val); +        return 0; +} + +int +quotad_aggregator_getlimit (rpcsvc_request_t *req) +{ +        call_frame_t              *frame = NULL; +        gf_cli_req                 cli_req = {{0}, }; +        gf_cli_rsp                 cli_rsp = {0}; +        gfs3_lookup_req            args  = {{0,},}; +        gfs3_lookup_rsp            rsp   = {0,}; +        quotad_aggregator_state_t *state = NULL; +        xlator_t                  *this  = NULL; +        dict_t                    *dict  = NULL; +        int                        ret   = -1, op_errno = 0; +        char                      *gfid_str = NULL; +        uuid_t                     gfid = {0}; + +        GF_VALIDATE_OR_GOTO ("quotad-aggregator", req, err); + +        this = THIS; + +        ret = xdr_to_generic (req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req); +        if (ret < 0)  { +                //failed to decode msg; +                gf_log ("", GF_LOG_ERROR, "xdr decoding error"); +                req->rpc_err = GARBAGE_ARGS; +                goto err; +        } + +        if (cli_req.dict.dict_len) { +                dict = dict_new (); +                ret = dict_unserialize (cli_req.dict.dict_val, +                                        cli_req.dict.dict_len, &dict); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to " +                                "unserialize req-buffer to dictionary"); +                        goto err; +                } +        } + +        ret = dict_get_str (dict, "gfid", &gfid_str); +        if (ret) { +                goto err; +        } + +        uuid_parse ((const char*)gfid_str, gfid); + +        frame = quotad_aggregator_get_frame_from_req (req); +        if (frame == NULL) { +                rsp.op_errno = ENOMEM; +                goto err; +        } +        state = frame->root->state; +        state->xdata = dict; +        ret = dict_set_int32 (state->xdata, QUOTA_LIMIT_KEY, 42); +        if (ret) +                goto err; + +        ret = dict_set_int32 (state->xdata, QUOTA_SIZE_KEY, 42); +        if (ret) +                goto err; + +        ret = dict_set_int32 (state->xdata, GET_ANCESTRY_PATH_KEY,42); +        if (ret) +                goto err; + +        memcpy (&args.gfid, &gfid, 16); + +        args.bname           = alloca (req->msg[0].iov_len); +        args.xdata.xdata_val = alloca (req->msg[0].iov_len); + +        ret = qd_nameless_lookup (this, frame, &args, state->xdata, +                                  quotad_aggregator_getlimit_cbk); +        if (ret) { +                rsp.op_errno = ret; +                goto err; +        } + +        return ret; + +err: +        cli_rsp.op_ret = -1; +        cli_rsp.op_errno = op_errno; +        cli_rsp.op_errstr = ""; + +        quotad_aggregator_getlimit_cbk (this, frame, &cli_rsp); +        dict_unref (dict); + +        return ret; +} + +int +quotad_aggregator_lookup_cbk (xlator_t *this, call_frame_t *frame, +                              void *rsp) +{ +        quotad_aggregator_submit_reply (frame, frame->local, rsp, NULL, 0, NULL, +                                        (xdrproc_t)xdr_gfs3_lookup_rsp); + +        return 0; +} + + +int +quotad_aggregator_lookup (rpcsvc_request_t *req) +{ +        call_frame_t              *frame = NULL; +        gfs3_lookup_req            args  = {{0,},}; +        int                        ret   = -1, op_errno = 0; +        gfs3_lookup_rsp            rsp   = {0,}; +        quotad_aggregator_state_t *state = NULL; +        xlator_t                  *this  = NULL; + +        GF_VALIDATE_OR_GOTO ("quotad-aggregator", req, err); + +        this = THIS; + +        args.bname           = alloca (req->msg[0].iov_len); +        args.xdata.xdata_val = alloca (req->msg[0].iov_len); + +        ret = xdr_to_generic (req->msg[0], &args, +                              (xdrproc_t)xdr_gfs3_lookup_req); +        if (ret < 0) { +                rsp.op_errno = EINVAL; +                goto err; +        } + +        frame = quotad_aggregator_get_frame_from_req (req); +        if (frame == NULL) { +                rsp.op_errno = ENOMEM; +                goto err; +        } + +        state = frame->root->state; + +        GF_PROTOCOL_DICT_UNSERIALIZE (this, state->xdata, +                                      (args.xdata.xdata_val), +                                      (args.xdata.xdata_len), ret, +                                      op_errno, err); + + +        ret = qd_nameless_lookup (this, frame, &args, state->xdata, +                                  quotad_aggregator_lookup_cbk); +        if (ret) { +                rsp.op_errno = ret; +                goto err; +        } + +        return ret; + +err: +        rsp.op_ret = -1; +        rsp.op_errno = op_errno; + +        quotad_aggregator_lookup_cbk (this, frame, &rsp); +        return ret; +} + +int +quotad_aggregator_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, +                              void *data) +{ +        if (!xl || !data) { +                gf_log_callingfn ("server", GF_LOG_WARNING, +                                  "Calling rpc_notify without initializing"); +                goto out; +        } + +        switch (event) { +        case RPCSVC_EVENT_ACCEPT: +                break; + +        case RPCSVC_EVENT_DISCONNECT: +                break; + +        default: +                break; +        } + +out: +        return 0; +} + +int +quotad_aggregator_init (xlator_t *this) +{ +        quota_priv_t *priv = NULL; +        int           ret  = -1; + +        priv = this->private; + +        ret = dict_set_str (this->options, "transport.address-family", "unix"); +        if (ret) +                goto out; + +        ret = dict_set_str (this->options, "transport-type", "socket"); +        if (ret) +                goto out; + +        ret = dict_set_str (this->options, "transport.socket.listen-path", +                            "/tmp/quotad.socket"); +        if (ret) +                goto out; + +        /* RPC related */ +        priv->rpcsvc = rpcsvc_init (this, this->ctx, this->options, 0); +        if (priv->rpcsvc == NULL) { +                gf_log (this->name, GF_LOG_WARNING, +                        "creation of rpcsvc failed"); +                ret = -1; +                goto out; +        } + +        ret = rpcsvc_create_listeners (priv->rpcsvc, this->options, +                                       this->name); +        if (ret < 1) { +                gf_log (this->name, GF_LOG_WARNING, +                        "creation of listener failed"); +                ret = -1; +                goto out; +        } + +        priv->quotad_aggregator = "ad_aggregator_prog; +        quotad_aggregator_prog.options = this->options; + +        ret = rpcsvc_program_register (priv->rpcsvc, "ad_aggregator_prog); +        if (ret) { +                gf_log (this->name, GF_LOG_WARNING, +                        "registration of program (name:%s, prognum:%d, " +                        "progver:%d) failed", quotad_aggregator_prog.progname, +                        quotad_aggregator_prog.prognum, +                        quotad_aggregator_prog.progver); +                goto out; +        } + +        ret = 0; +out: +        return ret; +} + +rpcsvc_actor_t quotad_aggregator_actors[] = { +        [GF_AGGREGATOR_NULL]     = {"NULL", GF_AGGREGATOR_NULL, NULL, NULL, 0, +                                    DRC_NA}, +        [GF_AGGREGATOR_LOOKUP]   = {"LOOKUP", GF_AGGREGATOR_NULL, +                                    quotad_aggregator_lookup, NULL, 0, DRC_NA}, +        [GF_AGGREGATOR_GETLIMIT] = {"GETLIMIT", GF_AGGREGATOR_GETLIMIT, +                                   quotad_aggregator_getlimit, NULL, 0}, +}; + + +struct rpcsvc_program quotad_aggregator_prog = { +        .progname  = "GlusterFS 3.3", +        .prognum   = GLUSTER_AGGREGATOR_PROGRAM, +        .progver   = GLUSTER_AGGREGATOR_VERSION, +        .numactors = GF_AGGREGATOR_MAXVALUE, +        .actors    = quotad_aggregator_actors +}; diff --git a/xlators/features/quota/src/quotad-aggregator.h b/xlators/features/quota/src/quotad-aggregator.h new file mode 100644 index 00000000000..5ddea5b3c46 --- /dev/null +++ b/xlators/features/quota/src/quotad-aggregator.h @@ -0,0 +1,37 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _QUOTAD_AGGREGATOR_H +#define _QUOTAD_AGGREGATOR_H + +#include "quota.h" +#include "stack.h" +#include "glusterfs3-xdr.h" +#include "inode.h" + +typedef struct { +        void          *pool; +        xlator_t      *this; +	xlator_t      *active_subvol; +        inode_table_t *itable; +        loc_t          loc; +        dict_t        *xdata; +} quotad_aggregator_state_t; + +typedef int (*quotad_aggregator_lookup_cbk_t) (xlator_t *this, +                                               call_frame_t *frame, +                                               void *rsp); +int +qd_nameless_lookup (xlator_t *this, call_frame_t *frame, gfs3_lookup_req *req, +                    dict_t *xdata, quotad_aggregator_lookup_cbk_t lookup_cbk); +int +quotad_aggregator_init (xlator_t *this); + +#endif diff --git a/xlators/features/quota/src/quotad-helpers.c b/xlators/features/quota/src/quotad-helpers.c new file mode 100644 index 00000000000..fd309911474 --- /dev/null +++ b/xlators/features/quota/src/quotad-helpers.c @@ -0,0 +1,113 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "quotad-helpers.h" + +quotad_aggregator_state_t * +get_quotad_aggregator_state (xlator_t *this, rpcsvc_request_t *req) +{ +        quotad_aggregator_state_t *state         = NULL; +	xlator_t                  *active_subvol = NULL; +        quota_priv_t              *priv          = NULL; + +        state = (void *)GF_CALLOC (1, sizeof (*state), +                                   gf_quota_mt_aggregator_state_t); +        if (!state) +                return NULL; + +	state->this = THIS; +        priv = this->private; + +        LOCK (&priv->lock); +        { +                active_subvol = state->active_subvol = FIRST_CHILD (this); +        } +        UNLOCK (&priv->lock); + +        if (active_subvol->itable == NULL) +                active_subvol->itable = inode_table_new (4096, active_subvol); + +	state->itable = active_subvol->itable; + +        state->pool = this->ctx->pool; + +        return state; +} + +void +quotad_aggregator_free_state (quotad_aggregator_state_t *state) +{ +        if (state->xdata) +                dict_unref (state->xdata); + +        GF_FREE (state); +} + +call_frame_t * +quotad_aggregator_alloc_frame (rpcsvc_request_t *req) +{ +        call_frame_t              *frame = NULL; +        quotad_aggregator_state_t *state = NULL; +        xlator_t                  *this  = NULL; + +        GF_VALIDATE_OR_GOTO ("server", req, out); +        GF_VALIDATE_OR_GOTO ("server", req->trans, out); +        GF_VALIDATE_OR_GOTO ("server", req->svc, out); +        GF_VALIDATE_OR_GOTO ("server", req->svc->ctx, out); + +        this = req->svc->mydata; + +        frame = create_frame (this, req->svc->ctx->pool); +        if (!frame) +                goto out; + +        state = get_quotad_aggregator_state (this, req); +        if (!state) +                goto out; + +        frame->root->state = state; +        frame->root->unique = 0; + +        frame->this = this; +out: +        return frame; +} + +call_frame_t * +quotad_aggregator_get_frame_from_req (rpcsvc_request_t *req) +{ +        call_frame_t *frame  = NULL; +        client_t     *client = NULL; + +        GF_VALIDATE_OR_GOTO ("server", req, out); + +        frame = quotad_aggregator_alloc_frame (req); +        if (!frame) +                goto out; + +        client = req->trans->xl_private; + +        frame->root->op       = req->procnum; + +        frame->root->unique   = req->xid; + +        frame->root->uid      = req->uid; +        frame->root->gid      = req->gid; +        frame->root->pid      = req->pid; + +        gf_client_ref (client); +        frame->root->client   = client; + +        frame->root->lk_owner = req->lk_owner; + +        frame->local = req; +out: +        return frame; +} diff --git a/xlators/features/quota/src/quotad-helpers.h b/xlators/features/quota/src/quotad-helpers.h new file mode 100644 index 00000000000..a10fb7fa82a --- /dev/null +++ b/xlators/features/quota/src/quotad-helpers.h @@ -0,0 +1,24 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef QUOTAD_HELPERS_H +#define QUOTAD_HELPERS_H + +#include "rpcsvc.h" +#include "quota.h" +#include "quotad-aggregator.h" + +void +quotad_aggregator_free_state (quotad_aggregator_state_t *state); + +call_frame_t * +quotad_aggregator_get_frame_from_req (rpcsvc_request_t *req); + +#endif diff --git a/xlators/features/quota/src/quotad.c b/xlators/features/quota/src/quotad.c new file mode 100644 index 00000000000..243b943e986 --- /dev/null +++ b/xlators/features/quota/src/quotad.c @@ -0,0 +1,210 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ +#include "quota.h" +#include "quotad-aggregator.h" +#include "common-utils.h" + +int32_t +mem_acct_init (xlator_t *this) +{ +        int     ret = -1; + +        if (!this) +                return ret; + +        ret = xlator_mem_acct_init (this, gf_quota_mt_end + 1); + +        if (0 != ret) { +                gf_log (this->name, GF_LOG_WARNING, "Memory accounting " +                        "init failed"); +                return ret; +        } + +        return ret; +} + +int32_t +qd_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +               int32_t op_ret, int32_t op_errno, inode_t *inode, +               struct iatt *buf, dict_t *xdata, struct iatt *postparent) +{ +        quotad_aggregator_lookup_cbk_t  lookup_cbk = NULL; +        gfs3_lookup_rsp                 rsp = {0, }; + +        lookup_cbk = cookie; + +        rsp.op_ret = op_ret; +        rsp.op_errno = op_errno; + +        gf_stat_from_iatt (&rsp.postparent, postparent); + +        GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val), +                                    rsp.xdata.xdata_len, rsp.op_errno, out); + +        gf_stat_from_iatt (&rsp.stat, buf); + +out: +        lookup_cbk (this, frame, &rsp); + +        GF_FREE (rsp.xdata.xdata_val); + +        inode_unref (inode); + +        return 0; +} + +xlator_t * +qd_find_subvol (xlator_t *this, char *volume_uuid) +{ +        xlator_list_t *child  = NULL; +        xlator_t      *subvol = NULL; +        char           key[1024]; +        char          *optstr = NULL; + +        if (!this || !volume_uuid) +                goto out; + +        for (child = this->children; child; child = child->next) { +                snprintf(key, 1024, "%s.volume-id", child->xlator->name); +                if (dict_get_str(this->options, key, &optstr) < 0) +                        continue; + +                if (strcmp (optstr, volume_uuid) == 0) { +                        subvol = child->xlator; +                        break; +                } +        } + +out: +        return subvol; +} + +int +qd_nameless_lookup (xlator_t *this, call_frame_t *frame, gfs3_lookup_req *req, +                    dict_t *xdata, quotad_aggregator_lookup_cbk_t lookup_cbk) +{ +        gfs3_lookup_rsp            rsp         = {0, }; +        int                        op_errno    = 0, ret = -1; +        loc_t                      loc         = {0, }; +        quotad_aggregator_state_t *state       = NULL; +        quota_priv_t              *priv        = NULL; +        xlator_t                  *subvol      = NULL; +        char                      *volume_uuid = NULL; + +        priv = this->private; +        state = frame->root->state; + +        frame->root->op = GF_FOP_LOOKUP; + +        loc.inode = inode_new (state->itable); +        if (loc.inode == NULL) { +                op_errno = ENOMEM; +                goto out; +        } + +        memcpy (loc.gfid, req->gfid, 16); + +        ret = dict_get_str (xdata, "volume-uuid", &volume_uuid); +        if (ret < 0) { +                op_errno = EINVAL; +                goto out; +        } + +        subvol = qd_find_subvol (this, volume_uuid); +        if (subvol == NULL) { +                op_errno = EINVAL; +                goto out; +        } + +        STACK_WIND_COOKIE (frame, qd_lookup_cbk, lookup_cbk, subvol, +                           subvol->fops->lookup, &loc, xdata); +        return 0; + +out: +        rsp.op_ret = -1; +        rsp.op_errno = op_errno; + +        lookup_cbk (this, frame, &rsp); + +        inode_unref (loc.inode); +        return 0; +} + +int +qd_reconfigure (xlator_t *this, dict_t *options) +{ +        /* As of now quotad is restarted upon alteration of volfile */ +        return 0; +} + +void +qd_fini (xlator_t *this) +{ +        return; +} + +int32_t +qd_init (xlator_t *this) +{ +        int32_t          ret            = -1; +        quota_priv_t    *priv           = NULL; + +        if (NULL == this->children) { +                gf_log (this->name, GF_LOG_ERROR, +                        "FATAL: quota (%s) not configured for min of 1 child", +                        this->name); +                ret = -1; +                goto err; +        } + +        QUOTA_ALLOC_OR_GOTO (priv, quota_priv_t, err); +        LOCK_INIT (&priv->lock); + +        this->private = priv; + +        ret = quotad_aggregator_init (this); +        if (ret < 0) +                goto err; + +        ret = 0; +err: +        if (ret) { +                GF_FREE (priv); +        } +        return ret; +} + +class_methods_t class_methods = { +        .init           = qd_init, +        .fini           = qd_fini, +        .reconfigure    = qd_reconfigure, +}; + +struct xlator_fops fops = { +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { +        { .key   = {"transport-type"}, +          .value = {"rpc", "rpc-over-rdma", "tcp", "socket", "ib-verbs", +                    "unix", "ib-sdp", "tcp/server", "ib-verbs/server", "rdma", +                    "rdma*([ \t]),*([ \t])socket", +                    "rdma*([ \t]),*([ \t])tcp", +                    "tcp*([ \t]),*([ \t])rdma", +                    "socket*([ \t]),*([ \t])rdma"}, +          .type  = GF_OPTION_TYPE_STR +        }, +        { .key   = {"transport.*"}, +          .type  = GF_OPTION_TYPE_ANY, +        }, +        {.key = {NULL}} +}; diff --git a/xlators/lib/src/libxlator.h b/xlators/lib/src/libxlator.h index 1d5e1657f4e..08bd77b918c 100644 --- a/xlators/lib/src/libxlator.h +++ b/xlators/lib/src/libxlator.h @@ -32,6 +32,7 @@  #define MARKER_UUID_TYPE    1  #define MARKER_XTIME_TYPE   2  #define GF_XATTR_QUOTA_SIZE_KEY "trusted.glusterfs.quota.size" +#define GF_XATTR_QUOTA_LIMIT_LIST "trusted.limit.list"  typedef int32_t (*xlator_specf_unwind_t) (call_frame_t *frame, | 
