diff options
25 files changed, 3264 insertions, 1001 deletions
diff --git a/libglusterfs/src/common-utils.c b/libglusterfs/src/common-utils.c index 2b700af4e11..4af1b445433 100644 --- a/libglusterfs/src/common-utils.c +++ b/libglusterfs/src/common-utils.c @@ -787,12 +787,44 @@ gf_string2time (const char *str, uint32_t *n)          if (errno == 0)                  errno = old_errno; -        if (!((tail[0] == '\0') || +        if (((tail[0] == '\0') ||                ((tail[0] == 's') && (tail[1] == '\0')) ||                ((tail[0] == 's') && (tail[1] == 'e') &&  	       (tail[2] == 'c') && (tail[3] == '\0')))) +               goto out; + +        else if (((tail[0] == 'm') && (tail[1] == '\0')) || +                 ((tail[0] == 'm') && (tail[1] == 'i') && +                  (tail[2] == 'n') && (tail[3] == '\0'))) { +                value = value * GF_MINUTE_IN_SECONDS; +                goto out; +        } + +        else if (((tail[0] == 'h') && (tail[1] == '\0')) || +                 ((tail[0] == 'h') && (tail[1] == 'r') && +	         (tail[2] == '\0'))) { +                value = value * GF_HOUR_IN_SECONDS; +                goto out; +        } + +        else if (((tail[0] == 'd') && (tail[1] == '\0')) || +                 ((tail[0] == 'd') && (tail[1] == 'a') && +	         (tail[2] == 'y') && (tail[3] == 's') && +                 (tail[4] == '\0'))) { +                value = value * GF_DAY_IN_SECONDS; +                goto out; +        } + +        else if (((tail[0] == 'w') && (tail[1] == '\0')) || +                 ((tail[0] == 'w') && (tail[1] == 'k') && +	         (tail[2] == '\0'))) { +                value = value * GF_WEEK_IN_SECONDS; +                goto out; +        } else {                  return -1; +        } +out:          *n = value;          return 0; @@ -2294,6 +2326,9 @@ gf_canonicalize_path (char *path)          if (!path || *path != '/')                  goto out; +        if (!strcmp (path, "/")) +                return 0; +          tmppath = gf_strdup (path);          if (!tmppath)                  goto out; @@ -2818,7 +2853,6 @@ out:  } -  /* Sets log file path from user provided arguments */  int  gf_set_log_file_path (cmd_args_t *cmd_args) @@ -2956,6 +2990,76 @@ backtrace_symbols(void *const *trace, size_t len)  	}  	return ptr; -}  +}  #undef BELOW  #endif /* __NetBSD__ */ + +/* TODO: extract common code from gf_get_soft_limit and gf_get_hard_limit into a + * function + */ +int +gf_get_soft_limit (char *limit, char **soft_limit) +{ +        int   colon_count   = 0; +        int   i             = 0; +        int   len           = 0; +        char *sl            = NULL; + +        len = strlen (limit); +        for (i = 0; i < len; i++) { +                if (limit[i] == ':') +                        colon_count++; +                if (colon_count == 2) +                        break; +        } + +        if (colon_count != 2) { +                gf_log ("common-utils", GF_LOG_DEBUG, "Soft-limit absent"); +                return 0; +        } + +        sl = GF_CALLOC (len - i, sizeof (char), gf_common_mt_char); +        if (!sl) +                return -1; +        strncpy (sl, &limit[i+1], len - i - 1); +        *soft_limit = sl; + +        return 1; +} + +int +gf_get_hard_limit (char *limit, char **hard_limit) +{ +        int    i                 = 0; +        int    hlbegin           = 0; +        int    len               = 0; +        char  *hl                = NULL; + +        len = strlen (limit); + +        for (i = 0; i < len; i++) { +                if (limit[i] == ':') +                        break; +        } + +        if (i == len) { +                gf_log ("common-utils", GF_LOG_ERROR, "Hard limit not found"); +                return -1; +        } + +        hlbegin = i + 1; +        i++; + +        while ((limit[i] != '\0') && (limit[i] != ':')) { +                i++; +        } + +        hl = GF_CALLOC (i - hlbegin + 1, sizeof (char), gf_common_mt_char); +        if (!hl) +                return -1; + +        strncpy (hl, &limit[hlbegin], i - hlbegin); +        *hard_limit = hl; + +        return 0; +} diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h index e762a86eaf4..acf2202d54e 100644 --- a/libglusterfs/src/common-utils.h +++ b/libglusterfs/src/common-utils.h @@ -81,6 +81,11 @@ void trap (void);  #define GF_NFS3_PORT    2049  #define GF_CLIENT_PORT_CEILING 1024 +#define GF_MINUTE_IN_SECONDS 60 +#define GF_HOUR_IN_SECONDS (60*60) +#define GF_DAY_IN_SECONDS (24*60*60) +#define GF_WEEK_IN_SECONDS (7*24*60*60) +  enum _gf_boolean  {  	_gf_false = 0, @@ -176,6 +181,18 @@ int  gf_set_log_file_path (cmd_args_t *cmd_args);                  }                                                       \          } while (0) +#define GF_REMOVE_INTERNAL_XATTR(pattern, dict)                         \ +        do {                                                            \ +                if (!dict) {                                            \ +                        gf_log (this->name, GF_LOG_ERROR,               \ +                                "dict is null");                        \ +                        break;                                          \ +                }                                                       \ +                dict_foreach_fnmatch (dict, pattern,                    \ +                                      dict_remove_foreach_fn,           \ +                                      NULL);                            \ +        } while (0) +  #define GF_IF_INTERNAL_XATTR_GOTO(pattern, dict, op_errno, label)       \          do {                                                            \                  if (!dict) {                                            \ @@ -595,4 +612,7 @@ size_t backtrace(void **, size_t);  char **backtrace_symbols(void *const *, size_t);  #endif +int gf_get_soft_limit (char *limit, char **soft_limit); +int gf_get_hard_limit (char *limit, char **hard_limit); +  #endif /* _COMMON_UTILS_H */ diff --git a/libglusterfs/src/dict.c b/libglusterfs/src/dict.c index 3b7ddce5e0d..f2df5a6d431 100644 --- a/libglusterfs/src/dict.c +++ b/libglusterfs/src/dict.c @@ -1086,6 +1086,20 @@ dict_null_foreach_fn (dict_t *d, char *k,  }  int +dict_remove_foreach_fn (dict_t *d, char *k, +                        data_t *v, void *_tmp) +{ +        if (!d || !k) { +                gf_log ("glusterfs", GF_LOG_WARNING, "%s is NULL", +                        d?"key":"dictionary"); +                return -1; +        } + +        dict_del (d, k); +        return 0; +} + +int  dict_foreach (dict_t *dict,                int (*fn)(dict_t *this,                          char *key, diff --git a/libglusterfs/src/dict.h b/libglusterfs/src/dict.h index 9b41b5a7df0..6e5d8aa0650 100644 --- a/libglusterfs/src/dict.h +++ b/libglusterfs/src/dict.h @@ -179,7 +179,8 @@ int dict_foreach_fnmatch (dict_t *dict, char *pattern,  int dict_null_foreach_fn (dict_t *d, char *k,                            data_t *v, void *tmp); - +int dict_remove_foreach_fn (dict_t *d, char *k, +                            data_t *v, void *tmp);  dict_t *dict_copy (dict_t *this, dict_t *new);  int dict_keys_join (void *value, int size, dict_t *dict,                      int (*filter_fn)(char *key)); diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 525d6909a59..8f05a222d58 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -85,6 +85,7 @@  #define GF_XATTR_VOL_ID_KEY   "trusted.glusterfs.volume-id"  #define GF_XATTR_LOCKINFO_KEY   "trusted.glusterfs.lockinfo"  #define GF_XATTR_GET_REAL_FILENAME_KEY "user.glusterfs.get_real_filename:" +#define QUOTA_LIMIT_KEY "trusted.glusterfs.quota.limit-set"  #define GF_READDIR_SKIP_DIRS       "readdir-filter-directories" diff --git a/libglusterfs/src/inode.h b/libglusterfs/src/inode.h index a8897626545..60adba68425 100644 --- a/libglusterfs/src/inode.h +++ b/libglusterfs/src/inode.h @@ -142,6 +142,9 @@ inode_rename (inode_table_t *table, inode_t *olddir, const char *oldname,  	      inode_t *newdir, const char *newname,  	      inode_t *inode, struct iatt *stbuf); +dentry_t * +__dentry_grep (inode_table_t *table, inode_t *parent, const char *name); +  inode_t *  inode_grep (inode_table_t *table, inode_t *parent, const char *name); diff --git a/libglusterfs/src/options.c b/libglusterfs/src/options.c index 842b6413ab4..31e5a681d11 100644 --- a/libglusterfs/src/options.c +++ b/libglusterfs/src/options.c @@ -1110,7 +1110,7 @@ DEFINE_INIT_OPT(gf_boolean_t, bool, gf_string2boolean);  DEFINE_INIT_OPT(xlator_t *, xlator, xl_by_name);  DEFINE_INIT_OPT(char *, path, pass);  DEFINE_INIT_OPT(double, double, gf_string2double); - +DEFINE_INIT_OPT(uint32_t, time, gf_string2time);  DEFINE_RECONF_OPT(char *, str, pass); @@ -1125,3 +1125,4 @@ DEFINE_RECONF_OPT(gf_boolean_t, bool, gf_string2boolean);  DEFINE_RECONF_OPT(xlator_t *, xlator, xl_by_name);  DEFINE_RECONF_OPT(char *, path, pass);  DEFINE_RECONF_OPT(double, double, gf_string2double); +DEFINE_RECONF_OPT(uint32_t, time, gf_string2time); diff --git a/libglusterfs/src/options.h b/libglusterfs/src/options.h index e2a25baa9e7..62f4ee92e91 100644 --- a/libglusterfs/src/options.h +++ b/libglusterfs/src/options.h @@ -114,6 +114,7 @@ DECLARE_INIT_OPT(gf_boolean_t, bool);  DECLARE_INIT_OPT(xlator_t *, xlator);  DECLARE_INIT_OPT(char *, path);  DECLARE_INIT_OPT(double, double); +DECLARE_INIT_OPT(uint32_t, time);  #define DEFINE_INIT_OPT(type_t, type, conv)                             \ @@ -194,6 +195,7 @@ DECLARE_RECONF_OPT(gf_boolean_t, bool);  DECLARE_RECONF_OPT(xlator_t *, xlator);  DECLARE_RECONF_OPT(char *, path);  DECLARE_RECONF_OPT(double, double); +DECLARE_RECONF_OPT(uint32_t, time);  #define DEFINE_RECONF_OPT(type_t, type, conv)                            \ diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c index a277c58a8f5..7c8c3cdba36 100644 --- a/libglusterfs/src/xlator.c +++ b/libglusterfs/src/xlator.c @@ -659,7 +659,6 @@ loc_copy (loc_t *dst, loc_t *src)          uuid_copy (dst->gfid, src->gfid);          uuid_copy (dst->pargfid, src->pargfid); -        uuid_copy (dst->gfid, src->gfid);          if (src->inode)                  dst->inode = inode_ref (src->inode); diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 4bca0f55822..d64f280cf91 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -82,6 +82,13 @@ enum gf_pmap_procnum {          GF_PMAP_MAXVALUE,  }; +enum gf_aggregator_procnum { +        GF_AGGREGATOR_NULL = 0, +        GF_AGGREGATOR_LOOKUP, +        GF_AGGREGATOR_GETLIMIT, +        GF_AGGREGATOR_MAXVALUE, +}; +  enum gf_pmap_port_type {          GF_PMAP_PORT_FREE = 0,          GF_PMAP_PORT_FOREIGN, @@ -227,6 +234,10 @@ typedef enum {  #define GLUSTER_FOP_VERSION   330 /* 3.3.0 */  #define GLUSTER_FOP_PROCCNT   GFS3_OP_MAXVALUE +/* Aggregator */ +#define GLUSTER_AGGREGATOR_PROGRAM 29852134 /* Completely random */ +#define GLUSTER_AGGREGATOR_VERSION 1 +  /* Second version */  #define GD_MGMT_PROGRAM          1238433 /* Completely random */  #define GD_MGMT_VERSION          2   /* 0.0.2 */ diff --git a/tests/basic/quota-nfs-anon.t b/tests/basic/quota-nfs-anon.t new file mode 100644 index 00000000000..7b5ea5f28e0 --- /dev/null +++ b/tests/basic/quota-nfs-anon.t @@ -0,0 +1,46 @@ +#!/bin/bash + +. $(dirname $0)/../include.rc +. $(dirname $0)/../volume.rc + +cleanup; + +TEST glusterd +TEST $CLI volume create $V0 $H0:$B0/${V0}{1} + +function volinfo_field() +{ +    local vol=$1; +    local field=$2; + +    $CLI volume info $vol | grep "^$field: " | sed 's/.*: //'; +} + + +## Verify volume is is created +EXPECT "$V0" volinfo_field $V0 'Volume Name'; +EXPECT 'Created' volinfo_field $V0 'Status'; + + +## Start volume and verify +TEST $CLI volume start $V0; +EXPECT 'Started' volinfo_field $V0 'Status'; + +TEST $CLI volume quota $V0 enable; + +## Mount NFS +TEST mount -t nfs -o nolock,soft,intr $H0:/$V0 $N0; +mkdir -p $N0/0/1 +TEST $CLI volume quota $V0 limit-usage /0/1 1GB 75%; + +deep=/0/1/2/3/4/5/6/7/8/9 +mkdir -p $N0/$deep +dd if=/dev/zero of=$N0/$deep/file bs=1M count=502 & + +kill_brick $V0 $H0 $B0/${V0}{1} +kill -TERM $(get_nfs_pid) + +$CLI volume start $V0 force; + + +cleanup; diff --git a/tests/basic/quota.t b/tests/basic/quota.t index ef015a30d92..5c531adbf79 100755 --- a/tests/basic/quota.t +++ b/tests/basic/quota.t @@ -9,21 +9,37 @@ TEST glusterd  TEST pidof glusterd  TEST $CLI volume info; -TEST $CLI volume create $V0 replica 2 stripe 2 $H0:$B0/${V0}{1,2,3,4,5,6,7,8}; +TEST $CLI volume create $V0 replica 2  $H0:$B0/${V0}{1,2,3,4}; -function limit_on() +function hard_limit()  {          local QUOTA_PATH=$1; -        $CLI volume quota $V0 list | grep "$QUOTA_PATH" | awk '{print $2}' +        $CLI volume quota $V0 list $QUOTA_PATH | grep "$QUOTA_PATH" | awk '{print $2}' +} + +function soft_limit() +{ +        local QUOTA_PATH=$1; +        $CLI volume quota $V0 list $QUOTA_PATH | grep "$QUOTA_PATH" | awk '{print $3}' +} + +function usage() +{ +        local QUOTA_PATH=$1; +        $CLI volume quota $V0 list $QUOTA_PATH | grep "$QUOTA_PATH" | awk '{print $4}'  }  EXPECT "$V0" volinfo_field $V0 'Volume Name';  EXPECT 'Created' volinfo_field $V0 'Status'; -EXPECT '8' brick_count $V0 +EXPECT '4' brick_count $V0  TEST $CLI volume start $V0;  EXPECT 'Started' volinfo_field $V0 'Status'; +TEST glusterfs -s $H0 --volfile-id $V0 $M0; + +TEST mkdir -p $M0/test_dir/in_test_dir +  ## ------------------------------  ## Verify quota commands  ## ------------------------------ @@ -33,19 +49,66 @@ TEST $CLI volume quota $V0 limit-usage /test_dir 100MB  TEST $CLI volume quota $V0 limit-usage /test_dir/in_test_dir 150MB -EXPECT "150MB" limit_on "/test_dir/in_test_dir"; +EXPECT "150.0MB" hard_limit "/test_dir/in_test_dir"; +EXPECT "80%" soft_limit "/test_dir/in_test_dir";  TEST $CLI volume quota $V0 remove /test_dir/in_test_dir -EXPECT "100MB" limit_on "/test_dir"; +EXPECT "100.0MB" hard_limit "/test_dir"; + +TEST $CLI volume quota $V0 limit-usage /test_dir 10MB +EXPECT "10.0MB" hard_limit "/test_dir"; +EXPECT "80%" soft_limit "/test_dir"; + +TEST $CLI volume quota $V0 soft-timeout 0 +TEST $CLI volume quota $V0 hard-timeout 0 -TEST $CLI volume quota $V0 disable  ## ------------------------------ +## Verify quota enforcement +## ----------------------------- + +TEST ! dd if=/dev/urandom of=$M0/test_dir/1.txt bs=1M count=12 +TEST rm $M0/test_dir/1.txt +# wait for marker's accounting to complete +EXPECT_WITHIN 10 "0Bytes" usage "/test_dir" + +TEST dd if=/dev/urandom of=$M0/test_dir/2.txt bs=1M count=8 +EXPECT_WITHIN 20 "8.0MB" usage "/test_dir" +TEST rm $M0/test_dir/2.txt +EXPECT_WITHIN 10 "0Bytes" usage "/test_dir" + +## rename tests +TEST dd if=/dev/urandom of=$M0/test_dir/2 bs=1M count=8 +EXPECT_WITHIN 20 "8.0MB" usage "/test_dir" +TEST mv $M0/test_dir/2 $M0/test_dir/0 +EXPECT_WITHIN 10 "8.0MB" usage "/test_dir" +TEST rm $M0/test_dir/0 +EXPECT_WITHIN 10 "0Bytes" usage "/test_dir" + +## --------------------------- + +## ------------------------------ +## Check if presence of nfs mount results in ESTALE errors for I/O +#  on a fuse mount. Note: Quota command internally uses a fuse mount, +#  though this may change. +## ----------------------------- + +TEST mount -t nfs -o nolock,soft,intr $H0:/$V0 $N0; +TEST $CLI volume quota $V0 limit-usage /test_dir 100MB + +TEST $CLI volume quota $V0 limit-usage /test_dir/in_test_dir 150MB + +EXPECT "150.0MB" hard_limit "/test_dir/in_test_dir"; +## ----------------------------- + +TEST $CLI volume quota $V0 disable  TEST $CLI volume stop $V0;  EXPECT 'Stopped' volinfo_field $V0 'Status';  TEST $CLI volume delete $V0;  TEST ! $CLI volume info $V0; +umount -l $N0 +  cleanup; diff --git a/tests/include.rc b/tests/include.rc index de28241c2ac..40dd8ac96b9 100644 --- a/tests/include.rc +++ b/tests/include.rc @@ -214,6 +214,8 @@ function _TEST_IN_LOOP()  function cleanup()  {  	killall -15 glusterfs glusterfsd glusterd 2>/dev/null || true; +        # allow completion of signal handlers for SIGTERM before issue SIGKILL +        sleep 1  	killall -9 glusterfs glusterfsd glusterd 2>/dev/null || true;          MOUNTPOINTS=`mount | grep "$B0/" | awk '{print $3}'` diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 8f61339e692..e320107a832 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -2248,6 +2248,18 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,                  return 0;          } +        if (key && !strcmp (GF_XATTR_QUOTA_LIMIT_LIST, key)) { +                /* quota hardlimit and aggregated size of a directory is stored +                 * in inode contexts of each brick. Hence its good enough that +                 * we send getxattr for this key to any brick. +                 */ +                local->call_cnt = 1; +                subvol = dht_first_up_subvol (this); +                STACK_WIND (frame, dht_getxattr_cbk, subvol, +                            subvol->fops->getxattr, loc, key, xdata); +                return 0; +        } +          if (key && *conf->vol_uuid) {                  if ((match_uuid_local (key, conf->vol_uuid) == 0) &&                      (GF_CLIENT_PID_GSYNCD == frame->root->pid)) { @@ -2861,11 +2873,16 @@ int  dht_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  int op_ret, int op_errno, struct statvfs *statvfs, dict_t *xdata)  { -        dht_local_t *local         = NULL; -        int          this_call_cnt = 0; -        int          bsize         = 0; -        int          frsize        = 0; +        dht_local_t *local              = NULL; +        int          this_call_cnt      = 0; +        int          bsize              = 0; +        int          frsize             = 0; +        int8_t       quota_deem_statfs  = 0; +        GF_UNUSED int     ret           = 0; +        unsigned long     new_usage     = 0; +        unsigned long     cur_usage     = 0; +        ret = dict_get_int8 (xdata, "quota-deem-statfs", "a_deem_statfs);          local = frame->local; @@ -2875,8 +2892,22 @@ dht_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          local->op_errno = op_errno;                          goto unlock;                  } +                if (!statvfs) { +                        op_errno = EINVAL; +                        local->op_ret = -1; +                        goto unlock; +                }                  local->op_ret = 0; +                if (quota_deem_statfs) { +                        new_usage = statvfs->f_blocks - statvfs->f_bfree; +                        cur_usage = local->statvfs.f_blocks - local->statvfs.f_bfree; +                        /* We take the maximux of the usage from the subvols */ +                        if (new_usage >= cur_usage) +                                local->statvfs = *statvfs; +                        goto unlock; +                } +                  if (local->statvfs.f_bsize != 0) {                          bsize = max(local->statvfs.f_bsize, statvfs->f_bsize);                          frsize = max(local->statvfs.f_frsize, statvfs->f_frsize); @@ -2897,6 +2928,7 @@ dht_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  local->statvfs.f_flag     = statvfs->f_flag;                  local->statvfs.f_namemax  = statvfs->f_namemax; +          }  unlock:          UNLOCK (&frame->lock); diff --git a/xlators/features/quota/src/Makefile.am b/xlators/features/quota/src/Makefile.am index 9546f427629..7165adc59ef 100644 --- a/xlators/features/quota/src/Makefile.am +++ b/xlators/features/quota/src/Makefile.am @@ -1,17 +1,22 @@ -xlator_LTLIBRARIES = quota.la +xlator_LTLIBRARIES = quota.la quotad.la  xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features  quota_la_LDFLAGS = -module -avoid-version +quotad_la_LDFLAGS = -module -avoid-version -quota_la_SOURCES = quota.c +quota_la_SOURCES = quota.c quota-enforcer-client.c  quota_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la -noinst_HEADERS = quota-mem-types.h quota.h +quotad_la_SOURCES = quotad.c quotad-helpers.c quotad-aggregator.c +quotad_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = quota-mem-types.h quota.h quotad-aggregator.h quotad-helpers.h  AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ -	-I$(top_srcdir)/xlators/cluster/dht/src +	-I$(top_srcdir)/xlators/cluster/dht/src -I$(top_srcdir)/rpc/xdr/src/ \ +	-I$(top_srcdir)/rpc/rpc-lib/src  AM_CFLAGS = -Wall $(GF_CFLAGS) -CLEANFILES =  +CLEANFILES = diff --git a/xlators/features/quota/src/quota-enforcer-client.c b/xlators/features/quota/src/quota-enforcer-client.c new file mode 100644 index 00000000000..bfea5e42014 --- /dev/null +++ b/xlators/features/quota/src/quota-enforcer-client.c @@ -0,0 +1,364 @@ +/* +   Copyright (c) 2010-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ +#include <stdio.h> +#include <string.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/resource.h> +#include <sys/file.h> +#include <netdb.h> +#include <signal.h> +#include <libgen.h> + +#include <sys/utsname.h> + +#include <stdint.h> +#include <pthread.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <time.h> +#include <semaphore.h> +#include <errno.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_MALLOC_H +#include <malloc.h> +#endif + +#ifdef HAVE_MALLOC_STATS +#ifdef DEBUG +#include <mcheck.h> +#endif +#endif + +#include "quota.h" + +extern struct rpc_clnt_program quota_enforcer_clnt; + +int32_t +quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, inode_t *inode, +                    struct iatt *buf, dict_t *xdata, struct iatt *postparent); + +int +quota_enforcer_submit_request (void *req, call_frame_t *frame, +                               rpc_clnt_prog_t *prog, +                               int procnum, struct iobref *iobref, +                               xlator_t *this, fop_cbk_fn_t cbkfn, +                               xdrproc_t xdrproc) +{ +        int           ret        = -1; +        int           count      = 0; +        struct iovec  iov        = {0, }; +        struct iobuf *iobuf      = NULL; +        char          new_iobref = 0; +        ssize_t       xdr_size   = 0; +        quota_priv_t *priv       = NULL; + +        GF_ASSERT (this); + +        priv = this->private; + +        if (req) { +                xdr_size = xdr_sizeof (xdrproc, req); +                iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size); +                if (!iobuf) { +                        goto out; +                } + +                if (!iobref) { +                        iobref = iobref_new (); +                        if (!iobref) { +                                goto out; +                        } + +                        new_iobref = 1; +                } + +                iobref_add (iobref, iobuf); + +                iov.iov_base = iobuf->ptr; +                iov.iov_len  = iobuf_size (iobuf); + +                /* Create the xdr payload */ +                ret = xdr_serialize_generic (iov, req, xdrproc); +                if (ret == -1) { +                        goto out; +                } +                iov.iov_len = ret; +                count = 1; +        } + +        /* Send the msg */ +        ret = rpc_clnt_submit (priv->rpc_clnt, prog, procnum, cbkfn, +                               &iov, count, +                               NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL); +        ret = 0; + +out: +        if (new_iobref) +                iobref_unref (iobref); +        if (iobuf) +                iobuf_unref (iobuf); + +        return ret; +} + +int +quota_enforcer_lookup_cbk (struct rpc_req *req, struct iovec *iov, +                           int count, void *myframe) +{ +        quota_local_t    *local      = NULL; +        call_frame_t     *frame      = NULL; +        int               ret        = 0; +        gfs3_lookup_rsp   rsp        = {0,}; +        struct iatt       stbuf      = {0,}; +        struct iatt       postparent = {0,}; +        int               op_errno   = EINVAL; +        dict_t           *xdata      = NULL; +        inode_t          *inode      = NULL; +        xlator_t         *this       = NULL; + +        this = THIS; + +        frame = myframe; +        local = frame->local; +        inode = local->validate_loc.inode; + +        if (-1 == req->rpc_status) { +                rsp.op_ret   = -1; +                op_errno = ENOTCONN; +                goto out; +        } + +        ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_lookup_rsp); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed"); +                rsp.op_ret   = -1; +                op_errno = EINVAL; +                goto out; +        } + +        op_errno = gf_error_to_errno (rsp.op_errno); +        gf_stat_to_iatt (&rsp.postparent, &postparent); + +        if (rsp.op_ret == -1) +                goto out; + +        rsp.op_ret = -1; +        gf_stat_to_iatt (&rsp.stat, &stbuf); + +        GF_PROTOCOL_DICT_UNSERIALIZE (frame->this, xdata, (rsp.xdata.xdata_val), +                                      (rsp.xdata.xdata_len), rsp.op_ret, +                                      op_errno, out); + +        if ((!uuid_is_null (inode->gfid)) +            && (uuid_compare (stbuf.ia_gfid, inode->gfid) != 0)) { +                gf_log (frame->this->name, GF_LOG_DEBUG, +                        "gfid changed for %s", local->validate_loc.path); +                rsp.op_ret = -1; +                op_errno = ESTALE; +                goto out; +        } + +        rsp.op_ret = 0; + +out: +        rsp.op_errno = op_errno; +        if (rsp.op_ret == -1) { +                /* any error other than ENOENT */ +                if (rsp.op_errno != ENOENT) +                        gf_log (this->name, GF_LOG_WARNING, +                                "remote operation failed: %s. Path: %s (%s)", +                                strerror (rsp.op_errno), +                                local->validate_loc.path, +                                loc_gfid_utoa (&local->validate_loc)); +                else +                        gf_log (this->name, GF_LOG_TRACE, +                                "not found on remote node"); + +        } + +        local->validate_cbk (frame, NULL, this, rsp.op_ret, rsp.op_errno, inode, +                             &stbuf, xdata, &postparent); + +        if (xdata) +                dict_unref (xdata); + +        free (rsp.xdata.xdata_val); + +        return 0; +} + +int +quota_enforcer_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, +                       dict_t *xdata, fop_lookup_cbk_t validate_cbk) +{ +        quota_local_t          *local      = NULL; +        gfs3_lookup_req         req        = {{0,},}; +        int                     ret        = 0; +        int                     op_errno   = ESTALE; +        quota_priv_t           *priv       = NULL; + +        if (!frame || !this || !loc) +                goto unwind; + +        local = frame->local; +        local->validate_cbk = validate_cbk; + +        priv = this->private; + +        if (!(loc && loc->inode)) +                goto unwind; + +        if (!uuid_is_null (loc->inode->gfid)) +                memcpy (req.gfid, loc->inode->gfid, 16); +        else +                memcpy (req.gfid, loc->gfid, 16); + +        if (xdata) { +                GF_PROTOCOL_DICT_SERIALIZE (this, xdata, +                                            (&req.xdata.xdata_val), +                                            req.xdata.xdata_len, +                                            op_errno, unwind); +        } + +        if (loc->name) +                req.bname = (char *)loc->name; +        else +                req.bname = ""; + +        ret = quota_enforcer_submit_request (&req, frame, +                                             priv->quota_enforcer, +                                             GF_AGGREGATOR_LOOKUP, +                                             NULL, this, +                                             quota_enforcer_lookup_cbk, +                                             (xdrproc_t)xdr_gfs3_lookup_req); + +        if (ret) { +                gf_log (this->name, GF_LOG_WARNING, "failed to send the fop"); +        } + +        GF_FREE (req.xdata.xdata_val); + +        return 0; + +unwind: +        validate_cbk (frame, NULL, this, -1, op_errno, NULL, NULL, NULL, NULL); + +        GF_FREE (req.xdata.xdata_val); + +        return 0; +} + +int +quota_enforcer_notify (struct rpc_clnt *rpc, void *mydata, +                       rpc_clnt_event_t event, void *data) +{ +        xlator_t                *this = NULL; +        int                     ret = 0; + +        this = mydata; + +        switch (event) { +        case RPC_CLNT_CONNECT: +        { +                gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_CONNECT"); +                break; +        } + +        case RPC_CLNT_DISCONNECT: +        { +                gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_DISCONNECT"); +                break; +        } + +        default: +                gf_log (this->name, GF_LOG_TRACE, +                        "got some other RPC event %d", event); +                ret = 0; +                break; +        } + +        return ret; +} + +//Returns a started rpc_clnt. Creates a new rpc_clnt if quota_priv doesn't have +//one already +struct rpc_clnt * +quota_enforcer_init (xlator_t *this, dict_t *options) +{ +        struct rpc_clnt *rpc  = NULL; +        quota_priv_t    *priv = NULL; +        int              ret  = -1; + +        priv = this->private; +        if (priv->rpc_clnt) { +                gf_log (this->name, GF_LOG_TRACE, "quota enforcer clnt already " +                        "inited"); +                //Turns out to be a NOP if the clnt is already connected. +                rpc_clnt_start (priv->rpc_clnt); +                return priv->rpc_clnt; +        } +        priv->quota_enforcer = "a_enforcer_clnt; + +        ret = dict_set_str (options, "transport.address-family", "unix"); +        if (ret) +                goto out; + +        ret = dict_set_str (options, "transport-type", "socket"); +        if (ret) +                goto out; + +        ret = dict_set_str (options, "transport.socket.connect-path", +                            "/tmp/quotad.socket"); +        if (ret) +                goto out; + +        rpc = rpc_clnt_new (options, this->ctx, this->name, 16); +        if (!rpc) { +                ret = -1; +                goto out; +        } + +        ret = rpc_clnt_register_notify (rpc, quota_enforcer_notify, this); +        if (ret) { +                gf_log ("cli", GF_LOG_ERROR, "failed to register notify"); +                goto out; +        } + +        rpc_clnt_start (rpc); +out: +        if (ret) { +                if (rpc) +                        rpc_clnt_unref (rpc); +                rpc = NULL; +        } + +        return rpc; +} + +struct rpc_clnt_procedure quota_enforcer_actors[GF_AGGREGATOR_MAXVALUE] = { +        [GF_AGGREGATOR_NULL]     = {"NULL", NULL}, +        [GF_AGGREGATOR_LOOKUP]   = {"LOOKUP", NULL}, +}; + +struct rpc_clnt_program quota_enforcer_clnt = { +        .progname  = "Quota enforcer", +        .prognum   = GLUSTER_AGGREGATOR_PROGRAM, +        .progver   = GLUSTER_AGGREGATOR_VERSION, +        .numproc   = GF_AGGREGATOR_MAXVALUE, +        .proctable = quota_enforcer_actors, +}; diff --git a/xlators/features/quota/src/quota-mem-types.h b/xlators/features/quota/src/quota-mem-types.h index 3082865da29..97d9165681f 100644 --- a/xlators/features/quota/src/quota-mem-types.h +++ b/xlators/features/quota/src/quota-mem-types.h @@ -21,6 +21,9 @@ enum gf_quota_mem_types_ {          gf_quota_mt_int32_t,          gf_quota_mt_limits_t,          gf_quota_mt_quota_dentry_t, +        gf_quota_mt_quota_limits_level_t, +        gf_quota_mt_qd_vols_conf_t, +        gf_quota_mt_aggregator_state_t,          gf_quota_mt_end  };  #endif diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index 345d44c5272..a50fb1184a9 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -12,44 +12,100 @@  #include "quota.h"  #include "common-utils.h"  #include "defaults.h" +#include "statedump.h"  int32_t  quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                     char *name, uuid_t par); + +int +quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, +                     loc_t *loc, struct iatt *buf, int32_t *op_errno); +  struct volume_options options[]; +static int32_t +__quota_init_inode_ctx (inode_t *inode, xlator_t *this, +                        quota_inode_ctx_t **context) +{ +        int32_t            ret  = -1; +        quota_inode_ctx_t *ctx  = NULL; + +        if (inode == NULL) { +                goto out; +        } + +        QUOTA_ALLOC_OR_GOTO (ctx, quota_inode_ctx_t, out); + +        LOCK_INIT(&ctx->lock); + +        if (context != NULL) { +                *context = ctx; +        } + +        INIT_LIST_HEAD (&ctx->parents); + +        ret = __inode_ctx_put (inode, this, (uint64_t )(long)ctx); +        if (ret == -1) { +                gf_log (this->name, GF_LOG_WARNING, +                        "cannot set quota context in inode (gfid:%s)", +                        uuid_utoa (inode->gfid)); +        } +out: +        return ret; +} + + +static int32_t +quota_inode_ctx_get (inode_t *inode, xlator_t *this, +                     quota_inode_ctx_t **ctx, char create_if_absent) +{ +        int32_t  ret = 0; +        uint64_t ctx_int; + +        LOCK (&inode->lock); +        { +                ret = __inode_ctx_get (inode, this, &ctx_int); + +                if ((ret == 0) && (ctx != NULL)) { +                        *ctx = (quota_inode_ctx_t *) (unsigned long)ctx_int; +                } else if (create_if_absent) { +                        ret = __quota_init_inode_ctx (inode, this, ctx); +                } +        } +        UNLOCK (&inode->lock); + +        return ret; +} +  int  quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)  {          int ret = -1; -        if (!loc) { +        if (!loc || (inode == NULL))                  return ret; -        }          if (inode) {                  loc->inode = inode_ref (inode); +                uuid_copy (loc->gfid, inode->gfid);          }          if (parent) {                  loc->parent = inode_ref (parent);          } -        loc->path = gf_strdup (path); -        if (!loc->path) { -                goto loc_wipe; -        } +        if (path != NULL) { +                loc->path = gf_strdup (path); -        loc->name = strrchr (loc->path, '/'); -        if (loc->name) { -                loc->name++; -        } else { -                goto loc_wipe; +                loc->name = strrchr (loc->path, '/'); +                if (loc->name) { +                        loc->name++; +                }          }          ret = 0; -loc_wipe:          if (ret < 0) {                  loc_wipe (loc);          } @@ -82,7 +138,6 @@ quota_inode_loc_fill (inode_t *inode, loc_t *loc)                  gf_log (this->name, GF_LOG_DEBUG,                          "cannot find parent for inode (gfid:%s)",                          uuid_utoa (inode->gfid)); -                goto err;          }  ignore_parent: @@ -91,7 +146,6 @@ ignore_parent:                  gf_log (this->name, GF_LOG_DEBUG,                          "cannot construct path for inode (gfid:%s)",                          uuid_utoa (inode->gfid)); -                goto err;          }          ret = quota_loc_fill (loc, inode, parent, resolvedpath); @@ -161,7 +215,9 @@ __quota_dentry_new (quota_inode_ctx_t *ctx, char *name, uuid_t par)          uuid_copy (dentry->par, par); -        list_add_tail (&dentry->next, &ctx->parents); +        if (ctx != NULL) +                list_add_tail (&dentry->next, &ctx->parents); +  err:          return dentry;  } @@ -182,19 +238,66 @@ out:          return;  } +inline void +quota_resume_fop_if_validation_done (quota_local_t *local) +{ +        call_stub_t *stub       = NULL; +        int          link_count = -1; + +        if (local == NULL) +                goto out; + +        LOCK (&local->lock); +        { +                link_count = local->link_count; +                if (link_count == 0) { +                        stub = local->stub; +                        local->stub = NULL; +                } +        } +        UNLOCK (&local->lock); + +        if (stub != NULL) { +                call_resume (stub); +        } +out: +        return; +} + +inline void +quota_handle_validate_error (quota_local_t *local, int32_t op_ret, +                             int32_t op_errno) +{ +        if (local == NULL) +                goto out; + +        LOCK (&local->lock); +        { +                if (op_ret < 0) { +                        local->op_ret = op_ret; +                        local->op_errno = op_errno; +                } + +                /* we abort checking limits on this path to root */ +                local->link_count--; +        } +        UNLOCK (&local->lock); + +        quota_resume_fop_if_validation_done (local); +out: +        return; +}  int32_t  quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                    int32_t op_ret, int32_t op_errno, dict_t *dict, -                    dict_t *xdata) +                    int32_t op_ret, int32_t op_errno, inode_t *inode, +                    struct iatt *buf, dict_t *xdata, struct iatt *postparent)  { -        quota_local_t     *local          = NULL; -        uint32_t           validate_count = 0, link_count = 0; -        int32_t            ret            = 0; -        quota_inode_ctx_t *ctx            = NULL; -        int64_t           *size           = 0; -        uint64_t           value          = 0; -        call_stub_t       *stub           = NULL; +        quota_local_t     *local      = NULL; +        int32_t            ret        = 0; +        quota_inode_ctx_t *ctx        = NULL; +        int64_t           *size       = 0; +        uint64_t           value      = 0;          local = frame->local; @@ -206,7 +309,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO_WITH_ERROR ("quota", this, unwind, op_errno,                                          EINVAL); -        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, dict, unwind, op_errno, +        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, xdata, unwind, op_errno,                                          EINVAL);          ret = inode_ctx_get (local->validate_loc.inode, this, &value); @@ -220,7 +323,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); +        ret = dict_get_bin (xdata, QUOTA_SIZE_KEY, (void **) &size);          if (ret < 0) {                  gf_log (this->name, GF_LOG_WARNING,                          "size key not present in dict"); @@ -243,25 +346,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  unwind: -        LOCK (&local->lock); -        { -                local->op_ret = -1; -                local->op_errno = op_errno; - -                validate_count = --local->validate_count; -                link_count = local->link_count; - -                if ((validate_count == 0) && (link_count == 0)) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - +        quota_handle_validate_error (local, op_ret, op_errno);          return 0;  } @@ -288,34 +373,318 @@ quota_timeout (struct timeval *tv, int32_t timeout)          return timed_out;  } +int32_t +quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                          int32_t op_ret, int32_t op_errno, +                          gf_dirent_t *entries, dict_t *xdata) +{ +        inode_t           *parent             = NULL; +        gf_dirent_t       *entry              = NULL; +        loc_t              loc                = {0, }; +        quota_dentry_t    *dentry             = NULL, *tmp = NULL; +        quota_inode_ctx_t *ctx                = NULL; +        struct list_head   parents            = {0, }; +        quota_local_t     *local              = NULL; +        call_frame_t      *continuation_frame = NULL; + +        INIT_LIST_HEAD (&parents); + +        continuation_frame = frame->local; +        frame->local = NULL; + +        local = continuation_frame->local; + +        if (op_ret < 0) +                goto err; + +        parent = inode_parent (local->validate_loc.inode, 0, NULL); +        if (parent == NULL) { +                gf_log (this->name, GF_LOG_WARNING, "parent is NULL"); +                op_errno = EINVAL; +                goto err; +        } + +        if ((op_ret > 0) && (entries != NULL)) { +                list_for_each_entry (entry, &entries->list, list) { +                        if (__is_root_gfid (entry->inode->gfid)) { +                                /* The list contains a sub-list for each +                                 * possible path to the target inode. Each +                                 * sub-list starts with the root entry of the +                                 * tree and is followed by the child entries +                                 * for a particular path to the target entry. +                                 * The root entry is an implied sub-list +                                 * delimiter, as it denotes we have started +                                 * processing a new path. Reset the parent +                                 * pointer and continue +                                 */ + +                                parent = NULL; +                        } + +                        uuid_copy (loc.gfid, entry->d_stat.ia_gfid); + +                        loc.inode = inode_ref (entry->inode); +                        loc.parent = inode_ref (parent); +                        loc.name = entry->d_name; + +                        quota_fill_inodectx (this, entry->inode, entry->dict, +                                             &loc, &entry->d_stat, &op_errno); + +                        parent = entry->inode; + +                        loc_wipe (&loc); +                } +        } + +        quota_inode_ctx_get (local->validate_loc.inode, this, &ctx, 0); + +        local->link_count = 0; + +        if (ctx != NULL) { +                LOCK (&ctx->lock); +                { +                        list_for_each_entry (dentry, &ctx->parents, next) { +                                tmp = __quota_dentry_new (NULL, dentry->name, +                                                          dentry->par); +                                list_add_tail (&tmp->next, &parents); +                                local->link_count++; +                        } +                } +                UNLOCK (&ctx->lock); +        } + +        if (local->link_count != 0) { +                list_for_each_entry_safe (dentry, tmp, &parents, next) { +                        quota_check_limit (continuation_frame, +                                           local->validate_loc.inode, +                                           this, dentry->name, dentry->par); +                        __quota_dentry_free (dentry); +                } +        } else { +                local->link_count = 1; +                quota_check_limit (continuation_frame, parent, this, NULL, +                                   NULL); +        } + +        STACK_DESTROY (frame->root); +        return 0; + +err: +        STACK_DESTROY (frame->root); + +        quota_handle_validate_error (local, -1, op_errno); +        return 0; +}  int32_t -quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, -                   char *name, uuid_t par) +quota_build_ancestry_open_cbk (call_frame_t *frame, void *cookie, +                               xlator_t *this, int32_t op_ret, int32_t op_errno, +                               fd_t *fd, dict_t *xdata)  { -        int32_t               ret            = -1; -        inode_t              *_inode         = NULL, *parent = NULL; -        quota_inode_ctx_t    *ctx            = NULL; -        quota_priv_t         *priv           = NULL; -        quota_local_t        *local          = NULL; -        char                  need_validate  = 0, need_unwind = 0; -        int64_t               delta          = 0; -        call_stub_t          *stub           = NULL; -        int32_t               validate_count = 0, link_count = 0; -        uint64_t              value          = 0; -        char                  just_validated = 0; -        uuid_t                trav_uuid      = {0,}; +        int            ret                = -1; +        dict_t        *xdata_req          = NULL; +        quota_local_t *local              = NULL; +        call_frame_t  *continuation_frame = NULL; + +        xdata_req = dict_new (); +        if (xdata_req == NULL) { +                ret = -ENOMEM; +                goto err; +        } -        GF_VALIDATE_OR_GOTO ("quota", this, out); -        GF_VALIDATE_OR_GOTO (this->name, frame, out); -        GF_VALIDATE_OR_GOTO (this->name, inode, out); +        ret = dict_set_int8 (xdata_req, QUOTA_LIMIT_KEY, 1); +        if (ret < 0) +                goto err; + +        ret = dict_set_int8 (xdata_req, GET_ANCESTRY_DENTRY_KEY, 1); +        if (ret < 0) +                goto err; + +        /* This would ask posix layer to construct dentry chain till root */ +        STACK_WIND (frame, quota_build_ancestry_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req); + +        ret = 0; + +err: +        fd_unref (fd); + +        dict_unref (xdata_req); + +        if (ret < 0) { +                continuation_frame = frame->local; +                frame->local = NULL; + +                STACK_DESTROY (frame->root); + +                local = continuation_frame->local; +                quota_handle_validate_error (local, -1, op_errno); +        } + +        return ret; +} + +int +quota_build_ancestry (call_frame_t *frame, inode_t *inode, xlator_t *this) +{ +        loc_t          loc       = {0, }; +        fd_t          *fd        = NULL; +        quota_local_t *local     = NULL; +        call_frame_t  *new_frame = NULL; +        int            ret       = -1; + +        loc.inode = inode_ref (inode); +        uuid_copy (loc.gfid, inode->gfid); + +        gf_log (this->name, GF_LOG_WARNING, "building ancestry");          local = frame->local; -        GF_VALIDATE_OR_GOTO (this->name, local, out); + +        LOCK (&local->lock); +        { +                loc_wipe (&local->validate_loc); + +                ret = quota_inode_loc_fill (inode, &local->validate_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "cannot fill loc for inode (gfid:%s), hence " +                                "aborting quota-checks and continuing with fop", +                                uuid_utoa (inode->gfid)); +                } +        } +        UNLOCK (&local->lock); + +        fd = fd_create (inode, 0); + +        new_frame = copy_frame (frame); +        new_frame->root->uid = new_frame->root->gid = 0; + +        new_frame->local = frame; + +        if (IA_ISDIR (inode->ia_type)) { +                STACK_WIND (new_frame, quota_build_ancestry_open_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->opendir, &loc, fd, +                            NULL); +        } else { +                STACK_WIND (new_frame, quota_build_ancestry_open_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->open, &loc, 0, fd, +                            NULL); +        } + +        loc_wipe (&loc); + +        return 0; +} + +int +quota_validate (call_frame_t *frame, inode_t *inode, xlator_t *this, +                fop_lookup_cbk_t cbk_fn) +{ +        quota_local_t     *local = NULL; +        int                ret   = 0; +        dict_t            *xdata = NULL; +        quota_priv_t      *priv  = NULL; + +        local = frame->local; +        priv = this->private; + +        LOCK (&local->lock); +        { +                loc_wipe (&local->validate_loc); + +                ret = quota_inode_loc_fill (inode, &local->validate_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "cannot fill loc for inode (gfid:%s), hence " +                                "aborting quota-checks and continuing with fop", +                                uuid_utoa (inode->gfid)); +                } +        } +        UNLOCK (&local->lock); + +        if (ret < 0) { +                ret = -ENOMEM; +                goto err; +        } + +        xdata = dict_new (); +        if (xdata == NULL) { +                ret = -ENOMEM; +                goto err; +        } + +        ret = dict_set_int8 (xdata, QUOTA_SIZE_KEY, 1); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, "dict set failed"); +                ret = -ENOMEM; +                goto err; +        } + +        ret = dict_set_str (xdata, "volume-uuid", priv->volume_uuid); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, "dict set failed"); +                ret = -ENOMEM; +                goto err; +        } + +        ret = quota_enforcer_lookup (frame, this, &local->validate_loc, xdata, +                                     cbk_fn); +        if (ret < 0) { +                ret = -ENOTCONN; +                goto err; +        } + +        ret = 0; +err: +        return ret; +} + +int32_t +quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, +                   char *name, uuid_t par) +{ +        int32_t            ret                 = -1, op_errno = EINVAL; +        inode_t           *_inode              = NULL, *parent = NULL; +        quota_inode_ctx_t *ctx                 = NULL; +        quota_priv_t      *priv                = NULL; +        quota_local_t     *local               = NULL; +        char               need_validate       = 0; +        gf_boolean_t       hard_limit_exceeded = 0; +        int64_t            delta               = 0, wouldbe_size = 0; +        uint64_t           value               = 0; +        char               just_validated      = 0; +        uuid_t             trav_uuid           = {0,}; +        uint32_t           timeout             = 0; + +        GF_VALIDATE_OR_GOTO ("quota", this, err); +        GF_VALIDATE_OR_GOTO (this->name, frame, err); +        GF_VALIDATE_OR_GOTO (this->name, inode, err); + +        local  = frame->local; +        GF_VALIDATE_OR_GOTO (this->name, local, err);          delta = local->delta; -        GF_VALIDATE_OR_GOTO (this->name, local->stub, out); +        GF_VALIDATE_OR_GOTO (this->name, local->stub, err); +        /* Allow all the trusted clients +         * Don't block the gluster internal processes like rebalance, gsyncd, +         * self heal etc from the disk quotas. +         * +         * Method: Allow all the clients with PID negative. This is by the +         * assumption that any kernel assigned pid doesn't have the negative +         * number. +         */ +        if (0 > frame->root->pid) { +                ret = 0; +		LOCK (&local->lock); +		{ +			--local->link_count; +		} +		UNLOCK (&local->lock); +                goto resume; +        }          priv = this->private; @@ -328,10 +697,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,          {                  just_validated = local->just_validated;                  local->just_validated = 0; - -                if (just_validated) { -                        local->validate_count--; -                }          }          UNLOCK (&local->lock); @@ -340,34 +705,56 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,          }          do { -                if (ctx != NULL) { +                if (ctx != NULL && (ctx->hard_lim > 0 || ctx->soft_lim > 0)) { +                        wouldbe_size = ctx->size + delta; +                          LOCK (&ctx->lock);                          { -                                if (ctx->limit >= 0) { -                                        if (!just_validated -                                            && quota_timeout (&ctx->tv, -                                                              priv->timeout)) { -                                                need_validate = 1; -                                        } else if ((ctx->size + delta) -                                                   >= ctx->limit) { -                                                local->op_ret = -1; -                                                local->op_errno = EDQUOT; -                                                need_unwind = 1; -                                        } +                                timeout = priv->soft_timeout; + +                                if ((ctx->soft_lim >= 0) +                                    && (wouldbe_size > ctx->soft_lim)) { +                                        timeout = priv->hard_timeout; +                                } + +                                if (!just_validated +                                    && quota_timeout (&ctx->tv, timeout)) { +                                        need_validate = 1; +                                } else if (wouldbe_size >= ctx->hard_lim) { +                                        hard_limit_exceeded = 1;                                  }                          }                          UNLOCK (&ctx->lock); +                        /* We log usage only if quota limit is configured on +                           that inode. */ +                        quota_log_usage (this, ctx, _inode, delta); +                          if (need_validate) { -                                goto validate; -                        } +                                ret = quota_validate (frame, _inode, this, +                                                      quota_validate_cbk); +                                if (ret < 0) { +                                        op_errno = -ret; +                                        goto err; +                                } -                        if (need_unwind) {                                  break;                          } + +                        if (hard_limit_exceeded) { +                                op_errno = EDQUOT; +                                goto err; +                        } +                  }                  if (__is_root_gfid (_inode->gfid)) { +                        LOCK (&local->lock); +                        { +                                --local->link_count; +                        } +                        UNLOCK (&local->lock); +                          break;                  } @@ -379,10 +766,13 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  }                  if (parent == NULL) { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "cannot find parent for inode (gfid:%s), hence " -                                "aborting enforcing quota-limits and continuing" -                                " with the fop", uuid_utoa (_inode->gfid)); +                        ret = quota_build_ancestry (frame, _inode, this); +                        if (ret < 0) { +                                op_errno = -ret; +                                goto err; +                        } + +                        break;                  }                  inode_unref (_inode); @@ -398,240 +788,94 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,                  ctx = (quota_inode_ctx_t *)(unsigned long)value;          } while (1); -        ret = 0; -          if (_inode != NULL) {                  inode_unref (_inode); +                _inode = NULL;          } -        LOCK (&local->lock); -        { -                validate_count = local->validate_count; -                link_count = local->link_count; -                if ((validate_count == 0) && (link_count == 0)) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - -out: -        return ret; - -validate: -        LOCK (&local->lock); -        { -                loc_wipe (&local->validate_loc); - -                if (just_validated) { -                        local->validate_count--; -                } - -                local->validate_count++; -                ret = quota_inode_loc_fill (_inode, &local->validate_loc); -                if (ret < 0) { -                        gf_log (this->name, GF_LOG_WARNING, -                                "cannot fill loc for inode (gfid:%s), hence " -                                "aborting quota-checks and continuing with fop", -                                uuid_utoa (_inode->gfid)); -                        local->validate_count--; -                } -        } -        UNLOCK (&local->lock); - -        if (ret < 0) { -                goto loc_fill_failed; -        } +resume: +        quota_resume_fop_if_validation_done (local); +        return 0; -        STACK_WIND (frame, quota_validate_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->getxattr, &local->validate_loc, -                    QUOTA_SIZE_KEY, NULL); +err: +        quota_handle_validate_error (local, -1, op_errno); -loc_fill_failed:          inode_unref (_inode);          return 0;  } - -int32_t -quota_get_limit_value (inode_t *inode, xlator_t *this, int64_t *n) +inline int +quota_get_limits (xlator_t *this, dict_t *dict, int64_t *hard_lim, +                  int64_t *soft_lim)  { -        int32_t       ret        = 0; -        char         *path       = NULL; -        limits_t     *limit_node = NULL; -        quota_priv_t *priv       = NULL; - -        if (inode == NULL || n == NULL) { -                ret = -1; -                goto out; -        } - -        *n = 0; +        quota_limit_t *limit            = NULL; +        quota_priv_t  *priv             = NULL; +        int64_t        soft_lim_percent = 0, *ptr = NULL; +        int            ret              = 0; -        ret = inode_path (inode, NULL, &path); -        if (ret < 0) { -                ret = -1; +        if ((this == NULL) || (dict == NULL) || (hard_lim == NULL) +            || (soft_lim == NULL))                  goto out; -        }          priv = this->private; -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -                if (strcmp (limit_node->path, path) == 0) { -                        *n = limit_node->value; -                        break; -                } -        } +        ret = dict_get_bin (dict, QUOTA_LIMIT_KEY, (void **) &ptr); +        limit = (quota_limit_t *)ptr; -out: -        GF_FREE (path); - -        return ret; -} - - -static int32_t -__quota_init_inode_ctx (inode_t *inode, int64_t limit, xlator_t *this, -                        dict_t *dict, struct iatt *buf, -                        quota_inode_ctx_t **context) -{ -        int32_t            ret  = -1; -        int64_t           *size = 0; -        quota_inode_ctx_t *ctx  = NULL; - -        if (inode == NULL) { -                goto out; +        if (limit) { +                *hard_lim = ntoh64 (limit->hard_lim); +                soft_lim_percent = ntoh64 (limit->soft_lim_percent);          } -        QUOTA_ALLOC_OR_GOTO (ctx, quota_inode_ctx_t, out); - -        ctx->limit = limit; -        if (buf) -                ctx->buf = *buf; - -        LOCK_INIT(&ctx->lock); - -        if (context != NULL) { -                *context = ctx; +        if (soft_lim_percent < 0) { +                soft_lim_percent = priv->default_soft_lim;          } -        INIT_LIST_HEAD (&ctx->parents); - -        if (dict != NULL) { -                ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); -                if (ret == 0) { -                        ctx->size = ntoh64 (*size); -                        gettimeofday (&ctx->tv, NULL); -                } +        if ((*hard_lim > 0) && (soft_lim_percent > 0)) { +                *soft_lim = (soft_lim_percent * (*hard_lim))/100;          } -        ret = __inode_ctx_put (inode, this, (uint64_t )(long)ctx); -        if (ret == -1) { -                gf_log (this->name, GF_LOG_WARNING, -                        "cannot set quota context in inode (gfid:%s)", -                        uuid_utoa (inode->gfid)); -        }  out: -        return ret; -} - - -static int32_t -quota_inode_ctx_get (inode_t *inode, int64_t limit, xlator_t *this, -                     dict_t *dict, struct iatt *buf, quota_inode_ctx_t **ctx, -                     char create_if_absent) -{ -        int32_t  ret = 0; -        uint64_t ctx_int; - -        LOCK (&inode->lock); -        { -                ret = __inode_ctx_get (inode, this, &ctx_int); - -                if ((ret == 0) && (ctx != NULL)) { -                        *ctx = (quota_inode_ctx_t *) (unsigned long)ctx_int; -                } else if (create_if_absent) { -                        ret = __quota_init_inode_ctx (inode, limit, this, dict, -                                                      buf, ctx); -                } -        } -        UNLOCK (&inode->lock); - -        return ret; +        return 0;  } - -int32_t -quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                  int32_t op_ret, int32_t op_errno, inode_t *inode, -                  struct iatt *buf, dict_t *dict, struct iatt *postparent) +int +quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, +                     loc_t *loc, struct iatt *buf, int32_t *op_errno)  { -        int32_t            ret        = -1; -        char               found      = 0; -        quota_local_t     *local      = NULL; -        quota_inode_ctx_t *ctx        = NULL; -        quota_dentry_t    *dentry     = NULL; -        int64_t           *size       = 0; -        uint64_t           value      = 0; -        limits_t          *limit_node = NULL; -        quota_priv_t      *priv       = NULL; +        int32_t            ret      = -1; +        char               found    = 0; +        quota_inode_ctx_t *ctx      = NULL; +        quota_dentry_t    *dentry   = NULL; +        uint64_t           value    = 0; +        int64_t            hard_lim = -1, soft_lim = -1; -        local = frame->local; - -        priv = this->private; +        quota_get_limits (this, dict, &hard_lim, &soft_lim);          inode_ctx_get (inode, this, &value);          ctx = (quota_inode_ctx_t *)(unsigned long)value; -        if ((op_ret < 0) || (local == NULL) -            || (((ctx == NULL) || (ctx->limit == local->limit)) -                && (local->limit < 0) && !((IA_ISREG (buf->ia_type)) -                                           || (IA_ISLNK (buf->ia_type))))) { -                goto unwind; -        } - -        LOCK (&priv->lock); -        { -                list_for_each_entry (limit_node, &priv->limit_head, -                                     limit_list) { -                        if (strcmp (local->loc.path, limit_node->path) == 0) { -                                uuid_copy (limit_node->gfid, buf->ia_gfid); -                                break; -                        } -                } +        if ((((ctx == NULL) || (ctx->hard_lim == hard_lim)) +             && (hard_lim < 0) && !((IA_ISREG (buf->ia_type)) +                                    || (IA_ISLNK (buf->ia_type))))) { +                ret = 0; +                goto out;          } -        UNLOCK (&priv->lock); -        ret = quota_inode_ctx_get (local->loc.inode, local->limit, this, dict, -                                   buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, this, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); -                op_ret = -1; -                op_errno = ENOMEM; -                goto unwind; +                        uuid_utoa (inode->gfid)); +                ret = -1; +                *op_errno = ENOMEM; +                goto out;          }          LOCK (&ctx->lock);          { - -                if (dict != NULL) { -                        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, -                                            (void **) &size); -                        if (ret == 0) { -                                ctx->size = ntoh64 (*size); -                                gettimeofday (&ctx->tv, NULL); -                        } -                } - -                if (local->limit != ctx->limit) { -                        ctx->limit = local->limit; -                } +                ctx->hard_lim = hard_lim; +                ctx->soft_lim = soft_lim;                  ctx->buf = *buf; @@ -639,12 +883,12 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          goto unlock;                  } -                if (local->loc.name == NULL) +                if (loc->name == NULL)                          goto unlock;                  list_for_each_entry (dentry, &ctx->parents, next) { -                        if ((strcmp (dentry->name, local->loc.name) == 0) && -                            (uuid_compare (local->loc.parent->gfid, +                        if ((strcmp (dentry->name, loc->name) == 0) && +                            (uuid_compare (loc->parent->gfid,                                             dentry->par) == 0)) {                                  found = 1;                                  break; @@ -653,18 +897,18 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  if (!found) {                          dentry = __quota_dentry_new (ctx, -                                                     (char *)local->loc.name, -                                                     local->loc.parent->gfid); +                                                     (char *)loc->name, +                                                     loc->parent->gfid);                          if (dentry == NULL) {                                  /* -                                gf_log (this->name, GF_LOG_WARNING, +                                  gf_log (this->name, GF_LOG_WARNING,                                          "cannot create a new dentry (par:%"                                          PRId64", name:%s) for inode(ino:%"                                          PRId64", gfid:%s)",                                          uuid_utoa (local->loc.inode->gfid));                                  */ -                                op_ret = -1; -                                op_errno = ENOMEM; +                                ret = -1; +                                *op_errno = ENOMEM;                                  goto unlock;                          }                  } @@ -672,6 +916,25 @@ quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  unlock:          UNLOCK (&ctx->lock); +out: +        return ret; +} + +int32_t +quota_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                  int32_t op_ret, int32_t op_errno, inode_t *inode, +                  struct iatt *buf, dict_t *dict, struct iatt *postparent) +{ +        quota_local_t *local = NULL; + +        if (op_ret < 0) +                goto unwind; + +        local = frame->local; + +        op_ret = quota_fill_inodectx (this, inode, dict, &local->loc, buf, +                                      &op_errno); +  unwind:          QUOTA_STACK_UNWIND (lookup, frame, op_ret, op_errno, inode, buf,                              dict, postparent); @@ -683,65 +946,53 @@ int32_t  quota_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc,                dict_t *xattr_req)  { -        int32_t             ret         = -1; -        int64_t             limit       = -1; -        limits_t           *limit_node  = NULL; -        gf_boolean_t        dict_newed  = _gf_false; -        quota_priv_t       *priv        = NULL; -        quota_local_t      *local       = NULL; +        quota_priv_t  *priv             = NULL; +        int32_t        ret              = -1; +        quota_local_t *local            = NULL;          priv = this->private; -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -                if (strcmp (limit_node->path, loc->path) == 0) { -                        limit = limit_node->value; -                } -        } +        xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new(); +        if (!xattr_req) +                goto err; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) {                  goto err;          } -        ret = loc_copy (&local->loc, loc); -        if (ret == -1) { -                goto err; -        } -          frame->local = local; +        loc_copy (&local->loc, loc); -        local->limit = limit; - -        if (limit < 0) { -                goto wind; -        } - -        if (xattr_req == NULL) { -                xattr_req  = dict_new (); -                dict_newed = _gf_true; -        } - -        ret = dict_set_uint64 (xattr_req, QUOTA_SIZE_KEY, 0); +        ret = dict_set_int8 (xattr_req, QUOTA_LIMIT_KEY, 1);          if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "dict set of key for hard-limit failed");                  goto err;          }  wind: -        STACK_WIND (frame, quota_lookup_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->lookup, loc, xattr_req); +        /* TODO: check with vshastry@redhat.com to cleanup the ugliness of +         * checking priv->is_quota_on here by using STACK_WIND_TAIL macro +         */ +        STACK_WIND (frame, +                    priv->is_quota_on ? quota_lookup_cbk : default_lookup_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, +                    xattr_req);          ret = 0;  err: +        if (xattr_req) +                dict_unref (xattr_req); +          if (ret < 0) {                  QUOTA_STACK_UNWIND (lookup, frame, -1, ENOMEM,                                      NULL, NULL, NULL, NULL);          } -        if (dict_newed == _gf_true) { -                dict_unref (xattr_req); -        } -          return 0;  } @@ -769,10 +1020,13 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,          }          do { -                if ((ctx != NULL) && (ctx->limit >= 0)) { +                if ((ctx != NULL) && (ctx->hard_lim >= 0)) { +                        quota_log_usage (this, ctx, _inode, delta);                          LOCK (&ctx->lock);                          {                                  ctx->size += delta; +                                if (ctx->size < 0) +                                        ctx->size = 0;                          }                          UNLOCK (&ctx->lock);                  } @@ -783,6 +1037,7 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,                  parent = inode_parent (_inode, trav_uuid, name);                  if (parent == NULL) { +                        /* TODO: build ancestry and continue updating size */                          gf_log (this->name, GF_LOG_DEBUG,                                  "cannot find parent for inode (gfid:%s), hence "                                  "aborting size updation of parents", @@ -801,6 +1056,8 @@ quota_update_size (xlator_t *this, inode_t *inode, char *name, uuid_t par,                          break;                  } +                value = 0; +                ctx = NULL;                  inode_ctx_get (_inode, this, &value);                  ctx = (quota_inode_ctx_t *)(unsigned long)value;          } while (1); @@ -819,8 +1076,9 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          uint64_t                 ctx_int        = 0;          quota_inode_ctx_t       *ctx            = NULL;          quota_local_t           *local          = NULL; -        quota_dentry_t          *dentry         = NULL; +        quota_dentry_t          *dentry         = NULL, *tmp = NULL;          int64_t                  delta          = 0; +        struct list_head         head           = {0, };          local = frame->local; @@ -828,6 +1086,8 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } +        INIT_LIST_HEAD (&head); +          ret = inode_ctx_get (local->loc.inode, this, &ctx_int);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, @@ -847,13 +1107,23 @@ quota_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          LOCK (&ctx->lock);          {                  ctx->buf = *postbuf; + +                list_for_each_entry (dentry, &ctx->parents, next) { +                        tmp = __quota_dentry_new (NULL, dentry->name, +                                                  dentry->par); +                        list_add_tail (&tmp->next, &head); +                } +          }          UNLOCK (&ctx->lock); -        list_for_each_entry (dentry, &ctx->parents, next) { -                delta = (postbuf->ia_blocks - prebuf->ia_blocks) * 512; -                quota_update_size (this, local->loc.inode, -                                   dentry->name, dentry->par, delta); +        if (postbuf->ia_blocks != prebuf->ia_blocks) +                delta = local->delta; + +        list_for_each_entry_safe (dentry, tmp, &head, next) { +                quota_update_size (this, local->loc.inode, dentry->name, +                                   dentry->par, delta); +                __quota_dentry_free (dentry);          }  out: @@ -871,6 +1141,9 @@ quota_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; + +        priv = this->private;          local = frame->local;          if (local == NULL) { @@ -883,9 +1156,10 @@ quota_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd,                  goto unwind;          } -        STACK_WIND (frame, quota_writev_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->writev, fd, vector, count, off, -                    flags, iobref, xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_writev_cbk: default_writev_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, +                    vector, count, off, flags, iobref, xdata);          return 0;  unwind: @@ -899,14 +1173,21 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,                struct iovec *vector, int32_t count, off_t off,                uint32_t flags, struct iobref *iobref, dict_t *xdata)  { +        quota_priv_t      *priv    = NULL;          int32_t            ret     = -1, op_errno = EINVAL;          int32_t            parents = 0;          uint64_t           size    = 0;          quota_local_t     *local   = NULL;          quota_inode_ctx_t *ctx     = NULL; -        quota_priv_t      *priv    = NULL; +        quota_dentry_t    *dentry  = NULL, *tmp = NULL;          call_stub_t       *stub    = NULL; -        quota_dentry_t    *dentry  = NULL; +        struct list_head   head    = {0, }; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +        INIT_LIST_HEAD (&head);          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -920,12 +1201,13 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,          frame->local = local;          local->loc.inode = inode_ref (fd->inode); -        ret = quota_inode_ctx_get (fd->inode, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (fd->inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (fd->inode->gfid)); -                goto unwind;          }          stub = fop_writev_stub (frame, quota_writev_helper, fd, vector, count, @@ -939,47 +1221,46 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,          GF_VALIDATE_OR_GOTO (this->name, priv, unwind);          size = iov_length (vector, count); -        LOCK (&ctx->lock); -        { -                list_for_each_entry (dentry, &ctx->parents, next) { -                        parents++; +        if (ctx != NULL) { +                LOCK (&ctx->lock); +                { +                        list_for_each_entry (dentry, &ctx->parents, next) { +                                tmp = __quota_dentry_new (NULL, dentry->name, +                                                          dentry->par); +                                list_add_tail (&tmp->next, &head); +                                parents++; +                        }                  } +                UNLOCK (&ctx->lock);          } -        UNLOCK (&ctx->lock);          local->delta = size; -        local->stub = stub; -        local->link_count = parents; -        list_for_each_entry (dentry, &ctx->parents, next) { -                ret = quota_check_limit (frame, fd->inode, this, dentry->name, -                                         dentry->par); -                if (ret == -1) { -                        break; -                } -        } - -        stub = NULL; +        local->link_count = parents; +        local->stub = stub; -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; +        if (parents == 0) { +                local->link_count = 1; +                quota_check_limit (frame, fd->inode, this, NULL, NULL); +        } else { +                list_for_each_entry_safe (dentry, tmp, &head, next) { +                        quota_check_limit (frame, fd->inode, this, dentry->name, +                                           dentry->par); +                        __quota_dentry_free (dentry);                  }          } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        }          return 0;  unwind:          QUOTA_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, default_writev_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->writev, fd, +                    vector, count, off, flags, iobref, xdata); +        return 0;  } @@ -1014,8 +1295,11 @@ quota_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -        STACK_WIND (frame, quota_mkdir_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); +        STACK_WIND (frame, +                    quota_mkdir_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, +                    mode, umask, xdata); +          return 0;  unwind: @@ -1029,9 +1313,14 @@ int32_t  quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,               mode_t umask, dict_t *xdata)  { -        int32_t        ret            = 0, op_errno = 0; -        quota_local_t *local          = NULL; -        call_stub_t   *stub           = NULL; +        quota_priv_t  *priv  = NULL; +        int32_t        ret   = 0, op_errno = 0; +        quota_local_t *local = NULL; +        call_stub_t   *stub  = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -1041,8 +1330,6 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,          frame->local = local; -        local->link_count = 1; -          ret = loc_copy (&local->loc, loc);          if (ret) {                  op_errno = ENOMEM; @@ -1059,32 +1346,24 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,          local->stub = stub;          local->delta = 0; +        local->link_count = 1;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0; +  err:          QUOTA_STACK_UNWIND (mkdir, frame, -1, op_errno, NULL, NULL, NULL,                              NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_mkdir_cbk: default_mkdir_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, +                    mode, umask, xdata); + +        return 0;  } @@ -1104,7 +1383,7 @@ quota_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = quota_inode_ctx_get (inode, -1, this, NULL, buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, this, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode(gfid:%s)", @@ -1147,8 +1426,12 @@ quota_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local; + +        priv = this->private; +          if (local == NULL) {                  gf_log (this->name, GF_LOG_WARNING, "local is NULL");                  goto unwind; @@ -1159,9 +1442,11 @@ quota_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -        STACK_WIND (frame, quota_create_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, -                    fd, xdata); + +        STACK_WIND (frame, +                    priv->is_quota_on? quota_create_cbk: default_create_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, +                    flags, mode, umask, fd, xdata);          return 0;  unwind: @@ -1175,12 +1460,19 @@ int32_t  quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,                mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)  { -        int32_t            ret            = -1; -        quota_local_t     *local          = NULL; -        call_stub_t       *stub           = NULL; +        quota_priv_t  *priv     = NULL; +        int32_t        ret      = -1; +        quota_local_t *local    = NULL; +        int32_t        op_errno = 0; +        call_stub_t   *stub     = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { +                op_errno = ENOMEM;                  goto err;          } @@ -1189,6 +1481,7 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,          ret = loc_copy (&local->loc, loc);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); +                op_errno = ENOMEM;                  goto err;          } @@ -1203,29 +1496,19 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,          local->delta = 0;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0;  err: -        QUOTA_STACK_UNWIND (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, -                            NULL, NULL); +        QUOTA_STACK_UNWIND (create, frame, -1, op_errno, NULL, NULL, NULL, +                            NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_create_cbk: default_create_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, +                    flags, mode, umask, fd, xdata); +        return 0;  } @@ -1237,6 +1520,8 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL;          uint64_t           value = 0; +        quota_dentry_t    *dentry = NULL; +        quota_dentry_t    *old_dentry = NULL;          if (op_ret < 0) {                  goto out; @@ -1254,9 +1539,26 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_update_size (this, local->loc.inode, (char *)local->loc.name, -                           local->loc.parent->gfid, -                           (-(ctx->buf.ia_blocks * 512))); +        if (!local->skip_check) +                quota_update_size (this, local->loc.inode, +                                   (char *)local->loc.name, +                                   local->loc.parent->gfid, +                                   (-(ctx->buf.ia_blocks * 512))); + +        LOCK (&ctx->lock); +        { +                list_for_each_entry (dentry, &ctx->parents, next) { +                        if ((strcmp (dentry->name, local->loc.name) == 0) && +                            (uuid_compare (local->loc.parent->gfid, +                                           dentry->par) == 0)) { +                                old_dentry = dentry; +                                break; +                        } +                } +                if (old_dentry) +                        __quota_dentry_free (old_dentry); +        } +        UNLOCK (&ctx->lock);  out:          QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent, @@ -1269,9 +1571,14 @@ int32_t  quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,                dict_t *xdata)  { -        int32_t        ret = 0; +        quota_priv_t       *priv        = NULL; +        int32_t        ret = -1;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto err; @@ -1279,14 +1586,21 @@ quota_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,          frame->local = local; +        if (xdata && dict_get (xdata, GLUSTERFS_INTERNAL_FOP_KEY)) { +                local->skip_check = _gf_true; +        } +          ret = loc_copy (&local->loc, loc);          if (ret) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed");                  goto err;          } -        STACK_WIND (frame, quota_unlink_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_unlink_cbk: default_unlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, +                    xflag, xdata);          ret = 0; @@ -1317,16 +1631,19 @@ quota_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          local = (quota_local_t *) frame->local; +        if (local->skip_check) +                goto out; +          quota_update_size (this, local->loc.parent, NULL, NULL,                             (buf->ia_blocks * 512)); -        ret = quota_inode_ctx_get (inode, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (inode, this, &ctx, 0);          if ((ret == -1) || (ctx == NULL)) { -                gf_log (this->name, GF_LOG_WARNING, "cannot find quota " -                        "context in %s (gfid:%s)", local->loc.path, +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (inode->gfid)); -                op_ret = -1; -                op_errno = EINVAL;                  goto out;          } @@ -1380,6 +1697,9 @@ quota_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; + +        priv = this->private;          local = frame->local;          if (local == NULL) { @@ -1393,8 +1713,9 @@ quota_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                  goto unwind;          } -        STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); +        STACK_WIND (frame, priv->is_quota_on? quota_link_cbk: default_link_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, +                    newloc, xdata);          return 0;  unwind: @@ -1408,10 +1729,24 @@ int32_t  quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,              dict_t *xdata)  { +        quota_priv_t      *priv  = NULL;          int32_t            ret   = -1, op_errno = ENOMEM;          quota_local_t     *local = NULL; +        quota_inode_ctx_t *ctx   = NULL;          call_stub_t       *stub  = NULL; -        quota_inode_ctx_t *ctx = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +        quota_inode_ctx_get (oldloc->inode, this, &ctx, 0); +        if (ctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error", +                        uuid_utoa (oldloc->inode->gfid)); +        }          local = quota_local_new ();          if (local == NULL) { @@ -1420,6 +1755,11 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,          frame->local = (void *) local; +        if (xdata && dict_get (xdata, GLUSTERFS_INTERNAL_FOP_KEY)) { +                local->skip_check = _gf_true; +                goto wind; +        } +          ret = loc_copy (&local->loc, newloc);          if (ret == -1) {                  gf_log (this->name, GF_LOG_WARNING, "loc_copy failed"); @@ -1433,46 +1773,22 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,          local->link_count = 1;          local->stub = stub; - -        ret = quota_inode_ctx_get (oldloc->inode, -1, this, NULL, NULL, &ctx, -                                   0); -        if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        oldloc->inode ? uuid_utoa (oldloc->inode->gfid) : "0"); -                op_errno = EINVAL; -                goto err; -        } - -        local->delta = ctx->buf.ia_blocks * 512; +        local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0;          quota_check_limit (frame, newloc->parent, this, NULL, NULL); +        return 0; -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - -        ret = 0;  err: -        if (ret < 0) { -                QUOTA_STACK_UNWIND (link, frame, -1, op_errno, NULL, NULL, -                                    NULL, NULL, NULL); -        } +        QUOTA_STACK_UNWIND (link, frame, -1, op_errno, NULL, NULL, +                            NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, default_link_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, +                    newloc, xdata); +        return 0;  } @@ -1484,11 +1800,11 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                    dict_t *xdata)  {          int32_t               ret              = -1; +        int64_t               size             = 0;          quota_local_t        *local            = NULL;          quota_inode_ctx_t    *ctx              = NULL;          quota_dentry_t       *old_dentry       = NULL, *dentry = NULL;          char                  new_dentry_found = 0; -        int64_t               size             = 0;          if (op_ret < 0) {                  goto out; @@ -1508,8 +1824,10 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          if (local->oldloc.parent != local->newloc.parent) { -                quota_update_size (this, local->oldloc.parent, NULL, NULL, (-size)); -                quota_update_size (this, local->newloc.parent, NULL, NULL, size); +                quota_update_size (this, local->oldloc.parent, NULL, NULL, +                                   (-size)); +                quota_update_size (this, local->newloc.parent, NULL, NULL, +                                   size);          }          if (!(IA_ISREG (local->oldloc.inode->ia_type) @@ -1517,14 +1835,14 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        ret = quota_inode_ctx_get (local->oldloc.inode, -1, this, NULL, NULL, -                                   &ctx, 0); +        ret = quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0);          if ((ret == -1) || (ctx == NULL)) { -                gf_log (this->name, GF_LOG_WARNING, "quota context not" -                        "set in inode(gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->oldloc.inode->gfid)); -                op_ret = -1; -                op_errno = EINVAL; +                  goto out;          } @@ -1570,7 +1888,8 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          if (dentry == NULL) {                                  gf_log (this->name, GF_LOG_WARNING,                                          "cannot create a new dentry (name:%s) " -                                        "for inode(gfid:%s)", local->newloc.name, +                                        "for inode(gfid:%s)", +                                        local->newloc.name,                                          uuid_utoa (local->newloc.inode->gfid));                                  op_ret = -1;                                  op_errno = ENOMEM; @@ -1597,6 +1916,9 @@ quota_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; + +        priv = this->private;          local = frame->local;          if (local == NULL) { @@ -1610,8 +1932,11 @@ quota_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                  goto unwind;          } -        STACK_WIND (frame, quota_rename_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_rename_cbk: default_rename_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, +                    newloc, xdata); +          return 0;  unwind: @@ -1625,10 +1950,15 @@ int32_t  quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,                loc_t *newloc, dict_t *xdata)  { -        int32_t            ret            = -1, op_errno = ENOMEM; -        quota_local_t     *local          = NULL; -        call_stub_t       *stub           = NULL; -        quota_inode_ctx_t *ctx            = NULL; +        quota_priv_t      *priv  = NULL; +        int32_t            ret   = -1, op_errno = ENOMEM; +        quota_local_t     *local = NULL; +        quota_inode_ctx_t *ctx   = NULL; +        call_stub_t       *stub  = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -1660,46 +1990,34 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,          if (IA_ISREG (oldloc->inode->ia_type)              || IA_ISLNK (oldloc->inode->ia_type)) { -                ret = quota_inode_ctx_get (oldloc->inode, -1, this, NULL, NULL, -                                           &ctx, 0); +                ret = quota_inode_ctx_get (oldloc->inode, this, &ctx, 0);                  if (ctx == NULL) {                          gf_log (this->name, GF_LOG_WARNING, -                                "quota context not set in inode (gfid:%s)", +                                "quota context not set in inode (gfid:%s), " +                                "considering file size as zero while enforcing " +                                "quota on new ancestry",                                  oldloc->inode ? uuid_utoa (oldloc->inode->gfid)                                  : "0"); -                        op_errno = EINVAL; -                        goto err; +                        local->delta = 0; +                } else { +                        local->delta = ctx->buf.ia_blocks * 512;                  } -                local->delta = ctx->buf.ia_blocks * 512;          } else {                  local->delta = 0;          }          quota_check_limit (frame, newloc->parent, this, NULL, NULL); +        return 0; -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } - -        ret = 0;  err: -        if (ret == -1) { -                QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL, -                                    NULL, NULL, NULL, NULL, NULL); -        } +        QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL, +                            NULL, NULL, NULL, NULL, NULL); +        return 0; + +wind: +        STACK_WIND (frame, default_rename_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, +                    newloc, xdata);          return 0;  } @@ -1725,12 +2043,14 @@ quota_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.parent, NULL, NULL, size); -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 1); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 1);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid)); +                  goto out;          } @@ -1765,6 +2085,7 @@ quota_symlink_helper (call_frame_t *frame, xlator_t *this, const char *linkpath,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local;          if (local == NULL) { @@ -1772,14 +2093,17 @@ quota_symlink_helper (call_frame_t *frame, xlator_t *this, const char *linkpath,                  goto unwind;          } +        priv = this->private; +          if (local->op_ret == -1) {                  op_errno = local->op_errno;                  goto unwind;          } -        STACK_WIND (frame, quota_symlink_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, -                    xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, +                    linkpath, loc, umask, xdata);          return 0;  unwind: @@ -1793,10 +2117,15 @@ int  quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,                 loc_t *loc, mode_t umask, dict_t *xdata)  { -        int32_t          ret      = -1; -        int32_t          op_errno = ENOMEM; -        quota_local_t   *local    = NULL; -        call_stub_t     *stub     = NULL; +        quota_priv_t  *priv     = NULL; +        int32_t        ret      = -1; +        int32_t        op_errno = ENOMEM; +        quota_local_t *local    = NULL; +        call_stub_t   *stub     = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -1811,8 +2140,6 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,                  goto err;          } -        local->link_count = 1; -          stub = fop_symlink_stub (frame, quota_symlink_helper, linkpath, loc,                                   umask, xdata);          if (stub == NULL) { @@ -1821,26 +2148,9 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,          local->stub = stub;          local->delta = strlen (linkpath); +        local->link_count = 1;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } - -                local->link_count = 0; -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0;  err: @@ -1848,6 +2158,13 @@ err:                              NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_symlink_cbk: default_symlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, +                    linkpath, loc, umask, xdata); +        return 0;  } @@ -1857,8 +2174,8 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                      struct iatt *postbuf, dict_t *xdata)  {          quota_local_t     *local = NULL; -        int64_t            delta = 0;          quota_inode_ctx_t *ctx   = NULL; +        int64_t            delta = 0;          if (op_ret < 0) {                  goto out; @@ -1874,11 +2191,12 @@ quota_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.inode, NULL, NULL, delta); -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -1900,9 +2218,15 @@ int32_t  quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,                  dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          int32_t          ret   = -1;          quota_local_t   *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +          local = quota_local_new ();          if (local == NULL) {                  goto err; @@ -1916,8 +2240,11 @@ quota_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,                  goto err;          } -        STACK_WIND (frame, quota_truncate_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_truncate_cbk: default_truncate_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, +                    offset, xdata);          return 0;  err: @@ -1933,8 +2260,8 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                       struct iatt *postbuf, dict_t *xdata)  {          quota_local_t     *local = NULL; -        int64_t            delta = 0;          quota_inode_ctx_t *ctx   = NULL; +        int64_t            delta = 0;          if (op_ret < 0) {                  goto out; @@ -1950,11 +2277,12 @@ quota_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          quota_update_size (this, local->loc.inode, NULL, NULL, delta); -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -1976,8 +2304,13 @@ int32_t  quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,                   dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t   *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL)                  goto err; @@ -1986,8 +2319,11 @@ quota_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_ftruncate_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_ftruncate_cbk: default_ftruncate_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, +                    offset, xdata);          return 0;  err: @@ -2006,14 +2342,23 @@ quota_send_dir_limit_to_cli (call_frame_t *frame, xlator_t *this,          dict_t            *dict              = NULL;          quota_inode_ctx_t *ctx               = NULL;          uint64_t           value             = 0; +        quota_priv_t      *priv              = NULL; + +        priv = this->private; +        if (!priv->is_quota_on) { +                snprintf (dir_limit, 1024, "Quota is disabled please turn on"); +                goto dict_set; +        }          ret = inode_ctx_get (inode, this, &value);          if (ret < 0)                  goto out;          ctx = (quota_inode_ctx_t *)(unsigned long)value; -        snprintf (dir_limit, 1024, "%"PRId64",%"PRId64, ctx->size, ctx->limit); +        snprintf (dir_limit, 1024, "%"PRId64",%"PRId64, ctx->size, +                  ctx->hard_lim); +dict_set:          dict = dict_new ();          if (dict == NULL) {                  ret = -1; @@ -2024,7 +2369,7 @@ quota_send_dir_limit_to_cli (call_frame_t *frame, xlator_t *this,          if (ret < 0)                  goto out; -        gf_log (this->name, GF_LOG_INFO, "str = %s", dir_limit); +        gf_log (this->name, GF_LOG_DEBUG, "str = %s", dir_limit);          QUOTA_STACK_UNWIND (getxattr, frame, 0, 0, dict, NULL); @@ -2076,7 +2421,8 @@ quota_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc,  int32_t  quota_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                int32_t op_ret, int32_t op_errno, struct iatt *buf, dict_t *xdata) +                int32_t op_ret, int32_t op_errno, struct iatt *buf, +                dict_t *xdata)  {          quota_local_t     *local = NULL;          quota_inode_ctx_t *ctx   = NULL; @@ -2091,12 +2437,17 @@ quota_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (buf->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "quota context is NULL on " +                                "inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2116,9 +2467,15 @@ out:  int32_t  quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2131,8 +2488,10 @@ quota_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)                  goto unwind;          } -        STACK_WIND (frame, quota_stat_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->stat, loc, xdata); +wind: +        STACK_WIND (frame, priv->is_quota_on? quota_stat_cbk: default_stat_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc, +                    xdata);          return 0;  unwind: @@ -2159,12 +2518,17 @@ quota_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (buf->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "quota context is NULL on " +                                "inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2184,8 +2548,13 @@ out:  int32_t  quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2195,8 +2564,11 @@ quota_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_fstat_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fstat, fd, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_fstat_cbk: default_fstat_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd, +                    xdata);          return 0;  unwind: @@ -2223,11 +2595,12 @@ quota_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -2239,7 +2612,8 @@ quota_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          UNLOCK (&ctx->lock);  out: -        QUOTA_STACK_UNWIND (readlink, frame, op_ret, op_errno, path, buf, xdata); +        QUOTA_STACK_UNWIND (readlink, frame, op_ret, op_errno, path, buf, +                            xdata);          return 0;  } @@ -2248,9 +2622,14 @@ int32_t  quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,                  dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2264,8 +2643,11 @@ quota_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,                  goto unwind;          } -        STACK_WIND (frame, quota_readlink_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->readlink, loc, size, xdata); +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_readlink_cbk: default_readlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink, loc, +                    size, xdata);          return 0;  unwind: @@ -2293,11 +2675,12 @@ quota_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -2319,8 +2702,13 @@ int32_t  quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,               off_t offset, uint32_t flags, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2330,13 +2718,16 @@ quota_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_readv_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, -                    xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_readv_cbk: default_readv_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, +                    size, offset, flags, xdata);          return 0;  unwind: -        QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, NULL); +        QUOTA_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, +                            NULL);          return 0;  } @@ -2359,11 +2750,12 @@ quota_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", +                gf_log (this->name, GF_LOG_DEBUG, "quota context is NULL on " +                        "inode (%s). " +                        "If quota is not enabled recently and crawler has " +                        "finished crawling, its an error",                          uuid_utoa (local->loc.inode->gfid));                  goto out;          } @@ -2385,8 +2777,13 @@ int32_t  quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,               dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2396,8 +2793,11 @@ quota_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,          frame->local = local; -        STACK_WIND (frame, quota_fsync_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fsync, fd, flags, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_fsync_cbk: default_fsync_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, +                    flags, xdata);          return 0;  unwind: @@ -2425,12 +2825,16 @@ quota_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (statpost->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, "quota context is " +                                "NULL on inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2452,9 +2856,14 @@ int32_t  quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                 struct iatt *stbuf, int32_t valid, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL;          int32_t        ret   = -1; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2468,8 +2877,11 @@ quota_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } -        STACK_WIND (frame, quota_setattr_cbk, FIRST_CHILD (this), -                    FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_setattr_cbk: default_setattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, +                    stbuf, valid, xdata);          return 0;  unwind: @@ -2496,12 +2908,16 @@ quota_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto out;          } -        quota_inode_ctx_get (local->loc.inode, -1, this, NULL, NULL, -                             &ctx, 0); +        quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (local->loc.inode->gfid)); +                if (!IA_ISDIR (statpost->ia_type)) { +                        gf_log (this->name, GF_LOG_DEBUG, "quota context is " +                                "NULL on inode (%s). " +                                "If quota is not enabled recently and crawler " +                                "has finished crawling, its an error", +                                uuid_utoa (local->loc.inode->gfid)); +                } +                  goto out;          } @@ -2522,8 +2938,14 @@ int32_t  quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                  struct iatt *stbuf, int32_t valid, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          quota_local_t *local = NULL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +          local = quota_local_new ();          if (local == NULL) {                  goto unwind; @@ -2533,8 +2955,11 @@ quota_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,          local->loc.inode = inode_ref (fd->inode); -        STACK_WIND (frame, quota_fsetattr_cbk, FIRST_CHILD (this), -                    FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_fsetattr_cbk: default_fsetattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, +                    stbuf, valid, xdata);          return 0;  unwind: @@ -2559,7 +2984,7 @@ quota_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto unwind;          } -        ret = quota_inode_ctx_get (inode, -1, this, NULL, buf, &ctx, 1); +        ret = quota_inode_ctx_get (inode, this, &ctx, 1);          if ((ret == -1) || (ctx == NULL)) {                  gf_log (this->name, GF_LOG_WARNING, "cannot create quota "                          "context in inode (gfid:%s)", uuid_utoa (inode->gfid)); @@ -2600,6 +3025,7 @@ quota_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local;          if (local == NULL) { @@ -2607,14 +3033,17 @@ quota_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,                  goto unwind;          } +        priv = this->private; +          if (local->op_ret == -1) {                  op_errno = local->op_errno;                  goto unwind;          } -        STACK_WIND (frame, quota_mknod_cbk, FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, -                    xdata); +        STACK_WIND (frame, +                    priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, +                    mode, rdev, umask, xdata);          return 0; @@ -2629,9 +3058,14 @@ int  quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,               dev_t rdev, mode_t umask, dict_t *xdata)  { -        int32_t            ret            = -1; -        quota_local_t     *local          = NULL; -        call_stub_t       *stub           = NULL; +        quota_priv_t  *priv     = NULL; +        int32_t        ret      = -1; +        quota_local_t *local    = NULL; +        call_stub_t   *stub     = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          local = quota_local_new ();          if (local == NULL) { @@ -2657,35 +3091,49 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,          local->delta = 0;          quota_check_limit (frame, loc->parent, this, NULL, NULL); - -        stub = NULL; - -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; -                } -        } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        } -          return 0;  err:          QUOTA_STACK_UNWIND (mknod, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,                              NULL);          return 0; + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_mknod_cbk: default_mknod_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, +                    mode, rdev, umask, xdata); + +        return 0; +  }  int  quota_setxattr_cbk (call_frame_t *frame, void *cookie,                      xlator_t *this, int op_ret, int op_errno, dict_t *xdata)  { +        quota_local_t     *local = NULL; +        quota_inode_ctx_t *ctx   = NULL; +        int                ret   = 0; + +        local = frame->local; +        if (!local) +                goto out; + +        ret = quota_inode_ctx_get (local->loc.inode, this, &ctx, 1); +        if ((ret < 0) || (ctx == NULL)) { +                op_errno = ENOMEM; +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->hard_lim = local->limit.hard_lim; +                ctx->soft_lim = local->limit.soft_lim_percent; +        } +        UNLOCK (&ctx->lock); + +out:          QUOTA_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata);          return 0;  } @@ -2694,20 +3142,70 @@ int  quota_setxattr (call_frame_t *frame, xlator_t *this,                  loc_t *loc, dict_t *dict, int flags, dict_t *xdata)  { -        int             op_errno = EINVAL; -        int             op_ret   = -1; +        quota_priv_t  *priv     = NULL; +        int            op_errno = EINVAL; +        int            op_ret   = -1; +        int64_t        hard_lim = -1, soft_lim = -1; +        quota_local_t *local    = NULL; +        char          *src      = NULL; +        char          *dst      = NULL; +        int            len      = 0; +        int            ret      = -1; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (loc, err); -        GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, -                                   op_errno, err); +        if (0 <= frame->root->pid) { +                ret = dict_get_ptr_and_len (dict, QUOTA_LIMIT_KEY, +                                            (void **)&src, &len); +                if (ret) { +                        gf_log (this->name, GF_LOG_DEBUG, "dict_get on %s " +                                "failed", QUOTA_LIMIT_KEY); +                } else { +                        dst = GF_CALLOC (len, sizeof (char), gf_common_mt_char); +                        if (dst) +                                memcpy (dst, src, len); +                } + +                GF_REMOVE_INTERNAL_XATTR ("trusted.glusterfs.quota*", +                                          dict); +                if (!ret && IA_ISDIR (loc->inode->ia_type) && dst) { +                        ret = dict_set_dynptr (dict, QUOTA_LIMIT_KEY, +                                               dst, len); +                        if (ret) +                                gf_log (this->name, GF_LOG_WARNING, "setting " +                                        "key %s failed", QUOTA_LIMIT_KEY); +                        else +                                dst = NULL; +                } +        } -        STACK_WIND (frame, quota_setxattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->setxattr, -                    loc, dict, flags, xdata); +        quota_get_limits (this, dict, &hard_lim, &soft_lim); + +        if (hard_lim > 0) { +                local = quota_local_new (); +                if (local == NULL) { +                        op_errno = ENOMEM; +                        goto err; +                } + +                frame->local = local; +                loc_copy (&local->loc, loc); + +                local->limit.hard_lim = hard_lim; +                local->limit.soft_lim_percent = soft_lim; +        } + +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_setxattr_cbk: default_setxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, loc, +                    dict, flags, xdata);          return 0;  err:          QUOTA_STACK_UNWIND (setxattr, frame, op_ret, op_errno, NULL); @@ -2718,6 +3216,27 @@ int  quota_fsetxattr_cbk (call_frame_t *frame, void *cookie,                       xlator_t *this, int op_ret, int op_errno, dict_t *xdata)  { +        quota_inode_ctx_t *ctx   = NULL; +        quota_local_t     *local = NULL; + +        local = frame->local; +        if (!local) +                goto out; + +        op_ret = quota_inode_ctx_get (local->loc.inode, this, &ctx, 1); +        if ((op_ret < 0) || (ctx == NULL)) { +                op_errno = ENOMEM; +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->hard_lim = local->limit.hard_lim; +                ctx->soft_lim = local->limit.soft_lim_percent; +        } +        UNLOCK (&ctx->lock); + +out:          QUOTA_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata);          return 0;  } @@ -2726,20 +3245,40 @@ int  quota_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                   dict_t *dict, int flags, dict_t *xdata)  { -        int32_t         op_ret   = -1; -        int32_t         op_errno = EINVAL; +        quota_priv_t  *priv     = NULL; +        int32_t        op_ret   = -1; +        int32_t        op_errno = EINVAL; +        quota_local_t *local    = NULL; +        int64_t        hard_lim = -1, soft_lim = -1; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (fd, err); -        GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, -                                   op_errno, err); +        if (0 <= frame->root->pid) +                GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.quota*", dict, +                                            op_errno, err); -        STACK_WIND (frame, quota_fsetxattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fsetxattr, -                    fd, dict, flags, xdata); +        quota_get_limits (this, dict, &hard_lim, &soft_lim); + +        if (hard_lim > 0) { +                local = quota_local_new (); +                frame->local = local; +                local->loc.inode = inode_ref (fd->inode); + +                local->limit.hard_lim = hard_lim; +                local->limit.soft_lim_percent = soft_lim; +        } + +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fsetxattr_cbk: default_fsetxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, fd, +                    dict, flags, xdata);          return 0;   err:          QUOTA_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, NULL); @@ -2759,19 +3298,28 @@ int  quota_removexattr (call_frame_t *frame, xlator_t *this,                     loc_t *loc, const char *name, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          int32_t         op_errno = EINVAL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          VALIDATE_OR_GOTO (this, err); +        /* all quota xattrs can be cleaned up by doing setxattr on special key. +         * Hence its ok that we don't allow removexattr on quota keys here. +         */          GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*",                                   name, op_errno, err);          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (loc, err); -        STACK_WIND (frame, quota_removexattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->removexattr, +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_removexattr_cbk: default_removexattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr,                      loc, name, xdata);          return 0;  err: @@ -2792,9 +3340,14 @@ int  quota_fremovexattr (call_frame_t *frame, xlator_t *this,                      fd_t *fd, const char *name, dict_t *xdata)  { +        quota_priv_t       *priv        = NULL;          int32_t         op_ret   = -1;          int32_t         op_errno = EINVAL; +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (fd, err); @@ -2802,9 +3355,10 @@ quota_fremovexattr (call_frame_t *frame, xlator_t *this,          GF_IF_NATIVE_XATTR_GOTO ("trusted.quota*",                                   name, op_errno, err); -        STACK_WIND (frame, quota_fremovexattr_cbk, -                    FIRST_CHILD(this), -                    FIRST_CHILD(this)->fops->fremovexattr, +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fremovexattr_cbk: default_fremovexattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr,                      fd, name, xdata);          return 0;   err: @@ -2818,16 +3372,16 @@ quota_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                    int32_t op_ret, int32_t op_errno, struct statvfs *buf,                    dict_t *xdata)  { -	inode_t           *root_inode = NULL; -        quota_priv_t      *priv       = NULL; -        uint64_t           value      = 0; -        quota_inode_ctx_t *ctx        = NULL; -        limits_t          *limit_node = NULL; -	int64_t            usage      = -1; -	int64_t            avail      = -1; -        int64_t            blocks     = 0; +	inode_t           *inode  = NULL; +        uint64_t           value  = 0; +	int64_t            usage  = -1; +	int64_t            avail  = -1; +        int64_t            blocks = 0; +        quota_inode_ctx_t *ctx    = NULL; +        int                ret    = 0; +        gf_boolean_t       dict_created = _gf_false; -	root_inode = cookie; +        inode = cookie;          /* This fop will fail mostly in case of client disconnect's,           * which is already logged. Hence, not logging here */ @@ -2838,74 +3392,192 @@ quota_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  	 * cookie, and it would only do so if the value was non-NULL.  This  	 * check is therefore just routine defensive coding.  	 */ -	if (!root_inode) { +	if (!inode) {  		gf_log(this->name,GF_LOG_WARNING,  		       "null inode, cannot adjust for quota");  		goto unwind;  	} -	if (!root_inode->table || (root_inode != root_inode->table->root)) { -		gf_log(this->name,GF_LOG_WARNING, -		       "non-root inode, cannot adjust for quota"); -		goto unwind; -	} -        inode_ctx_get (root_inode, this, &value); +        inode_ctx_get (inode, this, &value);  	if (!value) {  		goto unwind;  	} + +        /* if limit is set on this inode, report statfs based on this inode +         * else report based on root. +         */          ctx = (quota_inode_ctx_t *)(unsigned long)value; +        if (ctx->hard_lim <= 0) { +                inode_ctx_get (inode->table->root, this, &value); +                ctx = (quota_inode_ctx_t *)(unsigned long) value; +                if (!ctx) +                        goto unwind; +        } +  	usage = (ctx->size) / buf->f_bsize; -        priv = this->private; -        list_for_each_entry (limit_node, &priv->limit_head, limit_list) { -		/* Notice that this only works for volume-level quota. */ -                if (strcmp (limit_node->path, "/") == 0) { -                        blocks = limit_node->value / buf->f_bsize; -                        if (usage > blocks) { -                                break; -                        } +        if (ctx->hard_lim > 0) { +                blocks = ctx->hard_lim / buf->f_bsize; +                buf->f_blocks = blocks; -			buf->f_blocks = blocks; -			avail = buf->f_blocks - usage; -			if (buf->f_bfree > avail) { -				buf->f_bfree = avail; -			} -			/* -			 * We have to assume that the total assigned quota -			 * won't cause us to dip into the reserved space, -			 * because dealing with the overcommitted cases is -			 * just too hairy (especially when different bricks -			 * might be using different reserved percentages and -			 * such). -			 */ -			buf->f_bavail = buf->f_bfree; -			break; +                avail = buf->f_blocks - usage; +                avail = (avail >= 0) ? avail : 0; + +                if (buf->f_bfree > avail) { +                        buf->f_bfree = avail;                  } +                /* +                 * We have to assume that the total assigned quota +                 * won't cause us to dip into the reserved space, +                 * because dealing with the overcommitted cases is +                 * just too hairy (especially when different bricks +                 * might be using different reserved percentages and +                 * such). +                 */ +                buf->f_bavail = buf->f_bfree; +        } + +        if (!xdata) { +                xdata = dict_new (); +                if (!xdata) +                        goto unwind; +                dict_created = _gf_true;          } +        ret = dict_set_int8 (xdata, "quota-deem-statfs", 1); +        if (-1 == ret) +                gf_log (this->name, GF_LOG_ERROR, "Dict set failed, " +                        "deem-statfs option may have no effect"); +  unwind: -	if (root_inode) { -		inode_unref(root_inode); -	} -        STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, buf, xdata); +        QUOTA_STACK_UNWIND (statfs, frame, op_ret, op_errno, buf, xdata); + +        if (dict_created) +                dict_unref (xdata);          return 0;  }  int32_t +quota_statfs_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                     dict_t *xdata) +{ +        quota_local_t   *local          = NULL; +        int              op_errno       = EINVAL; + +        GF_VALIDATE_OR_GOTO ("quota", (local = frame->local), err); + +        if (-1 == local->op_ret) { +                op_errno = local->op_errno; +                goto err; +        } + +        STACK_WIND_COOKIE (frame, quota_statfs_cbk, loc->inode, +                           FIRST_CHILD(this), +                           FIRST_CHILD(this)->fops->statfs, loc, xdata); +        return 0; +err: +        QUOTA_STACK_UNWIND (statfs, frame, -1, op_errno, NULL, NULL); +        return 0; +} + +int32_t +quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                           int32_t op_ret, int32_t op_errno, inode_t *inode, +                           struct iatt *buf, dict_t *xdata, +                           struct iatt *postparent) +{ +        quota_local_t     *local      = NULL; +        int32_t            ret        = 0; +        quota_inode_ctx_t *ctx        = NULL; +        int64_t           *size       = 0; +        uint64_t           value      = 0; + +        local = frame->local; + +        if (op_ret < 0) +                goto resume; + +        GF_ASSERT (local); +        GF_ASSERT (frame); +        GF_VALIDATE_OR_GOTO_WITH_ERROR ("quota", this, resume, op_errno, +                                        EINVAL); +        GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, xdata, resume, op_errno, +                                        EINVAL); + +        ret = inode_ctx_get (local->validate_loc.inode, this, &value); + +        ctx = (quota_inode_ctx_t *)(unsigned long)value; +        if ((ret == -1) || (ctx == NULL)) { +                gf_log (this->name, GF_LOG_WARNING, +                        "quota context is not present in inode (gfid:%s)", +                        uuid_utoa (local->validate_loc.inode->gfid)); +                op_errno = EINVAL; +                goto resume; +        } + +        ret = dict_get_bin (xdata, QUOTA_SIZE_KEY, (void **) &size); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "size key not present in dict"); +                op_errno = EINVAL; +                goto resume; +        } + +        LOCK (&ctx->lock); +        { +                ctx->size = ntoh64 (*size); +                gettimeofday (&ctx->tv, NULL); +        } +        UNLOCK (&ctx->lock); + +resume: +        --local->link_count; + +        quota_resume_fop_if_validation_done (local); +        return 0; +} + +int32_t  quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)  { -	inode_t *root_inode = NULL; -        quota_priv_t    *priv = NULL; +        quota_local_t   *local  = NULL; +        int              op_errno       = 0; +        call_stub_t     *stub           = NULL; +        quota_priv_t *priv  = NULL; +        int           ret       = 0;          priv = this->private; +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); +  	if (priv->consider_statfs && loc->inode) { -		root_inode = loc->inode->table->root; -		inode_ref(root_inode); -		STACK_WIND_COOKIE (frame, quota_statfs_cbk, root_inode, -				   FIRST_CHILD(this), -				   FIRST_CHILD(this)->fops->statfs, loc, xdata); +                local = quota_local_new (); +                if (!local) { +                        op_errno = ENOMEM; +                        goto err; +                } +                frame->local = local; + +                local->inode = inode_ref (loc->inode); +                local->link_count = 1; + +                stub = fop_statfs_stub (frame, quota_statfs_helper, loc, xdata); +                if (!stub) { +                        op_errno = ENOMEM; +                        goto err; +                } + +                local->stub = stub; + +                ret = quota_validate (frame, local->inode, this, +                                      quota_statfs_validate_cbk); +                if (0 > ret) { +                        op_errno = -ret; +                        --local->link_count; +                } + +                quota_resume_fop_if_validation_done (local);  	}  	else {  		/* @@ -2921,51 +3593,112 @@ quota_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)                  if (priv->consider_statfs)                          gf_log(this->name,GF_LOG_WARNING,                                 "missing inode, cannot adjust for quota"); +wind:  		STACK_WIND (frame, default_statfs_cbk, FIRST_CHILD(this),  			    FIRST_CHILD(this)->fops->statfs, loc, xdata);  	}          return 0; -} +err: +        STACK_UNWIND_STRICT (statfs, frame, -1, op_errno, NULL, NULL); + +        if (local) +                quota_local_cleanup (this, local); +        return 0; +}  int  quota_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                      int op_ret, int op_errno, gf_dirent_t *entries,                      dict_t *xdata)  { -        gf_dirent_t *entry = NULL; +        gf_dirent_t   *entry = NULL; +        quota_local_t *local = NULL; +        loc_t          loc   = {0, };          if (op_ret <= 0)                  goto unwind; +        local = frame->local; +          list_for_each_entry (entry, &entries->list, list) { -                /* TODO: fill things */ +                if ((strcmp (entry->d_name, ".") == 0) +                    || (strcmp (entry->d_name, "..") == 0)) +                        continue; + +                uuid_copy (loc.gfid, entry->d_stat.ia_gfid); +                loc.inode = inode_ref (entry->inode); +                loc.parent = inode_ref (local->loc.inode); +                uuid_copy (loc.pargfid, loc.parent->gfid); +                loc.name = entry->d_name; + +                quota_fill_inodectx (this, entry->inode, entry->dict, +                                     &loc, &entry->d_stat, &op_errno); + +                loc_wipe (&loc);          }  unwind: -        STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries, xdata); +        QUOTA_STACK_UNWIND (readdirp, frame, op_ret, op_errno, entries, xdata);          return 0;  } +  int  quota_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,                  off_t offset, dict_t *dict)  { -        int ret = 0; +        quota_priv_t  *priv     = NULL; +        int            ret      = 0; +        gf_boolean_t   new_dict = _gf_false; +        quota_local_t *local    = NULL; + +        priv = this->private; + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind); + +        local = quota_local_new (); + +        if (local == NULL) { +                goto err; +        } + +        frame->local = local; + +        local->loc.inode = inode_ref (fd->inode); + +        if (dict == NULL) { +                dict = dict_new (); +                new_dict = _gf_true; +        }          if (dict) { -                ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0); +                ret = dict_set_int8 (dict, QUOTA_LIMIT_KEY, 1);                  if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "dict set of key for hard-limit failed");                          goto err;                  }          } -        STACK_WIND (frame, quota_readdirp_cbk, -                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, -                    fd, size, offset, dict); +wind: +        STACK_WIND (frame, +                    priv->is_quota_on? quota_readdirp_cbk: default_readdirp_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, +                    size, offset, dict); + +        if (new_dict) { +                dict_unref (dict); +        } +          return 0;  err:          STACK_UNWIND_STRICT (readdirp, frame, -1, EINVAL, NULL, NULL); + +        if (new_dict) { +                dict_unref (dict); +        } +          return 0;  } @@ -3022,12 +3755,14 @@ out:          return 0;  } +  int32_t -quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, -		       int32_t mode, off_t offset, size_t len, dict_t *xdata) +quota_fallocate_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, +                        int32_t mode, off_t offset, size_t len, dict_t *xdata)  {          quota_local_t *local    = NULL;          int32_t        op_errno = EINVAL; +        quota_priv_t  *priv     = NULL;          local = frame->local;          if (local == NULL) { @@ -3035,12 +3770,16 @@ quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd,                  goto unwind;          } +        priv = this->private; +          if (local->op_ret == -1) {                  op_errno = local->op_errno;                  goto unwind;          } -        STACK_WIND (frame, quota_fallocate_cbk, FIRST_CHILD(this), +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fallocate_cbk: default_fallocate_cbk, +                    FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len,  		    xdata);          return 0; @@ -3050,6 +3789,7 @@ unwind:          return 0;  } +  int32_t  quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,  		off_t offset, size_t len, dict_t *xdata) @@ -3059,8 +3799,13 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          quota_local_t     *local   = NULL;          quota_inode_ctx_t *ctx     = NULL;          quota_priv_t      *priv    = NULL; -        call_stub_t       *stub    = NULL;          quota_dentry_t    *dentry  = NULL; +        call_stub_t       *stub    = NULL; + +        priv = this->private; +        GF_VALIDATE_OR_GOTO (this->name, priv, unwind); + +        WIND_IF_QUOTAOFF (priv->is_quota_on, wind);          GF_ASSERT (frame);          GF_VALIDATE_OR_GOTO ("quota", this, unwind); @@ -3074,16 +3819,17 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          frame->local = local;          local->loc.inode = inode_ref (fd->inode); -        ret = quota_inode_ctx_get (fd->inode, -1, this, NULL, NULL, &ctx, 0); +        ret = quota_inode_ctx_get (fd->inode, this, &ctx, 0);          if (ctx == NULL) { -                gf_log (this->name, GF_LOG_WARNING, -                        "quota context not set in inode (gfid:%s)", -                        uuid_utoa (fd->inode->gfid)); -                goto unwind; +                gf_log (this->name, GF_LOG_DEBUG, "quota context is " +                        "NULL on inode (%s). " +                        "If quota is not enabled recently and crawler " +                        "has finished crawling, its an error", +                        uuid_utoa (local->loc.inode->gfid));          } -        stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, offset, len, -				  xdata);  +        stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, +                                  offset, len, xdata);          if (stub == NULL) {                  op_errno = ENOMEM;                  goto unwind; @@ -3092,13 +3838,15 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          priv = this->private;          GF_VALIDATE_OR_GOTO (this->name, priv, unwind); -        LOCK (&ctx->lock); -        { -                list_for_each_entry (dentry, &ctx->parents, next) { -                        parents++; +        if (ctx != NULL) { +                LOCK (&ctx->lock); +                { +                        list_for_each_entry (dentry, &ctx->parents, next) { +                                parents++; +                        }                  } +                UNLOCK (&ctx->lock);          } -        UNLOCK (&ctx->lock);  	/*  	 * Note that by using len as the delta we're assuming the range from @@ -3109,37 +3857,75 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,          local->stub = stub;          local->link_count = parents; -        list_for_each_entry (dentry, &ctx->parents, next) { -                ret = quota_check_limit (frame, fd->inode, this, dentry->name, -                                         dentry->par); -                if (ret == -1) { -                        break; -                } -        } - -        stub = NULL; - -        LOCK (&local->lock); -        { -                local->link_count = 0; -                if (local->validate_count == 0) { -                        stub = local->stub; -                        local->stub = NULL; +        if (parents == 0) { +                local->link_count = 1; +                quota_check_limit (frame, fd->inode, this, NULL, NULL); +        } else { +                list_for_each_entry (dentry, &ctx->parents, next) { +                        quota_check_limit (frame, fd->inode, this, dentry->name, +                                           dentry->par);                  }          } -        UNLOCK (&local->lock); - -        if (stub != NULL) { -                call_resume (stub); -        }          return 0;  unwind:          QUOTA_STACK_UNWIND (fallocate, frame, -1, op_errno, NULL, NULL, NULL);          return 0; + +wind: +        STACK_WIND (frame, priv->is_quota_on? +                    quota_fallocate_cbk: default_fallocate_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len, +		    xdata); +        return 0;  } +/* Logs if +*  i.   Usage crossed soft limit +*  ii.  Usage above soft limit and alert-time timed out +*/ +void +quota_log_usage (xlator_t *this, quota_inode_ctx_t *ctx, inode_t *inode, +                 int64_t delta) +{ +        struct timeval           cur_time       = {0,}; +        char                    *usage_str      = NULL; +        char                    *path           = NULL; +        int64_t                  cur_size       = 0; +        quota_priv_t            *priv           = NULL; + +        priv = this->private; +        cur_size = ctx->size + delta; +        if ((ctx->soft_lim <= 0) || (timerisset (&ctx->prev_log) && +                                     !quota_timeout (&ctx->prev_log, +                                                     priv->log_timeout))) { +                return; +        } + +        gettimeofday (&cur_time, NULL); +        ctx->prev_log = cur_time; + +        usage_str = gf_uint64_2human_readable (cur_size); +        inode_path (inode, NULL, &path); +        if (!path) +                path = uuid_utoa (inode->gfid); + +        /* Usage crossed/reached soft limit */ +        if (DID_REACH_LIMIT (ctx->soft_lim, ctx->size, cur_size)) { + +                gf_log (this->name, GF_LOG_ALERT, "Usage crossed " +                        "soft limit: %s used by %s", usage_str, path); +        } +        /* Usage is above soft limit */ +        else if (cur_size > ctx->soft_lim){ +                gf_log (this->name, GF_LOG_ALERT, "Usage is above " +                        "soft limit: %s used by %s", usage_str, path); +        } +        if (usage_str) +                GF_FREE (usage_str); +}  int32_t  mem_acct_init (xlator_t *this) @@ -3192,83 +3978,6 @@ quota_forget (xlator_t *this, inode_t *inode)          return 0;  } - -int -quota_parse_limits (quota_priv_t *priv, xlator_t *this, dict_t *xl_options, -                    struct list_head *old_list) -{ -        int32_t       ret       = -1; -        char         *str       = NULL; -        char         *str_val   = NULL; -        char         *path      = NULL, *saveptr = NULL; -        uint64_t      value     = 0; -        limits_t     *quota_lim = NULL, *old = NULL; -        char         *last_colon= NULL; - -        ret = dict_get_str (xl_options, "limit-set", &str); - -        if (str) { -                path = strtok_r (str, ",", &saveptr); - -                while (path) { -                        last_colon = strrchr (path, ':'); -                        *last_colon = '\0'; -                        str_val = last_colon + 1; - -                        ret = gf_string2bytesize (str_val, &value); -                        if (ret != 0) -                                goto err; - -                        QUOTA_ALLOC_OR_GOTO (quota_lim, limits_t, err); - -                        quota_lim->path = path; - -                        quota_lim->value = value; - -                        gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, -                                quota_lim->path, quota_lim->value); - -                        if (old_list != NULL) { -                                list_for_each_entry (old, old_list, -                                                     limit_list) { -                                        if (strcmp (old->path, quota_lim->path) -                                            == 0) { -                                                uuid_copy (quota_lim->gfid, -                                                           old->gfid); -                                                break; -                                        } -                                } -                        } - -                        LOCK (&priv->lock); -                        { -                                list_add_tail ("a_lim->limit_list, -                                               &priv->limit_head); -                        } -                        UNLOCK (&priv->lock); - -                        path = strtok_r (NULL, ",", &saveptr); -                } -        } else { -                gf_log (this->name, GF_LOG_INFO, -                        "no \"limit-set\" option provided"); -        } - -        LOCK (&priv->lock); -        { -                list_for_each_entry (quota_lim, &priv->limit_head, limit_list) { -                        gf_log (this->name, GF_LOG_INFO, "%s:%"PRId64, -                                quota_lim->path, quota_lim->value); -                } -        } -        UNLOCK (&priv->lock); - -        ret = 0; -err: -        return ret; -} - -  int32_t  init (xlator_t *this)  { @@ -3290,20 +3999,18 @@ init (xlator_t *this)          QUOTA_ALLOC_OR_GOTO (priv, quota_priv_t, err); -        INIT_LIST_HEAD (&priv->limit_head); -          LOCK_INIT (&priv->lock);          this->private = priv; -        ret = quota_parse_limits (priv, this, this->options, NULL); - -        if (ret) { -                goto err; -        } - -        GF_OPTION_INIT ("timeout", priv->timeout, int64, err);          GF_OPTION_INIT ("deem-statfs", priv->consider_statfs, bool, err); +        GF_OPTION_INIT ("server-quota", priv->is_quota_on, bool, err); +        GF_OPTION_INIT ("default-soft-limit", priv->default_soft_lim, percent, +                        err); +        GF_OPTION_INIT ("soft-timeout", priv->soft_timeout, time, err); +        GF_OPTION_INIT ("hard-timeout", priv->hard_timeout, time, err); +        GF_OPTION_INIT ("alert-time", priv->log_timeout, time, err); +        GF_OPTION_INIT ("volume-uuid", priv->volume_uuid, str, err);          this->local_pool = mem_pool_new (quota_local_t, 64);          if (!this->local_pool) { @@ -3313,134 +4020,100 @@ init (xlator_t *this)                  goto err;          } +        if (priv->is_quota_on) { +                priv->rpc_clnt = quota_enforcer_init (this, this->options); +                if (priv->rpc_clnt == NULL) { +                        ret = -1; +                        gf_log (this->name, GF_LOG_WARNING, +                                "quota enforcer rpc init failed"); +                        goto err; +                } +        } +          ret = 0;  err:          return ret;  } - -void -__quota_reconfigure_inode_ctx (xlator_t *this, inode_t *inode, limits_t *limit) -{ -        int                ret = -1; -        quota_inode_ctx_t *ctx = NULL; - -        GF_VALIDATE_OR_GOTO ("quota", this, out); -        GF_VALIDATE_OR_GOTO (this->name, inode, out); -        GF_VALIDATE_OR_GOTO (this->name, limit, out); - -        ret = quota_inode_ctx_get (inode, limit->value, this, NULL, NULL, &ctx, -                                   1); -        if ((ret == -1) || (ctx == NULL)) { -                gf_log (this->name, GF_LOG_WARNING, "cannot create quota " -                        "context in inode(gfid:%s)", -                        uuid_utoa (inode->gfid)); -                goto out; -        } - -        LOCK (&ctx->lock); -        { -                ctx->limit = limit->value; -        } -        UNLOCK (&ctx->lock); - -out: -        return; -} - - -void -__quota_reconfigure (xlator_t *this, inode_table_t *itable, limits_t *limit) -{ -        inode_t *inode = NULL; - -        if ((this == NULL) || (itable == NULL) || (limit == NULL)) { -                goto out; -        } - -        if (!uuid_is_null (limit->gfid)) { -                inode = inode_find (itable, limit->gfid); -        } else { -                inode = inode_resolve (itable, limit->path); -        } - -        if (inode != NULL) { -                __quota_reconfigure_inode_ctx (this, inode, limit); -        } - -out: -        return; -} - -  int  reconfigure (xlator_t *this, dict_t *options)  {          int32_t           ret   = -1;          quota_priv_t     *priv  = NULL; -        limits_t         *limit = NULL, *next = NULL, *new = NULL; -        struct list_head  head  = {0, }; -        xlator_t         *top   = NULL; -        char              found = 0;          priv = this->private; -        INIT_LIST_HEAD (&head); +        GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, +                          out); +        GF_OPTION_RECONF ("server-quota", priv->is_quota_on, options, bool, +                          out); +        GF_OPTION_RECONF ("default-soft-limit", priv->default_soft_lim, +                          options, percent, out); +        GF_OPTION_RECONF ("alert-time", priv->log_timeout, options, +                          time, out); +        GF_OPTION_RECONF ("soft-timeout", priv->soft_timeout, options, +                          time, out); +        GF_OPTION_RECONF ("hard-timeout", priv->hard_timeout, options, +                          time, out); + +        if (priv->is_quota_on) { +                priv->rpc_clnt = quota_enforcer_init (this, +                                                      this->options); +                if (priv->rpc_clnt == NULL) { +                        ret = -1; +                        gf_log (this->name, GF_LOG_WARNING, +                                "quota enforcer rpc init failed"); +                        goto out; +                } -        LOCK (&priv->lock); -        { -                list_splice_init (&priv->limit_head, &head); +        } else { +                if (priv->rpc_clnt) { +                        // Quotad is shutdown when there is no started volume +                        // which has quota enabled. So, we should disable the +                        // enforcer client when quota is disabled on a volume, +                        // to avoid spurious reconnect attempts to a service +                        // (quotad), that is known to be down. +                        rpc_clnt_disable (priv->rpc_clnt); +                }          } -        UNLOCK (&priv->lock); -        ret = quota_parse_limits (priv, this, options, &head); -        if (ret == -1) { -                gf_log ("quota", GF_LOG_WARNING, -                        "quota reconfigure failed, " -                        "new changes will not take effect"); -                goto out; -        } +        ret = 0; +out: +        return ret; +} -        LOCK (&priv->lock); -        { -                top = ((glusterfs_ctx_t *)this->ctx)->active->top; -                GF_ASSERT (top); +int32_t +quota_priv_dump (xlator_t *this) +{ +        quota_priv_t *priv = NULL; +        int32_t       ret  = -1; -                list_for_each_entry (limit, &priv->limit_head, limit_list) { -                        __quota_reconfigure (this, top->itable, limit); -                } -                list_for_each_entry_safe (limit, next, &head, limit_list) { -                        found = 0; -                        list_for_each_entry (new, &priv->limit_head, -                                             limit_list) { -                                if (strcmp (new->path, limit->path) == 0) { -                                        found = 1; -                                        break; -                                } -                        } +        GF_ASSERT (this); -                        if (!found) { -                                limit->value = -1; -                                __quota_reconfigure (this, top->itable, limit); -                        } +        priv = this->private; -                        list_del_init (&limit->limit_list); -                        GF_FREE (limit); -                } +        gf_proc_dump_add_section ("xlators.features.quota.priv", this->name); + +        ret = TRY_LOCK (&priv->lock); +        if (ret) +             goto out; +        else { +                gf_proc_dump_write("soft-timeout", "%d", priv->soft_timeout); +                gf_proc_dump_write("hard-timeout", "%d", priv->hard_timeout); +                gf_proc_dump_write("alert-time", "%d", priv->log_timeout); +                gf_proc_dump_write("quota-on", "%d", priv->is_quota_on); +                gf_proc_dump_write("statfs", "%d", priv->consider_statfs); +                gf_proc_dump_write("volume-uuid", "%s", priv->volume_uuid); +                gf_proc_dump_write("validation-count", "%ld", +                                    priv->validation_count);          }          UNLOCK (&priv->lock); -        GF_OPTION_RECONF ("timeout", priv->timeout, options, int64, out); -        GF_OPTION_RECONF ("deem-statfs", priv->consider_statfs, options, bool, -                          out); - -        ret = 0;  out: -        return ret; +        return 0;  } -  void  fini (xlator_t *this)  { @@ -3482,16 +4155,11 @@ struct xlator_cbks cbks = {          .forget = quota_forget  }; +struct xlator_dumpops dumpops = { +        .priv    = quota_priv_dump, +};  struct volume_options options[] = {          {.key = {"limit-set"}}, -        {.key = {"timeout"}, -         .type = GF_OPTION_TYPE_SIZET, -         .min = 0, -         .max = 60, -         .default_value = "0", -         .description = "quota caches the directory sizes on client. Timeout " -                        "indicates the timeout for the cache to be revalidated." -        },          {.key = {"deem-statfs"},           .type = GF_OPTION_TYPE_BOOL,           .default_value = "off", @@ -3499,5 +4167,62 @@ struct volume_options options[] = {                          "consideration while estimating fs size. (df command)"                          " (Default is off)."          }, +        {.key = {"server-quota"}, +         .type = GF_OPTION_TYPE_BOOL, +         .default_value = "off", +         .description = "Skip the quota enforcement if the feature is" +                        " not turned on. This is not a user exposed option." +        }, +        {.key = {"default-soft-limit"}, +         .type = GF_OPTION_TYPE_PERCENT, +         .default_value = "80%", +         .min = 0, +         .max = LONG_MAX, +        }, +        {.key = {"soft-timeout"}, +         .type = GF_OPTION_TYPE_TIME, +         .min = 0, +         .max = 1800, +         .default_value = "60", +         .description = "quota caches the directory sizes on client. " +                        "soft-timeout indicates the timeout for the validity of" +                        " cache before soft-limit has been crossed." +        }, +        {.key = {"hard-timeout"}, +         .type = GF_OPTION_TYPE_TIME, +         .min = 0, +         .max = 60, +         .default_value = "5", +         .description = "quota caches the directory sizes on client. " +                        "hard-timeout indicates the timeout for the validity of" +                        " cache after soft-limit has been crossed." +        }, +        { .key   = {"username"}, +          .type  = GF_OPTION_TYPE_ANY, +        }, +        { .key   = {"password"}, +          .type  = GF_OPTION_TYPE_ANY, +        }, +        { .key   = {"transport-type"}, +          .value = {"tcp", "socket", "ib-verbs", "unix", "ib-sdp", +                    "tcp/client", "ib-verbs/client", "rdma"}, +          .type  = GF_OPTION_TYPE_STR, +        }, +        { .key   = {"remote-host"}, +          .type  = GF_OPTION_TYPE_INTERNET_ADDRESS, +        }, +        { .key   = {"remote-port"}, +          .type  = GF_OPTION_TYPE_INT, +        }, +        { .key  = {"volume-uuid"}, +          .type = GF_OPTION_TYPE_STR, +          .description = "uuid of the volume this brick is part of." +        }, +        { .key  = {"alert-time"}, +          .type = GF_OPTION_TYPE_TIME, +          .min = 0, +          .max = 7*86400, +          .default_value = "86400", +        },          {.key = {NULL}}  }; diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 84ecbb3084e..de522e6914f 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -12,20 +12,48 @@  #include "config.h"  #endif +#ifndef _QUOTA_H +#define _QUOTA_H +  #include "xlator.h"  #include "call-stub.h"  #include "defaults.h" -#include "byte-order.h"  #include "common-utils.h"  #include "quota-mem-types.h" +#include "glusterfs.h" +#include "compat.h" +#include "logging.h" +#include "dict.h" +#include "stack.h" +#include "common-utils.h" +#include "event.h" +#include "globals.h" +#include "rpcsvc.h" +#include "rpc-clnt.h" +#include "byte-order.h" +#include "glusterfs3-xdr.h" +#include "glusterfs3.h" +#include "xdr-generic.h" +#include "compat-errno.h" +#include "protocol-common.h" -#define QUOTA_XATTR_PREFIX      "trusted."  #define DIRTY                   "dirty"  #define SIZE                    "size"  #define CONTRIBUTION            "contri"  #define VAL_LENGTH              8  #define READDIR_BUF             4096 +#ifndef UUID_CANONICAL_FORM_LEN +#define UUID_CANONICAL_FORM_LEN 36 +#endif + +#define WIND_IF_QUOTAOFF(is_quota_on, label)     \ +        if (!is_quota_on)                       \ +                goto label; + +#define DID_REACH_LIMIT(lim, prev_size, cur_size)               \ +        ((cur_size) >= (lim) && (prev_size) < (lim)) +  #define QUOTA_SAFE_INCREMENT(lock, var)         \          do {                                    \                  LOCK (lock);                    \ @@ -46,7 +74,7 @@                                   gf_quota_mt_##type);   \                  if (!var) {                             \                          gf_log ("", GF_LOG_ERROR,       \ -                                "out of memory :(");    \ +                                "out of memory");    \                          ret = -1;                       \                          goto label;                     \                  }                                       \ @@ -74,11 +102,17 @@  #define GET_CONTRI_KEY(var, _vol_name, _gfid, _ret)             \          do {                                                    \                  char _gfid_unparsed[40];                        \ -                uuid_unparse (_gfid, _gfid_unparsed);           \ -                _ret = gf_asprintf (var, QUOTA_XATTR_PREFIX     \ -                                    "%s.%s." CONTRIBUTION,      \ -                                    _vol_name, _gfid_unparsed); \ -        } while (0) +                if (_gfid != NULL) {                            \ +                        uuid_unparse (_gfid, _gfid_unparsed);   \ +                        _ret = gf_asprintf (var, QUOTA_XATTR_PREFIX     \ +                                            "%s.%s." CONTRIBUTION,      \ +                                            _vol_name, _gfid_unparsed); \ +                } else {                                                \ +                        _ret = gf_asprintf (var, QUOTA_XATTR_PREFIX     \ +                                            "%s.." CONTRIBUTION,       \ +                                            _vol_name);             \ +                }                                                       \ +         } while (0)  #define GET_CONTRI_KEY_OR_GOTO(var, _vol_name, _gfid, label)    \ @@ -96,6 +130,8 @@                          goto label;                             \          } while (0) + +  struct quota_dentry {          char            *name;          uuid_t           par; @@ -105,47 +141,70 @@ typedef struct quota_dentry quota_dentry_t;  struct quota_inode_ctx {          int64_t          size; -        int64_t          limit; +        int64_t          hard_lim; +        int64_t          soft_lim;          struct iatt      buf;          struct list_head parents;          struct timeval   tv; +        struct timeval   prev_log;          gf_lock_t        lock;  };  typedef struct quota_inode_ctx quota_inode_ctx_t; +struct quota_limit { +        int64_t hard_lim; +        int64_t soft_lim_percent; +} __attribute__ ((packed)); +typedef struct quota_limit quota_limit_t; +  struct quota_local { -        gf_lock_t    lock; -        uint32_t     validate_count; -        uint32_t     link_count; -        loc_t        loc; -        loc_t        oldloc; -        loc_t        newloc; -        loc_t        validate_loc; -        int64_t      delta; -        int32_t      op_ret; -        int32_t      op_errno; -        int64_t      size; -        int64_t      limit; -        char         just_validated; -        inode_t     *inode; -        call_stub_t *stub; +        gf_lock_t           lock; +        uint32_t            validate_count; +        uint32_t            link_count; +        loc_t               loc; +        loc_t               oldloc; +        loc_t               newloc; +        loc_t               validate_loc; +        int64_t             delta; +        int32_t             op_ret; +        int32_t             op_errno; +        int64_t             size; +        gf_boolean_t        skip_check; +        char                just_validated; +        fop_lookup_cbk_t    validate_cbk; +        inode_t            *inode; +        call_stub_t        *stub; +        struct iobref      *iobref; +        quota_limit_t       limit;  }; -typedef struct quota_local quota_local_t; +typedef struct quota_local  quota_local_t;  struct quota_priv { -        int64_t           timeout; -        gf_boolean_t      consider_statfs; -        struct list_head  limit_head; -        gf_lock_t         lock; +        uint32_t                soft_timeout; +        uint32_t                hard_timeout; +        uint32_t               log_timeout; +        double                 default_soft_lim; +        gf_boolean_t           is_quota_on; +        gf_boolean_t           consider_statfs; +        gf_lock_t              lock; +        rpc_clnt_prog_t       *quota_enforcer; +        struct rpcsvc_program *quotad_aggregator; +        struct rpc_clnt       *rpc_clnt; +        rpcsvc_t              *rpcsvc; +        inode_table_t         *itable; +        char                  *volume_uuid; +        uint64_t               validation_count;  }; -typedef struct quota_priv quota_priv_t; +typedef struct quota_priv      quota_priv_t; -struct limits { -        struct list_head  limit_list; -        char             *path; -        int64_t           value; -        uuid_t            gfid; -}; -typedef struct limits     limits_t; +int +quota_enforcer_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, +                       dict_t *xdata, fop_lookup_cbk_t cbk); +struct rpc_clnt * +quota_enforcer_init (xlator_t *this, dict_t *options); -uint64_t cn = 1; +void +quota_log_usage (xlator_t *this, quota_inode_ctx_t *ctx, inode_t *inode, +                 int64_t delta); + +#endif diff --git a/xlators/features/quota/src/quotad-aggregator.c b/xlators/features/quota/src/quotad-aggregator.c new file mode 100644 index 00000000000..f3f65ca2a04 --- /dev/null +++ b/xlators/features/quota/src/quotad-aggregator.c @@ -0,0 +1,423 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "cli1-xdr.h" +#include "quota.h" +#include "quotad-helpers.h" +#include "quotad-aggregator.h" + +struct rpcsvc_program quotad_aggregator_prog; + +struct iobuf * +quotad_serialize_reply (rpcsvc_request_t *req, void *arg, struct iovec *outmsg, +                        xdrproc_t xdrproc) +{ +        struct iobuf *iob      = NULL; +        ssize_t       retlen   = 0; +        ssize_t       xdr_size = 0; + +        GF_VALIDATE_OR_GOTO ("server", req, ret); + +        /* First, get the io buffer into which the reply in arg will +         * be serialized. +         */ +        if (arg && xdrproc) { +                xdr_size = xdr_sizeof (xdrproc, arg); +                iob = iobuf_get2 (req->svc->ctx->iobuf_pool, xdr_size); +                if (!iob) { +                        gf_log_callingfn (THIS->name, GF_LOG_ERROR, +                                          "Failed to get iobuf"); +                        goto ret; +                }; + +                iobuf_to_iovec (iob, outmsg); +                /* Use the given serializer to translate the give C structure in arg +                 * to XDR format which will be written into the buffer in outmsg. +                 */ +                /* retlen is used to received the error since size_t is unsigned and we +                 * need -1 for error notification during encoding. +                 */ + +                retlen = xdr_serialize_generic (*outmsg, arg, xdrproc); +                if (retlen == -1) { +                        /* Failed to Encode 'GlusterFS' msg in RPC is not exactly +                           failure of RPC return values.. client should get +                           notified about this, so there are no missing frames */ +                        gf_log_callingfn ("", GF_LOG_ERROR, "Failed to encode message"); +                        req->rpc_err = GARBAGE_ARGS; +                        retlen = 0; +                } +        } +        outmsg->iov_len = retlen; +ret: +        if (retlen == -1) { +                iobuf_unref (iob); +                iob = NULL; +        } + +        return iob; +} + +int +quotad_aggregator_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, +                                void *arg, struct iovec *payload, +                                int payloadcount, struct iobref *iobref, +                                xdrproc_t xdrproc) +{ +        struct iobuf              *iob        = NULL; +        int                        ret        = -1; +        struct iovec               rsp        = {0,}; +        quotad_aggregator_state_t *state      = NULL; +        char                       new_iobref = 0; + +        GF_VALIDATE_OR_GOTO ("server", req, ret); + +        if (frame) { +                state = frame->root->state; +                frame->local = NULL; +        } + +        if (!iobref) { +                iobref = iobref_new (); +                if (!iobref) { +                        goto ret; +                } + +                new_iobref = 1; +        } + +        iob = quotad_serialize_reply (req, arg, &rsp, xdrproc); +        if (!iob) { +                gf_log ("", GF_LOG_ERROR, "Failed to serialize reply"); +                goto ret; +        } + +        iobref_add (iobref, iob); + +        ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount, +                                     iobref); + +        iobuf_unref (iob); + +        ret = 0; +ret: +        if (state) { +                quotad_aggregator_free_state (state); +        } + +        if (frame) { +                if (frame->root->client) +                        gf_client_unref (frame->root->client); + +                STACK_DESTROY (frame->root); +        } + +        if (new_iobref) { +                iobref_unref (iobref); +        } + +        return ret; +} + +int +quotad_aggregator_getlimit_cbk (xlator_t *this, call_frame_t *frame, +                                void *lookup_rsp) +{ +        gfs3_lookup_rsp            *rsp   = lookup_rsp; +        gf_cli_rsp                 cli_rsp = {0,}; +        dict_t                     *xdata = NULL; +        int                         ret = -1; + +        GF_PROTOCOL_DICT_UNSERIALIZE (frame->this, xdata, +                                      (rsp->xdata.xdata_val), +                                      (rsp->xdata.xdata_len), rsp->op_ret, +                                      rsp->op_errno, out); + +        ret = 0; +out: +        rsp->op_ret = ret; +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "failed to unserialize " +                        "nameless lookup rsp"); +                goto reply; +        } +        cli_rsp.op_ret = rsp->op_ret; +        cli_rsp.op_errno = rsp->op_errno; +        cli_rsp.op_errstr = ""; +        if (xdata) { +                GF_PROTOCOL_DICT_SERIALIZE (frame->this, xdata, +                                            (&cli_rsp.dict.dict_val), +                                            (cli_rsp.dict.dict_len), +                                            cli_rsp.op_errno, reply); +        } + +reply: +        quotad_aggregator_submit_reply (frame, frame->local, (void*)&cli_rsp, NULL, 0, +                                        NULL, (xdrproc_t)xdr_gf_cli_rsp); + +        dict_unref (xdata); +        GF_FREE (cli_rsp.dict.dict_val); +        return 0; +} + +int +quotad_aggregator_getlimit (rpcsvc_request_t *req) +{ +        call_frame_t              *frame = NULL; +        gf_cli_req                 cli_req = {{0}, }; +        gf_cli_rsp                 cli_rsp = {0}; +        gfs3_lookup_req            args  = {{0,},}; +        gfs3_lookup_rsp            rsp   = {0,}; +        quotad_aggregator_state_t *state = NULL; +        xlator_t                  *this  = NULL; +        dict_t                    *dict  = NULL; +        int                        ret   = -1, op_errno = 0; +        char                      *gfid_str = NULL; +        uuid_t                     gfid = {0}; + +        GF_VALIDATE_OR_GOTO ("quotad-aggregator", req, err); + +        this = THIS; + +        ret = xdr_to_generic (req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req); +        if (ret < 0)  { +                //failed to decode msg; +                gf_log ("", GF_LOG_ERROR, "xdr decoding error"); +                req->rpc_err = GARBAGE_ARGS; +                goto err; +        } + +        if (cli_req.dict.dict_len) { +                dict = dict_new (); +                ret = dict_unserialize (cli_req.dict.dict_val, +                                        cli_req.dict.dict_len, &dict); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to " +                                "unserialize req-buffer to dictionary"); +                        goto err; +                } +        } + +        ret = dict_get_str (dict, "gfid", &gfid_str); +        if (ret) { +                goto err; +        } + +        uuid_parse ((const char*)gfid_str, gfid); + +        frame = quotad_aggregator_get_frame_from_req (req); +        if (frame == NULL) { +                rsp.op_errno = ENOMEM; +                goto err; +        } +        state = frame->root->state; +        state->xdata = dict; +        ret = dict_set_int32 (state->xdata, QUOTA_LIMIT_KEY, 42); +        if (ret) +                goto err; + +        ret = dict_set_int32 (state->xdata, QUOTA_SIZE_KEY, 42); +        if (ret) +                goto err; + +        ret = dict_set_int32 (state->xdata, GET_ANCESTRY_PATH_KEY,42); +        if (ret) +                goto err; + +        memcpy (&args.gfid, &gfid, 16); + +        args.bname           = alloca (req->msg[0].iov_len); +        args.xdata.xdata_val = alloca (req->msg[0].iov_len); + +        ret = qd_nameless_lookup (this, frame, &args, state->xdata, +                                  quotad_aggregator_getlimit_cbk); +        if (ret) { +                rsp.op_errno = ret; +                goto err; +        } + +        return ret; + +err: +        cli_rsp.op_ret = -1; +        cli_rsp.op_errno = op_errno; +        cli_rsp.op_errstr = ""; + +        quotad_aggregator_getlimit_cbk (this, frame, &cli_rsp); +        dict_unref (dict); + +        return ret; +} + +int +quotad_aggregator_lookup_cbk (xlator_t *this, call_frame_t *frame, +                              void *rsp) +{ +        quotad_aggregator_submit_reply (frame, frame->local, rsp, NULL, 0, NULL, +                                        (xdrproc_t)xdr_gfs3_lookup_rsp); + +        return 0; +} + + +int +quotad_aggregator_lookup (rpcsvc_request_t *req) +{ +        call_frame_t              *frame = NULL; +        gfs3_lookup_req            args  = {{0,},}; +        int                        ret   = -1, op_errno = 0; +        gfs3_lookup_rsp            rsp   = {0,}; +        quotad_aggregator_state_t *state = NULL; +        xlator_t                  *this  = NULL; + +        GF_VALIDATE_OR_GOTO ("quotad-aggregator", req, err); + +        this = THIS; + +        args.bname           = alloca (req->msg[0].iov_len); +        args.xdata.xdata_val = alloca (req->msg[0].iov_len); + +        ret = xdr_to_generic (req->msg[0], &args, +                              (xdrproc_t)xdr_gfs3_lookup_req); +        if (ret < 0) { +                rsp.op_errno = EINVAL; +                goto err; +        } + +        frame = quotad_aggregator_get_frame_from_req (req); +        if (frame == NULL) { +                rsp.op_errno = ENOMEM; +                goto err; +        } + +        state = frame->root->state; + +        GF_PROTOCOL_DICT_UNSERIALIZE (this, state->xdata, +                                      (args.xdata.xdata_val), +                                      (args.xdata.xdata_len), ret, +                                      op_errno, err); + + +        ret = qd_nameless_lookup (this, frame, &args, state->xdata, +                                  quotad_aggregator_lookup_cbk); +        if (ret) { +                rsp.op_errno = ret; +                goto err; +        } + +        return ret; + +err: +        rsp.op_ret = -1; +        rsp.op_errno = op_errno; + +        quotad_aggregator_lookup_cbk (this, frame, &rsp); +        return ret; +} + +int +quotad_aggregator_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, +                              void *data) +{ +        if (!xl || !data) { +                gf_log_callingfn ("server", GF_LOG_WARNING, +                                  "Calling rpc_notify without initializing"); +                goto out; +        } + +        switch (event) { +        case RPCSVC_EVENT_ACCEPT: +                break; + +        case RPCSVC_EVENT_DISCONNECT: +                break; + +        default: +                break; +        } + +out: +        return 0; +} + +int +quotad_aggregator_init (xlator_t *this) +{ +        quota_priv_t *priv = NULL; +        int           ret  = -1; + +        priv = this->private; + +        ret = dict_set_str (this->options, "transport.address-family", "unix"); +        if (ret) +                goto out; + +        ret = dict_set_str (this->options, "transport-type", "socket"); +        if (ret) +                goto out; + +        ret = dict_set_str (this->options, "transport.socket.listen-path", +                            "/tmp/quotad.socket"); +        if (ret) +                goto out; + +        /* RPC related */ +        priv->rpcsvc = rpcsvc_init (this, this->ctx, this->options, 0); +        if (priv->rpcsvc == NULL) { +                gf_log (this->name, GF_LOG_WARNING, +                        "creation of rpcsvc failed"); +                ret = -1; +                goto out; +        } + +        ret = rpcsvc_create_listeners (priv->rpcsvc, this->options, +                                       this->name); +        if (ret < 1) { +                gf_log (this->name, GF_LOG_WARNING, +                        "creation of listener failed"); +                ret = -1; +                goto out; +        } + +        priv->quotad_aggregator = "ad_aggregator_prog; +        quotad_aggregator_prog.options = this->options; + +        ret = rpcsvc_program_register (priv->rpcsvc, "ad_aggregator_prog); +        if (ret) { +                gf_log (this->name, GF_LOG_WARNING, +                        "registration of program (name:%s, prognum:%d, " +                        "progver:%d) failed", quotad_aggregator_prog.progname, +                        quotad_aggregator_prog.prognum, +                        quotad_aggregator_prog.progver); +                goto out; +        } + +        ret = 0; +out: +        return ret; +} + +rpcsvc_actor_t quotad_aggregator_actors[] = { +        [GF_AGGREGATOR_NULL]     = {"NULL", GF_AGGREGATOR_NULL, NULL, NULL, 0, +                                    DRC_NA}, +        [GF_AGGREGATOR_LOOKUP]   = {"LOOKUP", GF_AGGREGATOR_NULL, +                                    quotad_aggregator_lookup, NULL, 0, DRC_NA}, +        [GF_AGGREGATOR_GETLIMIT] = {"GETLIMIT", GF_AGGREGATOR_GETLIMIT, +                                   quotad_aggregator_getlimit, NULL, 0}, +}; + + +struct rpcsvc_program quotad_aggregator_prog = { +        .progname  = "GlusterFS 3.3", +        .prognum   = GLUSTER_AGGREGATOR_PROGRAM, +        .progver   = GLUSTER_AGGREGATOR_VERSION, +        .numactors = GF_AGGREGATOR_MAXVALUE, +        .actors    = quotad_aggregator_actors +}; diff --git a/xlators/features/quota/src/quotad-aggregator.h b/xlators/features/quota/src/quotad-aggregator.h new file mode 100644 index 00000000000..5ddea5b3c46 --- /dev/null +++ b/xlators/features/quota/src/quotad-aggregator.h @@ -0,0 +1,37 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _QUOTAD_AGGREGATOR_H +#define _QUOTAD_AGGREGATOR_H + +#include "quota.h" +#include "stack.h" +#include "glusterfs3-xdr.h" +#include "inode.h" + +typedef struct { +        void          *pool; +        xlator_t      *this; +	xlator_t      *active_subvol; +        inode_table_t *itable; +        loc_t          loc; +        dict_t        *xdata; +} quotad_aggregator_state_t; + +typedef int (*quotad_aggregator_lookup_cbk_t) (xlator_t *this, +                                               call_frame_t *frame, +                                               void *rsp); +int +qd_nameless_lookup (xlator_t *this, call_frame_t *frame, gfs3_lookup_req *req, +                    dict_t *xdata, quotad_aggregator_lookup_cbk_t lookup_cbk); +int +quotad_aggregator_init (xlator_t *this); + +#endif diff --git a/xlators/features/quota/src/quotad-helpers.c b/xlators/features/quota/src/quotad-helpers.c new file mode 100644 index 00000000000..fd309911474 --- /dev/null +++ b/xlators/features/quota/src/quotad-helpers.c @@ -0,0 +1,113 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "quotad-helpers.h" + +quotad_aggregator_state_t * +get_quotad_aggregator_state (xlator_t *this, rpcsvc_request_t *req) +{ +        quotad_aggregator_state_t *state         = NULL; +	xlator_t                  *active_subvol = NULL; +        quota_priv_t              *priv          = NULL; + +        state = (void *)GF_CALLOC (1, sizeof (*state), +                                   gf_quota_mt_aggregator_state_t); +        if (!state) +                return NULL; + +	state->this = THIS; +        priv = this->private; + +        LOCK (&priv->lock); +        { +                active_subvol = state->active_subvol = FIRST_CHILD (this); +        } +        UNLOCK (&priv->lock); + +        if (active_subvol->itable == NULL) +                active_subvol->itable = inode_table_new (4096, active_subvol); + +	state->itable = active_subvol->itable; + +        state->pool = this->ctx->pool; + +        return state; +} + +void +quotad_aggregator_free_state (quotad_aggregator_state_t *state) +{ +        if (state->xdata) +                dict_unref (state->xdata); + +        GF_FREE (state); +} + +call_frame_t * +quotad_aggregator_alloc_frame (rpcsvc_request_t *req) +{ +        call_frame_t              *frame = NULL; +        quotad_aggregator_state_t *state = NULL; +        xlator_t                  *this  = NULL; + +        GF_VALIDATE_OR_GOTO ("server", req, out); +        GF_VALIDATE_OR_GOTO ("server", req->trans, out); +        GF_VALIDATE_OR_GOTO ("server", req->svc, out); +        GF_VALIDATE_OR_GOTO ("server", req->svc->ctx, out); + +        this = req->svc->mydata; + +        frame = create_frame (this, req->svc->ctx->pool); +        if (!frame) +                goto out; + +        state = get_quotad_aggregator_state (this, req); +        if (!state) +                goto out; + +        frame->root->state = state; +        frame->root->unique = 0; + +        frame->this = this; +out: +        return frame; +} + +call_frame_t * +quotad_aggregator_get_frame_from_req (rpcsvc_request_t *req) +{ +        call_frame_t *frame  = NULL; +        client_t     *client = NULL; + +        GF_VALIDATE_OR_GOTO ("server", req, out); + +        frame = quotad_aggregator_alloc_frame (req); +        if (!frame) +                goto out; + +        client = req->trans->xl_private; + +        frame->root->op       = req->procnum; + +        frame->root->unique   = req->xid; + +        frame->root->uid      = req->uid; +        frame->root->gid      = req->gid; +        frame->root->pid      = req->pid; + +        gf_client_ref (client); +        frame->root->client   = client; + +        frame->root->lk_owner = req->lk_owner; + +        frame->local = req; +out: +        return frame; +} diff --git a/xlators/features/quota/src/quotad-helpers.h b/xlators/features/quota/src/quotad-helpers.h new file mode 100644 index 00000000000..a10fb7fa82a --- /dev/null +++ b/xlators/features/quota/src/quotad-helpers.h @@ -0,0 +1,24 @@ +/* +   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef QUOTAD_HELPERS_H +#define QUOTAD_HELPERS_H + +#include "rpcsvc.h" +#include "quota.h" +#include "quotad-aggregator.h" + +void +quotad_aggregator_free_state (quotad_aggregator_state_t *state); + +call_frame_t * +quotad_aggregator_get_frame_from_req (rpcsvc_request_t *req); + +#endif diff --git a/xlators/features/quota/src/quotad.c b/xlators/features/quota/src/quotad.c new file mode 100644 index 00000000000..243b943e986 --- /dev/null +++ b/xlators/features/quota/src/quotad.c @@ -0,0 +1,210 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ +#include "quota.h" +#include "quotad-aggregator.h" +#include "common-utils.h" + +int32_t +mem_acct_init (xlator_t *this) +{ +        int     ret = -1; + +        if (!this) +                return ret; + +        ret = xlator_mem_acct_init (this, gf_quota_mt_end + 1); + +        if (0 != ret) { +                gf_log (this->name, GF_LOG_WARNING, "Memory accounting " +                        "init failed"); +                return ret; +        } + +        return ret; +} + +int32_t +qd_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +               int32_t op_ret, int32_t op_errno, inode_t *inode, +               struct iatt *buf, dict_t *xdata, struct iatt *postparent) +{ +        quotad_aggregator_lookup_cbk_t  lookup_cbk = NULL; +        gfs3_lookup_rsp                 rsp = {0, }; + +        lookup_cbk = cookie; + +        rsp.op_ret = op_ret; +        rsp.op_errno = op_errno; + +        gf_stat_from_iatt (&rsp.postparent, postparent); + +        GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val), +                                    rsp.xdata.xdata_len, rsp.op_errno, out); + +        gf_stat_from_iatt (&rsp.stat, buf); + +out: +        lookup_cbk (this, frame, &rsp); + +        GF_FREE (rsp.xdata.xdata_val); + +        inode_unref (inode); + +        return 0; +} + +xlator_t * +qd_find_subvol (xlator_t *this, char *volume_uuid) +{ +        xlator_list_t *child  = NULL; +        xlator_t      *subvol = NULL; +        char           key[1024]; +        char          *optstr = NULL; + +        if (!this || !volume_uuid) +                goto out; + +        for (child = this->children; child; child = child->next) { +                snprintf(key, 1024, "%s.volume-id", child->xlator->name); +                if (dict_get_str(this->options, key, &optstr) < 0) +                        continue; + +                if (strcmp (optstr, volume_uuid) == 0) { +                        subvol = child->xlator; +                        break; +                } +        } + +out: +        return subvol; +} + +int +qd_nameless_lookup (xlator_t *this, call_frame_t *frame, gfs3_lookup_req *req, +                    dict_t *xdata, quotad_aggregator_lookup_cbk_t lookup_cbk) +{ +        gfs3_lookup_rsp            rsp         = {0, }; +        int                        op_errno    = 0, ret = -1; +        loc_t                      loc         = {0, }; +        quotad_aggregator_state_t *state       = NULL; +        quota_priv_t              *priv        = NULL; +        xlator_t                  *subvol      = NULL; +        char                      *volume_uuid = NULL; + +        priv = this->private; +        state = frame->root->state; + +        frame->root->op = GF_FOP_LOOKUP; + +        loc.inode = inode_new (state->itable); +        if (loc.inode == NULL) { +                op_errno = ENOMEM; +                goto out; +        } + +        memcpy (loc.gfid, req->gfid, 16); + +        ret = dict_get_str (xdata, "volume-uuid", &volume_uuid); +        if (ret < 0) { +                op_errno = EINVAL; +                goto out; +        } + +        subvol = qd_find_subvol (this, volume_uuid); +        if (subvol == NULL) { +                op_errno = EINVAL; +                goto out; +        } + +        STACK_WIND_COOKIE (frame, qd_lookup_cbk, lookup_cbk, subvol, +                           subvol->fops->lookup, &loc, xdata); +        return 0; + +out: +        rsp.op_ret = -1; +        rsp.op_errno = op_errno; + +        lookup_cbk (this, frame, &rsp); + +        inode_unref (loc.inode); +        return 0; +} + +int +qd_reconfigure (xlator_t *this, dict_t *options) +{ +        /* As of now quotad is restarted upon alteration of volfile */ +        return 0; +} + +void +qd_fini (xlator_t *this) +{ +        return; +} + +int32_t +qd_init (xlator_t *this) +{ +        int32_t          ret            = -1; +        quota_priv_t    *priv           = NULL; + +        if (NULL == this->children) { +                gf_log (this->name, GF_LOG_ERROR, +                        "FATAL: quota (%s) not configured for min of 1 child", +                        this->name); +                ret = -1; +                goto err; +        } + +        QUOTA_ALLOC_OR_GOTO (priv, quota_priv_t, err); +        LOCK_INIT (&priv->lock); + +        this->private = priv; + +        ret = quotad_aggregator_init (this); +        if (ret < 0) +                goto err; + +        ret = 0; +err: +        if (ret) { +                GF_FREE (priv); +        } +        return ret; +} + +class_methods_t class_methods = { +        .init           = qd_init, +        .fini           = qd_fini, +        .reconfigure    = qd_reconfigure, +}; + +struct xlator_fops fops = { +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { +        { .key   = {"transport-type"}, +          .value = {"rpc", "rpc-over-rdma", "tcp", "socket", "ib-verbs", +                    "unix", "ib-sdp", "tcp/server", "ib-verbs/server", "rdma", +                    "rdma*([ \t]),*([ \t])socket", +                    "rdma*([ \t]),*([ \t])tcp", +                    "tcp*([ \t]),*([ \t])rdma", +                    "socket*([ \t]),*([ \t])rdma"}, +          .type  = GF_OPTION_TYPE_STR +        }, +        { .key   = {"transport.*"}, +          .type  = GF_OPTION_TYPE_ANY, +        }, +        {.key = {NULL}} +}; diff --git a/xlators/lib/src/libxlator.h b/xlators/lib/src/libxlator.h index 1d5e1657f4e..08bd77b918c 100644 --- a/xlators/lib/src/libxlator.h +++ b/xlators/lib/src/libxlator.h @@ -32,6 +32,7 @@  #define MARKER_UUID_TYPE    1  #define MARKER_XTIME_TYPE   2  #define GF_XATTR_QUOTA_SIZE_KEY "trusted.glusterfs.quota.size" +#define GF_XATTR_QUOTA_LIMIT_LIST "trusted.limit.list"  typedef int32_t (*xlator_specf_unwind_t) (call_frame_t *frame,  | 
