diff options
| author | vmallika <vmallika@redhat.com> | 2015-03-17 20:05:19 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-17 21:07:04 -0700 | 
| commit | 7970183f4c730d4aca3cfaa106fde015a0fbc415 (patch) | |
| tree | be48e8390e3acc8468061f95785b943a14c43ced | |
| parent | 33bb32ce5866a15e7d5164c67f214c4797236066 (diff) | |
Quota/marker : Support for inode quota
Currently, the only way to retrieve the number of files/objects in a
directory or volume is to do a crawl of the entire directory/volume.
This is expensive and is not scalable.
The new mechanism proposes to store count of objects/files as part of
an extended attribute of a directory. Each directory's extended
attribute value will indicate the number of files/objects present
in a tree with the directory being considered as the root of the tree.
Currently file usage is accounted in marker by doing multiple FOPs
like setting and getting xattrs. Doing this with STACK WIND and
UNWIND can be harder to debug as involves multiple callbacks.
In this code we are replacing current mechanism with syncop approach
as syncop code is much simpler to follow and help us implement inode
quota in an organized way.
Change-Id: Ibf366fbe07037284e89a241ddaff7750fc8771b4
BUG: 1188636
Signed-off-by: vmallika <vmallika@redhat.com>
Signed-off-by: Sachin Pandit <spandit@redhat.com>
Reviewed-on: http://review.gluster.org/9567
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
| -rw-r--r-- | cli/src/cli-rpc-ops.c | 65 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs.h | 3 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.c | 36 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.h | 4 | ||||
| -rw-r--r-- | libglusterfs/src/xlator.c | 42 | ||||
| -rw-r--r-- | libglusterfs/src/xlator.h | 1 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-mem-types.h | 2 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 50 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.h | 2 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.c | 1723 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.h | 62 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker.c | 67 | 
12 files changed, 1788 insertions, 269 deletions
diff --git a/cli/src/cli-rpc-ops.c b/cli/src/cli-rpc-ops.c index b2964b68ff6..8ea43f824bc 100644 --- a/cli/src/cli-rpc-ops.c +++ b/cli/src/cli-rpc-ops.c @@ -2413,23 +2413,29 @@ static int  print_quota_list_output (cli_local_t *local, char *mountdir,                           char *default_sl, char *path)  { -        int64_t used_space       = 0; -        int64_t avail            = 0; -        char    *used_str         = NULL; -        char    *avail_str        = NULL; -        int     ret               = -1; -        char    *sl_final         = NULL; -        char    *hl_str           = NULL; -        double  sl_num           = 0; -        gf_boolean_t sl          = _gf_false; -        gf_boolean_t hl          = _gf_false; -        char percent_str[20]     = {0}; +        int64_t         avail            = 0; +        char           *used_str         = NULL; +        char           *avail_str        = NULL; +        int             ret              = -1; +        char           *sl_final         = NULL; +        char           *hl_str           = NULL; +        double          sl_num           = 0; +        gf_boolean_t    sl               = _gf_false; +        gf_boolean_t    hl               = _gf_false; +        char            percent_str[20]  = {0}; +        ssize_t         xattr_size       = 0;          struct quota_limit {                  int64_t hl;                  int64_t sl;          } __attribute__ ((__packed__)) existing_limits; +        struct quota_meta { +                int64_t size; +                int64_t file_count; +                int64_t dir_count; +        } __attribute__ ((__packed__)) used_space; +          ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.limit-set",                               (void *)&existing_limits,                               sizeof (existing_limits)); @@ -2490,10 +2496,26 @@ print_quota_list_output (cli_local_t *local, char *mountdir,                  sl_final = percent_str;          } -        ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", -                             &used_space, sizeof (used_space)); +        used_space.size = used_space.file_count = used_space.dir_count = 0; +        xattr_size = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", +                                    NULL, 0); +        if (xattr_size > sizeof (int64_t)) { +                ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", +                                     &used_space, sizeof (used_space)); +        } else if (xattr_size > 0) { +                /* This is for compatibility. +                 * Older version had only file usage +                 */ +                ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", +                             &(used_space.size), sizeof (used_space.size)); +        } else { +                ret = -1; +        }          if (ret < 0) { +                gf_log ("cli", GF_LOG_ERROR, "Failed to get quota size " +                        "on path %s: %s", mountdir, strerror (errno)); +                  if (global_state->mode & GLUSTER_MODE_XML) {                          ret = cli_quota_xml_output (local, path, hl_str,                                                      sl_final, "N/A", @@ -2510,14 +2532,16 @@ print_quota_list_output (cli_local_t *local, char *mountdir,                                   "N/A", "N/A", "N/A", "N/A");                  }          } else { -                used_space = ntoh64 (used_space); +                used_space.size = ntoh64 (used_space.size); +                used_space.file_count = ntoh64 (used_space.file_count); +                used_space.dir_count = ntoh64 (used_space.dir_count); -                used_str = gf_uint64_2human_readable (used_space); +                used_str = gf_uint64_2human_readable (used_space.size); -                if (existing_limits.hl > used_space) { -                        avail = existing_limits.hl - used_space; +                if (existing_limits.hl > used_space.size) { +                        avail = existing_limits.hl - used_space.size;                          hl = _gf_false; -                        if (used_space > sl_num) +                        if (used_space.size > sl_num)                                  sl = _gf_true;                          else                                  sl = _gf_false; @@ -2544,8 +2568,9 @@ print_quota_list_output (cli_local_t *local, char *mountdir,                  if (used_str == NULL) {                          cli_out ("%-40s %7s %9s %11"PRIu64                                   "%9"PRIu64" %15s %18s", path, hl_str, -                                  sl_final, used_space, avail, sl? "Yes" : "No", -                                  hl? "Yes" : "No"); +                                  sl_final, used_space.size, avail, +                                  sl ? "Yes" : "No", +                                  hl ? "Yes" : "No");                  } else {                          cli_out ("%-40s %7s %9s %11s %7s %15s %20s", path, hl_str,                                   sl_final, used_str, avail_str, sl? "Yes" : "No", diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 8d7659b5015..a810f3a81f0 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -128,11 +128,12 @@  #define GLUSTERFS_POSIXLK_COUNT "glusterfs.posixlk-count"  #define GLUSTERFS_PARENT_ENTRYLK "glusterfs.parent-entrylk"  #define GLUSTERFS_INODELK_DOM_COUNT "glusterfs.inodelk-dom-count" -#define QUOTA_SIZE_KEY "trusted.glusterfs.quota.size"  #define GFID_TO_PATH_KEY "glusterfs.gfid2path"  #define GF_XATTR_STIME_PATTERN "trusted.glusterfs.*.stime"  #define GF_XATTR_TRIGGER_SYNC "glusterfs.geo-rep.trigger-sync" +#define QUOTA_SIZE_KEY "trusted.glusterfs.quota.size" +  /* Index xlator related */  #define GF_XATTROP_INDEX_GFID "glusterfs.xattrop_index_gfid"  #define GF_XATTROP_INDEX_COUNT "glusterfs.xattrop_index_count" diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index e3321cf6ddb..e241e2c1ee0 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -2537,3 +2537,39 @@ syncop_inodelk (xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd,          return args.op_ret;  } + +int32_t +syncop_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, dict_t *dict, +                    dict_t *xdata) +{ +        struct syncargs *args = NULL; + +        args = cookie; + +        args->op_ret   = op_ret; +        args->op_errno = op_errno; + +        if (xdata) +                args->xdata = dict_ref (xdata); + +        __wake (args); + +        return 0; + +} + +int +syncop_xattrop (xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags, +                dict_t *dict, dict_t *xdata) +{ +        struct syncargs args = {0, }; + +        SYNCOP (subvol, (&args), syncop_xattrop_cbk, subvol->fops->xattrop, +                loc, flags, dict, xdata); + +        if (args.op_ret < 0) +                return -args.op_errno; + +        return args.op_ret; +} diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 7f8ec7345b0..a9244a51552 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -439,4 +439,8 @@ syncop_inodelk (xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd,  int  syncop_ipc (xlator_t *subvol, int op, dict_t *xdata_in, dict_t **xdata_out); +int +syncop_xattrop (xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags, +                dict_t *dict, dict_t *xdata); +  #endif /* _SYNCOP_H */ diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c index 49af7d2e0e6..cc4726e0ea5 100644 --- a/libglusterfs/src/xlator.c +++ b/libglusterfs/src/xlator.c @@ -860,9 +860,51 @@ loc_is_root (loc_t *loc)          } else if (loc && loc->inode && __is_root_gfid (loc->inode->gfid)) {                  return _gf_true;          } +          return _gf_false;  } +int32_t +loc_build_child (loc_t *child, loc_t *parent, char *name) +{ +        int32_t  ret = -1; + +        GF_VALIDATE_OR_GOTO ("xlator", child, out); +        GF_VALIDATE_OR_GOTO ("xlator", parent, out); +        GF_VALIDATE_OR_GOTO ("xlator", name, out); + +        loc_gfid (parent, child->pargfid); + +        if (strcmp (parent->path, "/") == 0) +                ret = gf_asprintf ((char **)&child->path, "/%s", name); +        else +                ret = gf_asprintf ((char **)&child->path, "%s/%s", parent->path, +                                   name); + +        if (ret < 0 || !child->path) { +                ret = -1; +                goto out; +        } + +        child->name = strrchr (child->path, '/') + 1; + +        child->parent = inode_ref (parent->inode); +        child->inode = inode_new (parent->inode->table); + +        if (!child->inode) { +                ret = -1; +                goto out; +        } + +        ret = 0; + +out: +        if ((ret < 0) && child) +                loc_wipe (child); + +        return ret; +} +  int  xlator_destroy (xlator_t *xl)  { diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h index e953ec04372..733f6cf47ab 100644 --- a/libglusterfs/src/xlator.h +++ b/libglusterfs/src/xlator.h @@ -960,6 +960,7 @@ int loc_path (loc_t *loc, const char *bname);  void loc_gfid (loc_t *loc, uuid_t gfid);  char* loc_gfid_utoa (loc_t *loc);  gf_boolean_t loc_is_root (loc_t *loc); +int32_t loc_build_child (loc_t *child, loc_t *parent, char *name);  int xlator_mem_acct_init (xlator_t *xl, int num_types);  int is_gf_log_command (xlator_t *trans, const char *name, char *value);  int glusterd_check_log_level (const char *value); diff --git a/xlators/features/marker/src/marker-mem-types.h b/xlators/features/marker/src/marker-mem-types.h index 1f74d504897..dc5ad16ed76 100644 --- a/xlators/features/marker/src/marker-mem-types.h +++ b/xlators/features/marker/src/marker-mem-types.h @@ -20,6 +20,8 @@ enum gf_marker_mem_types_ {          gf_marker_mt_quota_inode_ctx_t,          gf_marker_mt_marker_inode_ctx_t,          gf_marker_mt_inode_contribution_t, +        gf_marker_mt_quota_meta_t, +        gf_marker_mt_quota_synctask_t,          gf_marker_mt_end  };  #endif diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index ec0d83316c7..cdc2475c3e8 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -37,23 +37,27 @@ mq_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)          if (parent)                  loc->parent = inode_ref (parent); +        if (!uuid_is_null (inode->gfid)) +                uuid_copy (loc->gfid, inode->gfid); +          loc->path = gf_strdup (path);          if (!loc->path) {                  gf_log ("loc fill", GF_LOG_ERROR, "strdup failed"); -                goto loc_wipe; +                goto out;          }          loc->name = strrchr (loc->path, '/');          if (loc->name)                  loc->name++;          else -                goto loc_wipe; +                goto out;          ret = 0; -loc_wipe: + +out:          if (ret < 0)                  loc_wipe (loc); -out: +          return ret;  } @@ -222,37 +226,39 @@ mq_add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx,  int32_t -mq_dict_set_contribution (xlator_t *this, dict_t *dict, -                          loc_t *loc) +mq_dict_set_contribution (xlator_t *this, dict_t *dict, loc_t *loc, +                          uuid_t gfid, char *contri_key)  { -        int32_t ret              = -1; -        char    contri_key [512] = {0, }; +        int32_t ret                  = -1; +        char    key[CONTRI_KEY_MAX]  = {0, };          GF_VALIDATE_OR_GOTO ("marker", this, out);          GF_VALIDATE_OR_GOTO ("marker", dict, out);          GF_VALIDATE_OR_GOTO ("marker", loc, out); -        if (loc->parent) { -                GET_CONTRI_KEY (contri_key, loc->parent->gfid, ret); -                if (ret < 0) { -                        ret = -1; -                        goto out; -                } +        if (gfid && !uuid_is_null(gfid)) { +                GET_CONTRI_KEY (key, gfid, ret); +        } else if (loc->parent) { +                GET_CONTRI_KEY (key, loc->parent->gfid, ret);          } else {                  /* nameless lookup, fetch contributions to all parents */ -                GET_CONTRI_KEY (contri_key, NULL, ret); +                GET_CONTRI_KEY (key, NULL, ret);          } -        ret = dict_set_int64 (dict, contri_key, 0); -        if (ret < 0) { -                gf_log (this->name, GF_LOG_WARNING, -                        "unable to set dict value on %s.", -                        loc->path); +        if (ret < 0)                  goto out; -        } -        ret = 0; +        ret = dict_set_int64 (dict, key, 0); +        if (ret < 0) +                goto out; + +        if (contri_key) +                strncpy (contri_key, key, CONTRI_KEY_MAX); +  out: +        if (ret < 0) +                gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed"); +          return ret;  } diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h index b200413b0ad..161413debfa 100644 --- a/xlators/features/marker/src/marker-quota-helper.h +++ b/xlators/features/marker/src/marker-quota-helper.h @@ -44,7 +44,7 @@ inode_contribution_t *  mq_add_new_contribution_node (xlator_t *, quota_inode_ctx_t *, loc_t *);  int32_t -mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *); +mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *, uuid_t, char *);  quota_inode_ctx_t *  mq_inode_ctx_new (inode_t *, xlator_t *); diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index b8d7a774377..835108cc403 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -20,6 +20,7 @@  #include "byte-order.h"  #include "marker-quota.h"  #include "marker-quota-helper.h" +#include "syncop.h"  int  mq_loc_copy (loc_t *dst, loc_t *src) @@ -478,11 +479,11 @@ mq_get_child_contribution (call_frame_t *frame,                             dict_t *dict,                             struct iatt *postparent)  { -        int32_t        ret                = -1; -        int32_t        val                = 0; -        char           contri_key [512]   = {0, }; -        int64_t       *contri             = NULL; -        quota_local_t *local              = NULL; +        int32_t        ret                           = -1; +        int32_t        val                           = 0; +        char           contri_key[CONTRI_KEY_MAX]    = {0, }; +        int64_t       *contri                        = NULL; +        quota_local_t *local                         = NULL;          local = frame->local; @@ -543,16 +544,16 @@ mq_readdir_cbk (call_frame_t *frame,                  int32_t op_errno,                  gf_dirent_t *entries, dict_t *xdata)  { -        char           contri_key [512]   = {0, }; -        int32_t        ret                = 0; -        int32_t        val                = 0; -        off_t          offset             = 0; -        int32_t        count              = 0; -        dict_t        *dict               = NULL; -        quota_local_t *local              = NULL; -        gf_dirent_t   *entry              = NULL; -        call_frame_t  *newframe           = NULL; -        loc_t          loc                = {0, }; +        char           contri_key[CONTRI_KEY_MAX]    = {0, }; +        int32_t        ret                           = 0; +        int32_t        val                           = 0; +        off_t          offset                        = 0; +        int32_t        count                         = 0; +        dict_t        *dict                          = NULL; +        quota_local_t *local                         = NULL; +        gf_dirent_t   *entry                         = NULL; +        call_frame_t  *newframe                      = NULL; +        loc_t          loc                           = {0, };          local = mq_local_ref (frame->local); @@ -824,7 +825,10 @@ mq_get_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,          if (uuid_is_null (local->loc.gfid))                  uuid_copy (local->loc.gfid, local->loc.inode->gfid); -        GF_UUID_ASSERT (local->loc.gfid); +        if (uuid_is_null (local->loc.gfid)) { +                ret = -1; +                goto err; +        }          STACK_WIND (frame,                      mq_check_if_still_dirty, @@ -850,14 +854,12 @@ err:   * 0 other wise   */  int32_t -mq_update_dirty_inode (xlator_t *this, -                       loc_t *loc, -                       quota_inode_ctx_t *ctx, +mq_update_dirty_inode (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,                         inode_contribution_t *contribution)  {          int32_t          ret        = -1;          quota_local_t   *local      = NULL; -        gf_boolean_t    status     = _gf_false; +        gf_boolean_t     status     = _gf_false;          struct gf_flock  lock       = {0, };          call_frame_t    *frame      = NULL; @@ -1017,14 +1019,14 @@ err:  int32_t  mq_create_xattr (xlator_t *this, call_frame_t *frame)  { -        int32_t               ret       = 0; -        int64_t              *value     = NULL; -        int64_t              *size      = NULL; -        dict_t               *dict      = NULL; -        char                  key[512]  = {0, }; -        quota_local_t        *local     = NULL; -        quota_inode_ctx_t    *ctx       = NULL; -        inode_contribution_t *contri    = NULL; +        int32_t               ret                  = 0; +        int64_t              *value                = NULL; +        int64_t              *size                 = NULL; +        dict_t               *dict                 = NULL; +        char                  key[CONTRI_KEY_MAX]  = {0, }; +        quota_local_t        *local                = NULL; +        quota_inode_ctx_t    *ctx                  = NULL; +        inode_contribution_t *contri               = NULL;          if (frame == NULL || this == NULL)                  return 0; @@ -1070,7 +1072,10 @@ mq_create_xattr (xlator_t *this, call_frame_t *frame)                          goto free_value;          } -        GF_UUID_ASSERT (local->loc.gfid); +        if (uuid_is_null (local->loc.gfid)) { +                ret = -1; +                goto out; +        }          STACK_WIND (frame, mq_create_dirty_xattr, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->xattrop, &local->loc, @@ -1105,11 +1110,12 @@ mq_check_n_set_inode_xattr (call_frame_t *frame, void *cookie,                              inode_t *inode, struct iatt *buf, dict_t *dict,                              struct iatt *postparent)  { -        quota_local_t        *local           = NULL; -        int64_t              *size            = NULL, *contri = NULL; -        int8_t                dirty           = 0; -        int32_t               ret             = 0; -        char                  contri_key[512] = {0, }; +        quota_local_t        *local                      = NULL; +        int64_t              *size                       = NULL; +        int64_t              *contri                     = NULL; +        int8_t                dirty                      = 0; +        int32_t               ret                        = 0; +        char                  contri_key[CONTRI_KEY_MAX] = {0, };          if (op_ret < 0) {                  goto out; @@ -1174,7 +1180,7 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,                  goto err;          } -        ret = mq_req_xattr (this, &local->loc, xattr_req); +        ret = mq_req_xattr (this, &local->loc, xattr_req, NULL);          if (ret < 0) {                  gf_log (this->name, GF_LOG_WARNING, "cannot request xattr");                  goto err; @@ -1183,7 +1189,10 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,          if (uuid_is_null (local->loc.gfid))                  uuid_copy (local->loc.gfid, local->loc.inode->gfid); -        GF_UUID_ASSERT (local->loc.gfid); +        if (uuid_is_null (local->loc.gfid)) { +                ret = -1; +                goto err; +        }          STACK_WIND (frame, mq_check_n_set_inode_xattr, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req); @@ -1634,15 +1643,17 @@ mq_update_inode_contribution (call_frame_t *frame, void *cookie,                                struct iatt *buf, dict_t *dict,                                struct iatt *postparent)  { -        int32_t               ret              = -1; -        int64_t              *size             = NULL, size_int = 0, contri_int = 0; -        int64_t              *contri           = NULL; -        int64_t              *delta            = NULL; -        char                  contri_key [512] = {0, }; -        dict_t               *newdict          = NULL; -        quota_local_t        *local            = NULL; -        quota_inode_ctx_t    *ctx              = NULL; -        inode_contribution_t *contribution     = NULL; +        int32_t               ret                         = -1; +        int64_t              *size                        = NULL; +        int64_t               size_int                    = 0; +        int64_t               contri_int                  = 0; +        int64_t              *contri                      = NULL; +        int64_t              *delta                       = NULL; +        char                  contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t               *newdict                     = NULL; +        quota_local_t        *local                       = NULL; +        quota_inode_ctx_t    *ctx                         = NULL; +        inode_contribution_t *contribution                = NULL;          local = frame->local; @@ -1760,11 +1771,11 @@ mq_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,                                  xlator_t *this, int32_t op_ret,                                  int32_t op_errno, dict_t *xdata)  { -        int32_t            ret              = -1; -        char               contri_key [512] = {0, }; -        dict_t            *newdict          = NULL; -        quota_local_t     *local            = NULL; -        quota_inode_ctx_t *ctx              = NULL; +        int32_t            ret                         = -1; +        char               contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t            *newdict                     = NULL; +        quota_local_t     *local                       = NULL; +        quota_inode_ctx_t *ctx                         = NULL;          local = frame->local; @@ -2024,19 +2035,1156 @@ err:          return -1;  } +int32_t +mq_dict_set_meta (dict_t *dict, char *key, const quota_meta_t *meta, +                  ia_type_t ia_type) +{ +        int32_t         ret      = -1; +        quota_meta_t   *value    = NULL; + +        QUOTA_ALLOC_OR_GOTO (value, quota_meta_t, ret, out); + +        value->size = hton64 (meta->size); +        value->file_count = hton64 (meta->file_count); +        value->dir_count = hton64 (meta->dir_count); + +        if (ia_type == IA_IFDIR) { +                ret = dict_set_bin (dict, key, value, sizeof (*value)); +        } else { +                /* For a file we don't need to store dir_count in the +                 * quota size xattr, so we set the len of the data in the dict +                 * as 128bits, so when the posix xattrop reads the dict, it only +                 * performs operations on size and file_count +                 */ +                ret = dict_set_bin (dict, key, value, +                                    sizeof (*value) - sizeof (int64_t)); +        } + +        if (ret < 0) { +                gf_log_callingfn ("marker", GF_LOG_ERROR, "dict set failed"); +                GF_FREE (value); +        } + +out: +        return ret; +} + +int32_t +mq_dict_get_meta (dict_t *dict, char *key, quota_meta_t *meta) +{ +        int32_t        ret      = -1; +        data_t        *data     = NULL; +        quota_meta_t  *value    = NULL; + +        if (!dict || !key || !meta) +                goto out; + +        data = dict_get (dict, key); +        if (!data || !data->data) +                goto out; + +        if (data->len > sizeof (int64_t)) { +                value = (quota_meta_t *) data->data; +                meta->size = ntoh64 (value->size); +                meta->file_count = ntoh64 (value->file_count); +                if (data->len > (sizeof (int64_t)) * 2) +                        meta->dir_count  = ntoh64 (value->dir_count); +                else +                        meta->dir_count = 0; +        } else { +                /* This can happen during software upgrade. +                 * Older version of glusterfs will not have inode count. +                 * Return failure, this will be healed as part of lookup +                 */ +                gf_log_callingfn ("marker", GF_LOG_DEBUG, "Object quota xattrs " +                                  "missing: len = %d", data->len); +                ret = -1; +                goto out; +        } + +        ret = 0; +out: + +        return ret; +} + +void +mq_compute_delta (quota_meta_t *delta, const quota_meta_t *op1, +                  const quota_meta_t *op2) +{ +        delta->size       = op1->size - op2->size; +        delta->file_count = op1->file_count - op2->file_count; +        delta->dir_count  = op1->dir_count - op2->dir_count; +} + +void +mq_add_meta (quota_meta_t *dst, const quota_meta_t *src) +{ +        dst->size       += src->size; +        dst->file_count += src->file_count; +        dst->dir_count  += src->dir_count; +} + +void +mq_sub_meta (quota_meta_t *dst, const quota_meta_t *src) +{ +        if (src == NULL) { +                dst->size       = -dst->size; +                dst->file_count = -dst->file_count; +                dst->dir_count  = -dst->dir_count; +        } else { +                dst->size       = src->size - dst->size; +                dst->file_count = src->file_count - dst->file_count; +                dst->dir_count  = src->dir_count - dst->dir_count; +        } +} + +gf_boolean_t +quota_meta_is_null (const quota_meta_t *meta) +{ +        if (meta->size == 0 && +            meta->file_count == 0 && +            meta->dir_count == 0) +                return _gf_true; + +        return _gf_false; +} + +int32_t +mq_are_xattrs_set (xlator_t *this, loc_t *loc, gf_boolean_t *result, +                   gf_boolean_t *objects) +{ +        int32_t        ret                         = -1; +        char           contri_key[CONTRI_KEY_MAX]  = {0, }; +        quota_meta_t   meta                        = {0, }; +        struct iatt    stbuf                       = {0,}; +        dict_t        *dict                        = NULL; +        dict_t        *rsp_dict                    = NULL; + +        dict = dict_new (); +        if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        ret = mq_req_xattr (this, loc, dict, contri_key); +        if (ret < 0) +                goto out; + +        ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict, +                             NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        if (rsp_dict == NULL) { +                *result = _gf_false; +                goto out; +        } + +        *result = _gf_true; +        if (loc->inode->ia_type == IA_IFDIR) { +                ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY, &meta); +                if (ret < 0 || meta.dir_count == 0) { +                        ret = 0; +                        *result = _gf_false; +                        goto out; +                } +                *objects = _gf_true; +        } + +        if (!loc_is_root(loc) && !dict_get (rsp_dict, contri_key)) { +                *result = _gf_false; +                goto out; +        } + +out: +        if (dict) +                dict_unref (dict); + +        if (rsp_dict) +                dict_unref (rsp_dict); + +        return ret; +} + +int32_t +mq_create_xattrs (xlator_t *this, loc_t *loc, gf_boolean_t objects) +{ +        quota_meta_t           size                 = {0, }; +        quota_meta_t           contri               = {0, }; +        int32_t                ret                  = -1; +        char                   key[CONTRI_KEY_MAX]  = {0, }; +        dict_t                *dict                 = NULL; +        quota_inode_ctx_t     *ctx                  = NULL; +        inode_contribution_t  *contribution         = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                ctx = mq_inode_ctx_new (loc->inode, this); +                if (ctx == NULL) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "mq_inode_ctx_new failed"); +                        ret = -1; +                        goto out; +                } +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                ret = -1; +                goto out; +        } + +        if (loc->inode->ia_type == IA_IFDIR) { +                if (objects == _gf_false) { +                        /* Initial object count of a directory is 1 */ +                        size.dir_count = 1; +                } +                ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, &size, IA_IFDIR); +                if (ret < 0) +                        goto out; +        } + +        if (!loc_is_root (loc)) { +                contribution = mq_add_new_contribution_node (this, ctx, loc); +                if (contribution == NULL) { +                        ret = -1; +                        goto out; +                } + +                GET_CONTRI_KEY (key, contribution->gfid, ret); +                ret = mq_dict_set_meta (dict, key, &contri, +                                        loc->inode->ia_type); +                if (ret < 0) +                        goto out; +        } + +        ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, +                             dict, NULL); + +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +} + +int32_t +mq_lock (xlator_t *this, loc_t *loc, short l_type) +{ +        struct gf_flock  lock  = {0, }; +        int32_t          ret   = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        gf_log (this->name, GF_LOG_DEBUG, "set lock type %d on %s", +                l_type, loc->path); + +        lock.l_len    = 0; +        lock.l_start  = 0; +        lock.l_type   = l_type; +        lock.l_whence = SEEK_SET; + +        ret = syncop_inodelk (FIRST_CHILD(this), this->name, loc, F_SETLKW, +                              &lock, NULL, NULL); +        if (ret < 0) +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "inodelk failed " +                        "for %s: %s", loc->path, strerror (-ret)); + +out: + +        return ret; +} + +int32_t +mq_get_dirty (xlator_t *this, loc_t *loc, int32_t *dirty) +{ +        int32_t        ret              = -1; +        int8_t         value            = 0; +        dict_t        *dict             = NULL; +        dict_t        *rsp_dict         = NULL; +        struct iatt    stbuf            = {0,}; + +        dict = dict_new (); +        if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        ret = dict_set_int64 (dict, QUOTA_DIRTY_KEY, 0); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, "dict set failed"); +                goto out; +        } + +        ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict, +                             NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        ret = dict_get_int8 (rsp_dict, QUOTA_DIRTY_KEY, &value); +        if (ret < 0) +                goto out; + +        *dirty = value; + +out: +        if (dict) +                dict_unref (dict); + +        if (rsp_dict) +                dict_unref (rsp_dict); + +        return ret; +} + +int32_t +mq_mark_dirty (xlator_t *this, loc_t *loc, int32_t dirty) +{ +        int32_t            ret      = -1; +        dict_t            *dict     = NULL; +        quota_inode_ctx_t *ctx      = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, dirty); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "dict_set failed"); +                goto out; +        } + +        ret = syncop_setxattr (FIRST_CHILD(this), loc, dict, 0); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "setxattr dirty = %d " +                        "failed for %s: %s", dirty, loc->path, strerror (-ret)); +                goto out; +        } + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for " +                        "%s", loc->path); +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->dirty = dirty; +        } +        UNLOCK (&ctx->lock); + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +} + +int32_t +_mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri, +                  quota_meta_t *size, uuid_t contri_gfid) +{ +        int32_t            ret                         = -1; +        quota_meta_t       meta                        = {0, }; +        char               contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t            *dict                        = NULL; +        dict_t            *rsp_dict                    = NULL; +        struct iatt        stbuf                       = {0,}; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        if (size == NULL && contri == NULL) +                goto out; + +        dict = dict_new (); +        if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        if (size && loc->inode->ia_type == IA_IFDIR) { +                ret = dict_set_int64 (dict, QUOTA_SIZE_KEY, 0); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "dict_set failed."); +                        goto out; +                } +        } + +        if (contri && !loc_is_root(loc)) { +                ret = mq_dict_set_contribution (this, dict, loc, contri_gfid, +                                                contri_key); +                if (ret < 0) +                        goto out; +        } + +        ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict, +                             NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        if (size) { +                if (loc->inode->ia_type == IA_IFDIR) { +                        ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY, +                                                &meta); +                        if (ret < 0) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "dict_get failed."); +                                goto out; +                        } + +                        size->size = meta.size; +                        size->file_count = meta.file_count; +                        size->dir_count = meta.dir_count; +                } else { +                        size->size = stbuf.ia_blocks * 512; +                        size->file_count = 1; +                        size->dir_count = 0; +                } +        } + +        if (contri && !loc_is_root(loc)) { +                ret = mq_dict_get_meta (rsp_dict, contri_key, &meta); +                if (ret < 0) { +                        contri->size = 0; +                        contri->file_count = 0; +                        contri->dir_count = 0; +                } else { +                        contri->size = meta.size; +                        contri->file_count = meta.file_count; +                        contri->dir_count = meta.dir_count; +                } +        } + +        ret = 0; + +out: +        if (dict) +                dict_unref (dict); + +        if (rsp_dict) +                dict_unref (rsp_dict); + +        return ret; +} + +int32_t +mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri, +                 quota_meta_t *size, quota_inode_ctx_t *ctx, +                 inode_contribution_t *contribution) +{ +        int32_t         ret      = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", ctx, out); +        GF_VALIDATE_OR_GOTO ("marker", contribution, out); + +        if (size == NULL && contri == NULL) { +                ret = 0; +                goto out; +        } + +        ret = _mq_get_metadata (this, loc, contri, size, contribution->gfid); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get " +                                  "metadata for %s", loc->path); +                goto out; +        } + +        if (size) { +                LOCK (&ctx->lock); +                { +                        ctx->size = size->size; +                        ctx->file_count = size->file_count; +                        ctx->dir_count = size->dir_count; +                } +                UNLOCK  (&ctx->lock); +        } + +        if (contri) { +                LOCK (&contribution->lock); +                { +                        contribution->contribution = contri->size; +                        contribution->file_count = contri->file_count; +                        contribution->dir_count = contri->dir_count; +                } +                UNLOCK (&contribution->lock); +        } + +out: +        return ret; +} + +int32_t +mq_get_size (xlator_t *this, loc_t *loc, quota_meta_t *size) +{ +        int32_t         ret      = -1; + +        ret = _mq_get_metadata (this, loc, NULL, size, 0); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get " +                                  "metadata for %s", loc->path); +                goto out; +        } + +out: +        return ret; +} + +int32_t +mq_get_contri (xlator_t *this, loc_t *loc, quota_meta_t *contri, +               uuid_t contri_gfid) +{ +        int32_t         ret      = -1; + +        ret = _mq_get_metadata (this, loc, contri, NULL, contri_gfid); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get " +                                  "metadata for %s", loc->path); +                goto out; +        } + +out: +        return ret; +} + +int32_t +mq_get_delta (xlator_t *this, loc_t *loc, quota_meta_t *delta, +              quota_inode_ctx_t *ctx, inode_contribution_t *contribution) +{ +        int32_t         ret      = -1; +        quota_meta_t    size     = {0, }; +        quota_meta_t    contri   = {0, }; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", ctx, out); +        GF_VALIDATE_OR_GOTO ("marker", contribution, out); + +        ret = mq_get_metadata (this, loc, &contri, &size, ctx, contribution); +        if (ret < 0) +                goto out; + +        mq_compute_delta (delta, &size, &contri); + +out: +        return ret; +} + +int32_t +mq_remove_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri) +{ +        int32_t              ret                         = -1; +        char                 contri_key[CONTRI_KEY_MAX]  = {0, }; + +        GET_CONTRI_KEY (contri_key, contri->gfid, ret); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "get contri_key " +                        "failed for %s", uuid_utoa(contri->gfid)); +                goto out; +        } + +        ret = syncop_removexattr (FIRST_CHILD(this), loc, contri_key, 0); +        if (ret < 0) { +                if (-ret == ENOENT || -ret == ESTALE) { +                        /* Remove contri in done when unlink operation is +                         * performed, so return success on ENOENT/ESTSLE +                         */ +                        ret = 0; +                } else { +                        gf_log (this->name, GF_LOG_ERROR, "removexattr %s " +                                "failed for %s: %s", contri_key, loc->path, +                                strerror (-ret)); +                        goto out; +                } +        } + +        LOCK (&contri->lock); +        { +                contri->contribution = 0; +                contri->file_count = 0; +                contri->dir_count = 0; +        } +        UNLOCK (&contri->lock); + +        ret = 0; +out: + +        return ret; +} + +int32_t +mq_update_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri, +                  quota_meta_t *delta) +{ +        int32_t              ret                         = -1; +        char                 contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t              *dict                        = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", delta, out); +        GF_VALIDATE_OR_GOTO ("marker", contri, out); + +        if (quota_meta_is_null (delta)) { +                ret = 0; +                goto out; +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                ret = -1; +                goto out; +        } + +        GET_CONTRI_KEY (contri_key, contri->gfid, ret); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "get contri_key " +                        "failed for %s", uuid_utoa(contri->gfid)); +                goto out; +        } + +        ret = mq_dict_set_meta (dict, contri_key, delta, loc->inode->ia_type); +        if (ret < 0) +                goto out; + +        ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, +                             dict, NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        LOCK (&contri->lock); +        { +                contri->contribution += delta->size; +                contri->file_count += delta->file_count; +                contri->dir_count += delta->dir_count; +        } +        UNLOCK (&contri->lock); + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +} + +int32_t +mq_update_size (xlator_t *this, loc_t *loc, quota_meta_t *delta) +{ +        int32_t              ret              = -1; +        quota_inode_ctx_t   *ctx              = NULL; +        dict_t              *dict             = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", delta, out); + +        if (quota_meta_is_null (delta)) { +                ret = 0; +                goto out; +        } + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for " +                        "%s", loc->path); +                goto out; +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                ret = -1; +                goto out; +        } + +        ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, delta, +                                loc->inode->ia_type); +        if (ret < 0) +                goto out; + +        ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, +                             dict, NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->size += delta->size; +                ctx->file_count += delta->file_count; +                ctx->dir_count += delta->dir_count; +        } +        UNLOCK (&ctx->lock); + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +}  int -mq_initiate_quota_txn (xlator_t *this, loc_t *loc) +mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)  { -        int32_t               ret          = -1; -        gf_boolean_t          status       = _gf_false; -        quota_inode_ctx_t    *ctx          = NULL; -        inode_contribution_t *contribution = NULL; +        quota_synctask_t       *args         = NULL; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc_wipe (&args->loc); +        if (args->dict) +                dict_unref (args->dict); + +        if (!args->is_static) +                GF_FREE (args); + +        return 0; +} + +int +mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc, +             dict_t *dict, struct iatt *buf, int64_t contri) +{ +        int32_t              ret         = -1; +        quota_synctask_t    *args        = NULL; +        quota_synctask_t     static_args = {0, }; + +        if (spawn) { +                QUOTA_ALLOC_OR_GOTO (args, quota_synctask_t, ret, out); +                args->is_static = _gf_false; +        } else { +                args = &static_args; +                args->is_static = _gf_true; +        } + +        args->this = this; +        loc_copy (&args->loc, loc); +        args->contri = contri; +        if (dict) +                args->dict = dict_ref (dict); +        if (buf) +                args->buf = *buf; + + +        if (spawn) { +                ret = synctask_new (this->ctx->env, task, mq_synctask_cleanup, +                                    NULL, args); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to spawn " +                                "new synctask"); +                        mq_synctask_cleanup (ret, NULL, args); +                } +        } else { +                ret = task (args); +                mq_synctask_cleanup (ret, NULL, args); +        } + +out: +        return ret; +} + +int +mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, +                       inode_contribution_t *contri) +{ +        int32_t            ret        = -1; +        loc_t              child_loc  = {0,}; +        loc_t              parent_loc = {0,}; +        gf_boolean_t       locked     = _gf_false; +        gf_boolean_t       dirty      = _gf_false; +        gf_boolean_t       status     = _gf_true; +        quota_meta_t       delta      = {0, }; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", ctx, out); +        GF_VALIDATE_OR_GOTO ("marker", contri, out); + +        ret = mq_loc_copy (&child_loc, loc); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "loc copy failed"); +                goto out; +        } + +        if (uuid_is_null (child_loc.gfid)) +                uuid_copy (child_loc.gfid, child_loc.inode->gfid); + +        if (uuid_is_null (child_loc.gfid)) { +                ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s", +                        child_loc.path); +                goto out; +        } + +        while (!__is_root_gfid (child_loc.gfid)) { +                /* To improve performance, abort current transaction +                 * if one is already in progress for same inode +                 */ +                ret = mq_test_and_set_ctx_updation_status (ctx, &status); +                if (ret < 0 || status == _gf_true) +                        goto out; + +                ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); +                        goto out; +                } + +                ret = mq_lock (this, &parent_loc, F_WRLCK); +                if (ret < 0) +                        goto out; +                locked = _gf_true; + +                mq_set_ctx_updation_status (ctx, _gf_false); +                status = _gf_true; + +                ret = mq_get_delta (this, &child_loc, &delta, ctx, contri); +                if (ret < 0) +                        goto out; + +                if (quota_meta_is_null (&delta)) +                        goto out; + +                ret = mq_mark_dirty (this, &parent_loc, 1); +                if (ret < 0) +                        goto out; +                dirty = _gf_true; + +                ret = mq_update_contri (this, &child_loc, contri, &delta); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "contri " +                                "update failed for %s", child_loc.path); +                        goto out; +                } + +                ret = mq_update_size (this, &parent_loc, &delta); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, "rollback " +                                "contri updation"); +                        mq_sub_meta (&delta, NULL); +                        mq_update_contri (this, &child_loc, contri, &delta); +                        goto out; +                } + +                ret = mq_mark_dirty (this, &parent_loc, 0); +                dirty = _gf_false; + +                ret = mq_lock (this, &parent_loc, F_UNLCK); +                locked = _gf_false; + +                if (__is_root_gfid (parent_loc.gfid)) +                        break; + +                /* Repeate above steps upwards till the root */ +                loc_wipe (&child_loc); +                ret = mq_loc_copy (&child_loc, &parent_loc); +                if (ret < 0) +                        goto out; +                loc_wipe (&parent_loc); + +                ret = mq_inode_ctx_get (child_loc.inode, this, &ctx); +                if (ret < 0) +                        goto out; + +                if (list_empty (&ctx->contribution_head)) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "contribution node list is empty (%s)", +                                uuid_utoa(child_loc.inode->gfid)); +                        ret = -1; +                        goto out; +                } +                contri = mq_get_contribution_node (child_loc.parent, ctx); +                GF_ASSERT (contri != NULL); +        } + +out: +        if (ret >= 0 && dirty) +                ret = mq_mark_dirty (this, &parent_loc, 0); + +        if (locked) +                ret = mq_lock (this, &parent_loc, F_UNLCK); + +        if (status == _gf_false) +                mq_set_ctx_updation_status (ctx, _gf_false); + +        loc_wipe (&child_loc); +        loc_wipe (&parent_loc); + +        return ret; +} + +int +mq_create_xattrs_task (void *opaque) +{ +        int32_t                  ret        = -1; +        gf_boolean_t             locked     = _gf_false; +        gf_boolean_t             xattrs_set = _gf_false; +        gf_boolean_t             objects    = _gf_false; +        gf_boolean_t             need_txn   = _gf_false; +        quota_synctask_t        *args       = NULL; +        xlator_t                *this       = NULL; +        loc_t                   *loc        = NULL; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        this = args->this; +        THIS = this; + +        if (uuid_is_null (loc->gfid)) +                uuid_copy (loc->gfid, loc->inode->gfid); + +        if (uuid_is_null (loc->gfid)) { +                ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s", +                        loc->path); +                goto out; +        } + +        ret = mq_lock (this, loc, F_WRLCK); +        if (ret < 0) +                goto out; +        locked = _gf_true; + +        ret = mq_are_xattrs_set (this, loc, &xattrs_set, &objects); +        if (ret < 0 || xattrs_set) +                goto out; + +        ret = mq_create_xattrs (this, loc, objects); +        if (ret < 0) +                goto out; + +        need_txn = _gf_true; +out: +        if (locked) +                ret = mq_lock (this, loc, F_UNLCK); + +        if (need_txn) +                ret = mq_initiate_quota_blocking_txn (this, loc); + +        return ret; +} + +int +mq_create_xattrs_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                  ret        = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_create_xattrs_task, _gf_true, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +int +mq_create_xattrs_blocking_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                  ret        = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_create_xattrs_task, _gf_false, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +int32_t +mq_reduce_parent_size_task (void *opaque) +{ +        int32_t                  ret           = -1; +        quota_inode_ctx_t       *ctx           = NULL; +        inode_contribution_t    *contribution  = NULL; +        quota_meta_t             delta         = {0, }; +        loc_t                    parent_loc    = {0,}; +        gf_boolean_t             locked        = _gf_false; +        gf_boolean_t             dirty         = _gf_false; +        quota_synctask_t        *args          = NULL; +        xlator_t                *this          = NULL; +        loc_t                   *loc           = NULL; +        int64_t                  contri        = 0; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        contri = args->contri; +        this = args->this; +        THIS = this; + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_WARNING, "ctx for" +                                  " the node %s is NULL", loc->path); +                goto out; +        } + +        contribution = mq_get_contribution_node (loc->parent, ctx); +        if (contribution == NULL) { +                ret = -1; +                gf_log_callingfn (this->name, GF_LOG_WARNING, +                                  "contribution for the node %s is NULL", +                                  loc->path); +                goto out; +        } + +        if (contri >= 0) { +                /* contri paramater is supplied only for rename operation */ +                delta.size = contri; +                delta.file_count = 1; +                delta.dir_count = 0; +        } + +        ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); +                goto out; +        } + +        ret = mq_lock (this, &parent_loc, F_WRLCK); +        if (ret < 0) +                goto out; +        locked = _gf_true; + +        if (contri < 0) { +                LOCK (&contribution->lock); +                { +                        delta.size = contribution->contribution; +                        delta.file_count = contribution->file_count; +                        delta.dir_count = contribution->dir_count; +                } +                UNLOCK (&contribution->lock); +        } + +        /* TODO: Handle handlinks with better approach +           Iterating dentry_list without a lock is not a good idea +        if (loc->inode->ia_type != IA_IFDIR) { +                list_for_each_entry (dentry, &inode->dentry_list, inode_list) { +                        if (loc->parent == dentry->parent) { +                                * If the file has another link within the same +                                * directory, we should not be reducing the size +                                * of parent +                                * +                                delta = 0; +                                idelta = 0; +                                break; +                        } +                } +        } +        */ + +        if (quota_meta_is_null (&delta)) +                goto out; + +        ret = mq_mark_dirty (this, &parent_loc, 1); +        if (ret < 0) +                goto out; +        dirty = _gf_true; + +        ret = mq_remove_contri (this, loc, contribution); +        if (ret < 0) +                goto out; + +        mq_sub_meta (&delta, NULL); +        ret = mq_update_size (this, &parent_loc, &delta); +        if (ret < 0) +                goto out; + +out: +        if (dirty && ret >= 0) +                ret = mq_mark_dirty (this, &parent_loc, 0); + +        if (locked) +                ret = mq_lock (this, &parent_loc, F_UNLCK); + +        if (ret >= 0) +                ret = mq_initiate_quota_blocking_txn (this, &parent_loc); + +        loc_wipe (&parent_loc); + +        return ret; +} + +int32_t +mq_reduce_parent_size_txn (xlator_t *this, loc_t *loc, int64_t contri) +{ +        int32_t                  ret           = -1;          GF_VALIDATE_OR_GOTO ("marker", this, out);          GF_VALIDATE_OR_GOTO ("marker", loc, out);          GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, loc, +                           NULL, NULL, contri); +out: +        return ret; +} + +int +mq_initiate_quota_task (void *opaque) +{ +        int32_t                 ret          = -1; +        quota_inode_ctx_t      *ctx          = NULL; +        inode_contribution_t   *contribution = NULL; +        quota_synctask_t       *args         = NULL; +        xlator_t               *this         = NULL; +        loc_t                  *loc          = NULL; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        this = args->this; +        THIS = this; +          ret = mq_inode_ctx_get (loc->inode, this, &ctx);          if (ret == -1) {                  gf_log (this->name, GF_LOG_WARNING, @@ -2057,69 +3205,259 @@ mq_initiate_quota_txn (xlator_t *this, loc_t *loc)          */          contribution = mq_get_contribution_node (loc->parent, ctx);          if (!contribution) { -                if ((loc->path && strcmp (loc->path, "/")) -                    || (!uuid_is_null (loc->gfid) -                        && !__is_root_gfid (loc->gfid)) -                    || (loc->inode && !uuid_is_null (loc->inode->gfid) -                        && !__is_root_gfid (loc->inode->gfid))) +                if (!loc_is_root(loc))                          gf_log_callingfn (this->name, GF_LOG_TRACE,                                            "contribution node for the "                                            "path (%s) with parent (%s) "                                            "not found", loc->path, -                                          loc->parent? -                                          uuid_utoa (loc->parent->gfid): +                                          loc->parent ? +                                          uuid_utoa (loc->parent->gfid) :                                            NULL);                  contribution = mq_add_new_contribution_node (this, ctx, loc);                  if (!contribution) { -                        if(loc->path && strcmp (loc->path, "/")) +                        if (!loc_is_root(loc))                                  gf_log_callingfn (this->name, GF_LOG_WARNING,                                                    "could not allocate "                                                    " contribution node for (%s) "                                                    "parent: (%s)", loc->path, -                                                  loc->parent? -                                                  uuid_utoa (loc->parent->gfid): +                                                  loc->parent ? +                                                 uuid_utoa (loc->parent->gfid) :                                                    NULL);                          goto out;                  }          } -        /* To improve performance, do not start another transaction -         * if one is already in progress for same inode -         */ -        status = _gf_true; +        mq_start_quota_txn_v2 (this, loc, ctx, contribution); + +        ret = 0; +out: +        return ret; +} + +int +mq_initiate_quota_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                 ret          = -1; + +        GF_VALIDATE_OR_GOTO ("marker", this, out); +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_initiate_quota_task, _gf_true, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +int +mq_initiate_quota_blocking_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                 ret          = -1; + +        GF_VALIDATE_OR_GOTO ("marker", this, out); +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_initiate_quota_task, _gf_false, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +/* return 1 when dirty updation is performed + * return 0 other wise + */ +int32_t +mq_update_dirty_inode_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, +                          inode_contribution_t *contribution) +{ +        int32_t          ret            = -1; +        fd_t            *fd             = NULL; +        off_t            offset         = 0; +        loc_t            child_loc      = {0, }; +        gf_dirent_t      entries; +        gf_dirent_t     *entry          = NULL; +        gf_boolean_t     status         = _gf_true; +        gf_boolean_t     locked         = _gf_false; +        gf_boolean_t     free_entries   = _gf_false; +        gf_boolean_t     updated        = _gf_false; +        int32_t          dirty          = 0; +        quota_meta_t     contri         = {0, }; +        quota_meta_t     size           = {0, }; +        quota_meta_t     contri_sum     = {0, }; +        quota_meta_t     delta          = {0, }; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_get_ctx_updation_status (ctx, &status); +        if (ret == -1 || status == _gf_true) { +                ret = 0; +                goto out; +        } + +        if (uuid_is_null (loc->gfid)) +                uuid_copy (loc->gfid, loc->inode->gfid); -        ret = mq_test_and_set_ctx_updation_status (ctx, &status); +        if (uuid_is_null (loc->gfid)) { +                ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s", +                        loc->path); +                goto out; +        } + +        ret = mq_lock (this, loc, F_WRLCK);          if (ret < 0)                  goto out; +        locked = _gf_true; -        if (status == _gf_false) { -                mq_start_quota_txn (this, loc, ctx, contribution); +        ret = mq_get_dirty (this, loc, &dirty); +        if (ret < 0 || dirty == 0) { +                ret = 0; +                goto out;          } -        ret = 0; +        fd = fd_create (loc->inode, 0); +        if (!fd) { +                gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); +                ret = -1; +                goto out; +        } + +        ret = syncop_opendir (this, loc, fd); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "opendir failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        INIT_LIST_HEAD (&entries.list); +        while ((ret = syncop_readdirp (this, fd, 131072, offset, NULL, +                                       &entries)) != 0) { +                if (ret < 0) { +                        gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                                ? GF_LOG_DEBUG:GF_LOG_ERROR, "readdirp failed " +                                "for %s: %s", loc->path, strerror (-ret)); +                        goto out; +                } + +                if (list_empty (&entries.list)) +                        break; + +                free_entries = _gf_true; +                list_for_each_entry (entry, &entries.list, list) { +                        offset = entry->d_off; + +                        if (!strcmp (entry->d_name, ".") || +                            !strcmp (entry->d_name, "..")) +                                continue; + +                        ret = loc_build_child (&child_loc, loc, entry->d_name); +                        if (ret < 0) { +                                gf_log (this->name, GF_LOG_WARNING, +                                        "Couldn't build loc for %s/%s " +                                        "returning from updation of dirty " +                                        "inode", loc->path, entry->d_name); +                                goto out; +                        } + +                        ret = mq_get_contri (this, &child_loc, &contri, +                                             loc->gfid); +                        if (ret < 0) +                                goto out; + +                        mq_add_meta (&contri_sum, &contri); +                        loc_wipe (&child_loc); +                } + +                gf_dirent_free (&entries); +                free_entries = _gf_false; +        } +        /* Inculde for self */ +        contri_sum.dir_count++; + +        ret = mq_get_size (this, loc, &size); +        if (ret < 0) +                goto out; + +        mq_compute_delta (&delta, &contri_sum, &size); + +        if (quota_meta_is_null (&delta)) +                goto out; + +        gf_log (this->name, GF_LOG_INFO, "calculated size = %"PRId64 +                ", original size = %"PRIu64 ", diff = %"PRIu64 +                ", path = %s ", contri_sum.size, size.size, delta.size, +                loc->path); + +        gf_log (this->name, GF_LOG_INFO, "calculated f_count = %"PRId64 +                ", original f_count = %"PRIu64 ", diff = %"PRIu64 +                ", path = %s ", contri_sum.file_count, size.file_count, +                delta.file_count, loc->path); + +        gf_log (this->name, GF_LOG_INFO, "calculated d_count = %"PRId64 +                ", original d_count = %"PRIu64 ", diff = %"PRIu64 +                ", path = %s ", contri_sum.dir_count, size.dir_count, +                delta.dir_count, loc->path); + + +        ret = mq_update_size (this, loc, &delta); +        if (ret < 0) +                goto out; + +        updated = _gf_true; +  out: -        return ret; -} +        if (free_entries) +                gf_dirent_free (&entries); +        if (fd) +                fd_unref (fd); + +        if (ret >= 0 && dirty) +                mq_mark_dirty (this, loc, 0); + +        if (locked) +                mq_lock (this, loc, F_UNLCK); +        if (status == _gf_false) +                mq_set_ctx_updation_status (ctx, _gf_false); +        loc_wipe(&child_loc); +        if (updated) +                return 1; +        else +                return 0; +}  int32_t -mq_inspect_directory_xattr (xlator_t *this, -                            loc_t *loc, -                            dict_t *dict, -                            struct iatt buf) +mq_inspect_directory_xattr_task (void *opaque)  { -        int32_t               ret                 = 0; -        int8_t                dirty               = -1; -        int64_t              *size                = NULL, size_int = 0; -        int64_t              *contri              = NULL, contri_int = 0; -        char                  contri_key [512]    = {0, }; -        gf_boolean_t          not_root            = _gf_false; -        quota_inode_ctx_t    *ctx                 = NULL; -        inode_contribution_t *contribution        = NULL; +        int32_t               ret                          = 0; +        int8_t                dirty                        = -1; +        quota_meta_t          size                         = {0, }; +        quota_meta_t          contri                       = {0, }; +        quota_meta_t          delta                        = {0, }; +        char                  contri_key[CONTRI_KEY_MAX]   = {0, }; +        quota_inode_ctx_t    *ctx                          = NULL; +        inode_contribution_t *contribution                 = NULL; +        quota_synctask_t     *args                         = NULL; +        xlator_t             *this                         = NULL; +        loc_t                *loc                          = NULL; +        dict_t               *dict                         = NULL; +        struct iatt           buf                          = {0,}; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        dict = args->dict; +        buf = args->buf; +        this = args->this; +        THIS = this;          ret = mq_inode_ctx_get (loc->inode, this, &ctx);          if (ret < 0) { @@ -2132,7 +3470,7 @@ mq_inspect_directory_xattr (xlator_t *this,                  }          } -        if (!loc->path || (loc->path && strcmp (loc->path, "/") != 0)) { +        if (!loc_is_root(loc)) {                  contribution = mq_add_new_contribution_node (this, ctx, loc);                  if (contribution == NULL) {                          if (!uuid_is_null (loc->inode->gfid)) @@ -2144,76 +3482,96 @@ mq_inspect_directory_xattr (xlator_t *this,                  }          } -        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); +        ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);          if (ret < 0)                  goto out; -        ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); +        ret = mq_dict_get_meta (dict, QUOTA_SIZE_KEY, &size);          if (ret < 0)                  goto out; -        if ((loc->path && strcmp (loc->path, "/") != 0) -            || (!uuid_is_null (loc->gfid) && !__is_root_gfid (loc->gfid)) -            || (loc->inode && !uuid_is_null (loc->inode->gfid) && -                !__is_root_gfid (loc->inode->gfid))) { -                not_root = _gf_true; - +        if (!loc_is_root(loc)) {                  GET_CONTRI_KEY (contri_key, contribution->gfid, ret);                  if (ret < 0) -                        goto out; +                        goto err; -                ret = dict_get_bin (dict, contri_key, (void **) &contri); +                ret = mq_dict_get_meta (dict, contri_key, &contri);                  if (ret < 0)                          goto out;                  LOCK (&contribution->lock);                  { -                        contribution->contribution = ntoh64 (*contri); -                        contri_int = contribution->contribution; +                        contribution->contribution = contri.size; +                        contribution->file_count = contri.file_count; +                        contribution->dir_count = contri.dir_count;                  }                  UNLOCK (&contribution->lock);          }          LOCK (&ctx->lock);          { -                ctx->size = ntoh64 (*size); +                ctx->size = size.size; +                ctx->file_count = size.file_count; +                ctx->dir_count = size.dir_count;                  ctx->dirty = dirty; -                size_int = ctx->size;          }          UNLOCK (&ctx->lock); -        gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64 -                " contri=%"PRId64, size_int, contri_int); +        mq_compute_delta (&delta, &size, &contri); -        if (dirty) { -                ret = mq_update_dirty_inode (this, loc, ctx, contribution); -        } +        if (dirty) +                ret = mq_update_dirty_inode_v2 (this, loc, ctx, contribution); -        if ((!dirty || ret == 0) && (not_root == _gf_true) && -            (size_int != contri_int)) { -                mq_initiate_quota_txn (this, loc); -        } +        if ((!dirty || ret == 1) && +            !loc_is_root(loc) && +            !quota_meta_is_null (&delta)) +                mq_initiate_quota_blocking_txn (this, loc);          ret = 0;  out: -        if (ret) -                mq_set_inode_xattr (this, loc); +        if (ret < 0) +                ret = mq_create_xattrs_blocking_txn (this, loc); +  err:          return ret;  }  int32_t -mq_inspect_file_xattr (xlator_t *this, -                       loc_t *loc, -                       dict_t *dict, -                       struct iatt buf) +mq_inspect_directory_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict, +                                struct iatt buf)  { -        int32_t               ret              = -1; -        uint64_t              contri_int       = 0, size = 0; -        int64_t              *contri_ptr       = NULL; -        char                  contri_key [512] = {0, }; -        quota_inode_ctx_t    *ctx              = NULL; -        inode_contribution_t *contribution     = NULL; +        int32_t   ret = -1; + +        ret = mq_synctask (this, mq_inspect_directory_xattr_task, _gf_true, +                           loc, dict, &buf, 0); + +        return ret; +} + +int32_t +mq_inspect_file_xattr_task (void *opaque) +{ +        int32_t               ret                          = -1; +        quota_meta_t          size                         = {0, }; +        quota_meta_t          contri                       = {0, }; +        quota_meta_t          delta                        = {0, }; +        char                  contri_key[CONTRI_KEY_MAX]   = {0, }; +        quota_inode_ctx_t    *ctx                          = NULL; +        inode_contribution_t *contribution                 = NULL; +        quota_synctask_t     *args                         = NULL; +        xlator_t             *this                         = NULL; +        loc_t                *loc                          = NULL; +        dict_t               *dict                         = NULL; +        struct iatt           buf                          = {0,}; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        dict = args->dict; +        buf = args->buf; +        this = args->this; +        THIS = this;          ret = mq_inode_ctx_get (loc->inode, this, &ctx);          if (ret < 0) { @@ -2230,103 +3588,111 @@ mq_inspect_file_xattr (xlator_t *this,          if (contribution == NULL) {                  gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate "                                    "contribution node (path:%s)", loc->path); +                ret = -1;                  goto out;          }          LOCK (&ctx->lock);          {                  ctx->size = 512 * buf.ia_blocks; -                size = ctx->size; +                ctx->file_count = 1; +                ctx->dir_count = 0; + +                size.size = ctx->size; +                size.file_count = ctx->file_count; +                size.dir_count = ctx->dir_count;          }          UNLOCK (&ctx->lock);          list_for_each_entry (contribution, &ctx->contribution_head,                               contri_list) { +                  GET_CONTRI_KEY (contri_key, contribution->gfid, ret);                  if (ret < 0)                          continue; -                ret = dict_get_bin (dict, contri_key, (void **) &contri_int); -                if (ret == 0) { -                        contri_ptr = (int64_t *)(unsigned long)contri_int; - +                ret = mq_dict_get_meta (dict, contri_key, &contri); +                if (ret < 0) { +                        ret = mq_create_xattrs_blocking_txn (this, loc); +                } else {                          LOCK (&contribution->lock);                          { -                                contribution->contribution = ntoh64 (*contri_ptr); -                                contri_int = contribution->contribution; +                                contribution->contribution = contri.size; +                                contribution->file_count = contri.file_count; +                                contribution->dir_count = contri.dir_count;                          }                          UNLOCK (&contribution->lock); -                        gf_log (this->name, GF_LOG_DEBUG, -                                "size=%"PRId64 " contri=%"PRId64, size, contri_int); - -                        if (size != contri_int) { -                                mq_initiate_quota_txn (this, loc); +                        mq_compute_delta (&delta, &size, &contri); +                        if (!quota_meta_is_null (&delta)) { +                                mq_initiate_quota_blocking_txn (this, loc); +                                /* TODO: revist this code when fixing hardlinks +                                 */ +                                break;                          } -                } else { -                        if (size) -                                mq_initiate_quota_txn (this, loc); -                        else -                                mq_set_inode_xattr (this, loc);                  } + +                /* TODO: loc->parent might need to be assigned to corresponding +                 * contribution inode. We need to handle hard links here +                */          }  out: +          return ret;  }  int32_t -mq_xattr_state (xlator_t *this, -                loc_t *loc, -                dict_t *dict, -                struct iatt buf) +mq_inspect_file_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict, +                           struct iatt buf) +{ +        int32_t   ret = -1; + +        ret = mq_synctask (this, mq_inspect_file_xattr_task, _gf_true, +                           loc, dict, &buf, 0); + +        return ret; +} + +int32_t +mq_xattr_state (xlator_t *this, loc_t *loc, dict_t *dict, struct iatt buf)  {          if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))              || (buf.ia_type == IA_IFLNK))  { -                mq_inspect_file_xattr (this, loc, dict, buf); +                mq_inspect_file_xattr_txn (this, loc, dict, buf);          } else if (buf.ia_type == IA_IFDIR) -                mq_inspect_directory_xattr (this, loc, dict, buf); +                mq_inspect_directory_xattr_txn (this, loc, dict, buf);          return 0;  }  int32_t -mq_req_xattr (xlator_t *this, -              loc_t *loc, -              dict_t *dict) +mq_req_xattr (xlator_t *this, loc_t *loc, dict_t *dict, +              char *contri_key)  {          int32_t               ret       = -1;          GF_VALIDATE_OR_GOTO ("marker", this, out); +        GF_VALIDATE_OR_GOTO ("marker", loc, out);          GF_VALIDATE_OR_GOTO ("marker", dict, out); -        if (!loc) -                goto set_size; - -        //if not "/" then request contribution -        if (loc->path && strcmp (loc->path, "/") == 0) -                goto set_size; - -        ret = mq_dict_set_contribution (this, dict, loc); -        if (ret == -1) -                goto out; +        if (!loc_is_root(loc)) { +                ret = mq_dict_set_contribution (this, dict, loc, NULL, +                                                contri_key); +                if (ret < 0) +                        goto out; +        } -set_size:          ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0); -        if (ret < 0) { -                ret = -1; +        if (ret < 0)                  goto out; -        }          ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, 0); -        if (ret < 0) { -                ret = -1; -                goto out; -        } - -        ret = 0;  out: +        if (ret < 0) +                gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed"); +          return ret;  } @@ -2344,15 +3710,15 @@ int32_t  _mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,                         int32_t op_ret, int32_t op_errno, dict_t *xdata)  { -        int32_t        ret                = 0; -        char           contri_key [512]   = {0, }; -        quota_local_t *local              = NULL; -        inode_t       *inode              = NULL; -        dentry_t      *tmp                = NULL; -        gf_boolean_t  last_dentry         = _gf_true; -        loc_t         loc                 = {0, }; -        dentry_t      *other_dentry       = NULL; -        gf_boolean_t  remove              = _gf_false; +        int32_t        ret                           = 0; +        char           contri_key[CONTRI_KEY_MAX]    = {0, }; +        quota_local_t *local                         = NULL; +        inode_t       *inode                         = NULL; +        dentry_t      *tmp                           = NULL; +        gf_boolean_t  last_dentry                    = _gf_true; +        loc_t         loc                            = {0, }; +        dentry_t      *other_dentry                  = NULL; +        gf_boolean_t  remove                         = _gf_false;          local = (quota_local_t *) frame->local; @@ -2531,6 +3897,7 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this,          dict = dict_new ();          if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed");                  ret = -1;                  goto err;          } diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index 42def9d22dc..fa132a815b7 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -21,7 +21,7 @@  #define QUOTA_XATTR_PREFIX "trusted.glusterfs"  #define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty" -#define CONTRIBUTION "contri" +#define CONTRIBUTION  "contri"  #define CONTRI_KEY_MAX 512  #define READDIR_BUF 4096 @@ -59,21 +59,21 @@                  ret = 0;                                \          } while (0); -#define GET_CONTRI_KEY(var, _gfid, _ret)              \ -        do {                                          \ -                if (_gfid != NULL) {                  \ -                        char _gfid_unparsed[40];      \ -                        uuid_unparse (_gfid, _gfid_unparsed);           \ -                        _ret = snprintf (var, CONTRI_KEY_MAX,           \ -                                         QUOTA_XATTR_PREFIX             \ +#define GET_CONTRI_KEY(var, _gfid, _ret)                                  \ +        do {                                                              \ +                if (_gfid != NULL) {                                      \ +                        char _gfid_unparsed[40];                          \ +                        uuid_unparse (_gfid, _gfid_unparsed);             \ +                        _ret = snprintf (var, CONTRI_KEY_MAX,             \ +                                         QUOTA_XATTR_PREFIX               \                                           ".%s.%s." CONTRIBUTION, "quota", \ -                                         _gfid_unparsed);               \ -                } else {                                                \ -                        _ret = snprintf (var, CONTRI_KEY_MAX,           \ -                                         QUOTA_XATTR_PREFIX             \ -                                         ".%s.." CONTRIBUTION, "quota"); \ -                }                                                       \ -        } while (0); +                                         _gfid_unparsed);                 \ +                } else {                                                  \ +                        _ret = snprintf (var, CONTRI_KEY_MAX,             \ +                                         QUOTA_XATTR_PREFIX               \ +                                         ".%s.." CONTRIBUTION, "quota");  \ +                }                                                         \ +        } while (0)  #define QUOTA_SAFE_INCREMENT(lock, var)         \          do {                                    \ @@ -84,6 +84,8 @@  struct quota_inode_ctx {          int64_t                size; +        int64_t                file_count; +        int64_t                dir_count;          int8_t                 dirty;          gf_boolean_t           updation_status;          gf_lock_t              lock; @@ -91,9 +93,28 @@ struct quota_inode_ctx {  };  typedef struct quota_inode_ctx quota_inode_ctx_t; +struct quota_meta { +        int64_t    size; +        int64_t    file_count; +        int64_t    dir_count; +}; +typedef struct quota_meta quota_meta_t; + +struct quota_synctask { +        xlator_t      *this; +        loc_t          loc; +        dict_t        *dict; +        struct iatt    buf; +        int64_t        contri; +        gf_boolean_t   is_static; +}; +typedef struct quota_synctask quota_synctask_t; +  struct inode_contribution {          struct list_head contri_list;          int64_t          contribution; +        int64_t          file_count; +        int64_t          dir_count;          uuid_t           gfid;    gf_lock_t lock;  }; @@ -103,7 +124,7 @@ int32_t  mq_get_lock_on_parent (call_frame_t *, xlator_t *);  int32_t -mq_req_xattr (xlator_t *, loc_t *, dict_t *); +mq_req_xattr (xlator_t *, loc_t *, dict_t *, char *);  int32_t  init_quota_priv (xlator_t *); @@ -117,6 +138,12 @@ mq_set_inode_xattr (xlator_t *, loc_t *);  int  mq_initiate_quota_txn (xlator_t *, loc_t *); +int +mq_initiate_quota_blocking_txn (xlator_t *, loc_t *); + +int +mq_create_xattrs_txn (xlator_t *this, loc_t *loc); +  int32_t  mq_dirty_inode_readdir (call_frame_t *, void *, xlator_t *,                          int32_t, int32_t, fd_t *, dict_t *); @@ -125,6 +152,9 @@ int32_t  mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);  int32_t +mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t); + +int32_t  mq_rename_update_newpath (xlator_t *, loc_t *);  int32_t diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index ad3aabda9ba..af7ec1f907f 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -607,7 +607,7 @@ marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_set_inode_xattr (this, &local->loc); +                mq_create_xattrs_txn (this, &local->loc);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -681,7 +681,7 @@ marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_set_inode_xattr (this, &local->loc); +                mq_create_xattrs_txn (this, &local->loc);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -827,7 +827,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_reduce_parent_size (this, &local->loc, -1); +                mq_reduce_parent_size_txn (this, &local->loc, -1);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -896,7 +896,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (priv->feature_enabled & GF_QUOTA) {                  if (!local->skip_txn) -                        mq_reduce_parent_size (this, &local->loc, -1); +                        mq_reduce_parent_size_txn (this, &local->loc, -1);          }          if (priv->feature_enabled & GF_XTIME) @@ -977,7 +977,7 @@ marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (priv->feature_enabled & GF_QUOTA) {                  if (!local->skip_txn) -                        mq_set_inode_xattr (this, &local->loc); +                        mq_create_xattrs_txn (this, &local->loc);          } @@ -1065,10 +1065,11 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,                          frame->root->unique);          } -        mq_reduce_parent_size (this, &oplocal->loc, oplocal->contribution); +        mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution);          if (local->loc.inode != NULL) { -                mq_reduce_parent_size (this, &local->loc, local->contribution); +                mq_reduce_parent_size_txn (this, &local->loc, +                                           local->contribution);          }          newloc.inode = inode_ref (oplocal->loc.inode); @@ -1078,7 +1079,7 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,                  newloc.name++;          newloc.parent = inode_ref (local->loc.parent); -        mq_set_inode_xattr (this, &newloc); +        mq_create_xattrs_txn (this, &newloc);          loc_wipe (&newloc); @@ -1181,13 +1182,13 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                     struct iatt *prenewparent, struct iatt *postnewparent,                     dict_t *xdata)  { -        marker_conf_t  *priv                 = NULL; -        marker_local_t *local                = NULL; -        marker_local_t *oplocal              = NULL; -        call_stub_t    *stub                 = NULL; -        int32_t         ret                  = 0; -        char            contri_key [512]     = {0, }; -        loc_t           newloc               = {0, }; +        marker_conf_t  *priv                            = NULL; +        marker_local_t *local                           = NULL; +        marker_local_t *oplocal                         = NULL; +        call_stub_t    *stub                            = NULL; +        int32_t         ret                             = 0; +        char            contri_key[CONTRI_KEY_MAX]      = {0, }; +        loc_t           newloc                          = {0, };          local = (marker_local_t *) frame->local; @@ -1284,10 +1285,11 @@ int32_t  marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,                    int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)  { -        marker_local_t       *local           = NULL, *oplocal = NULL; -        char                  contri_key[512] = {0, }; -        int32_t               ret             = 0; -        int64_t              *contribution    = 0; +        marker_local_t       *local                      = NULL; +        marker_local_t       *oplocal                    = NULL; +        char                  contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t               ret                        = 0; +        int64_t              *contribution               = 0;          local = frame->local;          oplocal = local->oplocal; @@ -1336,10 +1338,11 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,                                   xlator_t *this, int32_t op_ret,                                   int32_t op_errno, dict_t *dict, dict_t *xdata)  { -        marker_local_t *local           = NULL, *oplocal = NULL; -        char            contri_key[512] = {0, }; -        int32_t         ret             = 0; -        int64_t        *contribution    = 0; +        marker_local_t *local                      = NULL; +        marker_local_t *oplocal                    = NULL; +        char            contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t         ret                        = 0; +        int64_t        *contribution               = 0;          local = frame->local;          oplocal = local->oplocal; @@ -1403,9 +1406,10 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,                                   xlator_t *this, int32_t op_ret,                                   int32_t op_errno, dict_t *xdata)  { -        marker_local_t *local           = NULL, *oplocal = NULL; -        char            contri_key[512] = {0, }; -        int32_t         ret             = 0; +        marker_local_t *local                      = NULL; +        marker_local_t *oplocal                    = NULL; +        char            contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t         ret                        = 0;          local = frame->local;          oplocal = local->oplocal; @@ -1764,8 +1768,9 @@ marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private; -        if (priv->feature_enabled & GF_QUOTA) -                mq_set_inode_xattr (this, &local->loc); +        if (priv->feature_enabled & GF_QUOTA) { +                mq_create_xattrs_txn (this, &local->loc); +        }          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -1838,7 +1843,7 @@ marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if ((priv->feature_enabled & GF_QUOTA) && (S_ISREG (local->mode))) { -                mq_set_inode_xattr (this, &local->loc); +                mq_create_xattrs_txn (this, &local->loc);          }          if (priv->feature_enabled & GF_XTIME) @@ -2706,7 +2711,7 @@ marker_lookup (call_frame_t *frame, xlator_t *this,                  goto err;          if ((priv->feature_enabled & GF_QUOTA) && xattr_req) -                mq_req_xattr (this, loc, xattr_req); +                mq_req_xattr (this, loc, xattr_req, NULL);  wind:          STACK_WIND (frame, marker_lookup_cbk, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->lookup, loc, xattr_req); @@ -2854,7 +2859,7 @@ marker_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,                          loc.parent = local->loc.inode = inode_ref (fd->inode); -                        mq_req_xattr (this, &loc, dict); +                        mq_req_xattr (this, &loc, dict, NULL);                  }                  STACK_WIND (frame, marker_readdirp_cbk,  | 
