diff options
Diffstat (limited to 'xlators/features/marker/src')
| -rw-r--r-- | xlators/features/marker/src/Makefile.am | 8 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-common.c | 41 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-common.h | 27 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-mem-types.h | 24 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 151 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.h | 64 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.c | 1718 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.h | 122 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker.c | 1408 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker.h | 83 |
10 files changed, 2495 insertions, 1151 deletions
diff --git a/xlators/features/marker/src/Makefile.am b/xlators/features/marker/src/Makefile.am index 501586a76..a7c676472 100644 --- a/xlators/features/marker/src/Makefile.am +++ b/xlators/features/marker/src/Makefile.am @@ -1,15 +1,17 @@ xlator_LTLIBRARIES = marker.la xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features -marker_la_LDFLAGS = -module -avoidversion +marker_la_LDFLAGS = -module -avoid-version marker_la_SOURCES = marker.c marker-quota.c marker-quota-helper.c marker-common.c marker_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la noinst_HEADERS = marker-mem-types.h marker.h marker-quota.h marker-quota-helper.h marker-common.h $(top_builddir)/xlators/lib/src/libxlator.h -AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -fno-strict-aliasing -D$(GF_HOST_OS) \ - -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/xlators/lib/src $(GF_CFLAGS) -shared -nostartfiles +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ + -I$(top_srcdir)/xlators/lib/src + +AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS) CLEANFILES = diff --git a/xlators/features/marker/src/marker-common.c b/xlators/features/marker/src/marker-common.c index 9d27167fc..84a718add 100644 --- a/xlators/features/marker/src/marker-common.c +++ b/xlators/features/marker/src/marker-common.c @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -42,7 +33,7 @@ int32_t marker_force_inode_ctx_get (inode_t *inode, xlator_t *this, marker_inode_ctx_t **ctx) { - int32_t ret = -1; + int32_t ret = -1; uint64_t ctx_int = 0; LOCK (&inode->lock); @@ -69,18 +60,10 @@ unlock: UNLOCK (&inode->lock); return ret; } -void +int marker_filter_quota_xattr (dict_t *dict, char *key, - data_t *value, void *data) + data_t *value, void *data) { - int ret = -1; - - GF_VALIDATE_OR_GOTO ("marker", dict, out); - GF_VALIDATE_OR_GOTO ("marker", key, out); - - ret = fnmatch ("trusted.glusterfs.quota*", key, 0); - if (ret == 0) - dict_del (dict, key); -out: - return; + dict_del (dict, key); + return 0; } diff --git a/xlators/features/marker/src/marker-common.h b/xlators/features/marker/src/marker-common.h index 2533571c1..23dd846cb 100644 --- a/xlators/features/marker/src/marker-common.h +++ b/xlators/features/marker/src/marker-common.h @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef _MARKER_COMMON_H #define _MARKER_COMMON_H @@ -31,6 +22,6 @@ int32_t marker_force_inode_ctx_get (inode_t *, xlator_t *, marker_inode_ctx_t **); -void +int marker_filter_quota_xattr (dict_t *, char *, data_t *, void *); #endif diff --git a/xlators/features/marker/src/marker-mem-types.h b/xlators/features/marker/src/marker-mem-types.h index 847bfa67c..1f74d5048 100644 --- a/xlators/features/marker/src/marker-mem-types.h +++ b/xlators/features/marker/src/marker-mem-types.h @@ -1,36 +1,24 @@ /* - Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef __MARKER_MEM_TYPES_H__ #define __MARKER_MEM_TYPES_H__ #include "mem-types.h" enum gf_marker_mem_types_ { - gf_marker_mt_marker_local_t = gf_common_mt_end + 1, - gf_marker_mt_marker_conf_t, + gf_marker_mt_marker_conf_t = gf_common_mt_end + 1, gf_marker_mt_loc_t, gf_marker_mt_volume_mark, gf_marker_mt_int64_t, gf_marker_mt_quota_inode_ctx_t, gf_marker_mt_marker_inode_ctx_t, - gf_marker_mt_quota_local_t, gf_marker_mt_inode_contribution_t, gf_marker_mt_end }; diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index fba2cdd3f..af5fed132 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -28,16 +19,19 @@ #include "marker-mem-types.h" int -quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path) +mq_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path) { int ret = -1; - if (!loc) - return ret; + GF_VALIDATE_OR_GOTO ("marker", loc, out); + GF_VALIDATE_OR_GOTO ("marker", inode, out); + GF_VALIDATE_OR_GOTO ("marker", path, out); + /* Not checking for parent because while filling + * loc of root, parent will be NULL + */ if (inode) { loc->inode = inode_ref (inode); - loc->ino = inode->ino; } if (parent) @@ -59,13 +53,13 @@ quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path) loc_wipe: if (ret < 0) loc_wipe (loc); - +out: return ret; } int32_t -quota_inode_loc_fill (const char *parent_gfid, inode_t *inode, loc_t *loc) +mq_inode_loc_fill (const char *parent_gfid, inode_t *inode, loc_t *loc) { char *resolvedpath = NULL; inode_t *parent = NULL; @@ -74,7 +68,7 @@ quota_inode_loc_fill (const char *parent_gfid, inode_t *inode, loc_t *loc) if ((!inode) || (!loc)) return ret; - if ((inode) && (inode->ino == 1)) { + if ((inode) && __is_root_gfid (inode->gfid)) { loc->parent = NULL; goto ignore_parent; } @@ -93,7 +87,7 @@ ignore_parent: if (ret < 0) goto err; - ret = quota_loc_fill (loc, inode, parent, resolvedpath); + ret = mq_loc_fill (loc, inode, parent, resolvedpath); if (ret < 0) goto err; @@ -108,7 +102,7 @@ err: quota_inode_ctx_t * -quota_alloc_inode_ctx () +mq_alloc_inode_ctx () { int32_t ret = -1; quota_inode_ctx_t *ctx = NULL; @@ -119,6 +113,7 @@ quota_alloc_inode_ctx () ctx->size = 0; ctx->dirty = 0; + ctx->updation_status = _gf_false; LOCK_INIT (&ctx->lock); INIT_LIST_HEAD (&ctx->contribution_head); out: @@ -126,13 +121,13 @@ out: } inode_contribution_t * -get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx) +mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx) { inode_contribution_t *contri = NULL; inode_contribution_t *temp = NULL; - GF_VALIDATE_OR_GOTO ("marker", inode, out); - GF_VALIDATE_OR_GOTO ("marker", ctx, out); + if (!inode || !ctx) + goto out; list_for_each_entry (temp, &ctx->contribution_head, contri_list) { if (uuid_compare (temp->gfid, inode->gfid) == 0) { @@ -146,8 +141,8 @@ out: int32_t -delete_contribution_node (dict_t *dict, char *key, - inode_contribution_t *contribution) +mq_delete_contribution_node (dict_t *dict, char *key, + inode_contribution_t *contribution) { if (dict_get (dict, key) != NULL) goto out; @@ -159,13 +154,25 @@ out: inode_contribution_t * -__add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) +__mq_add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) { int32_t ret = 0; inode_contribution_t *contribution = NULL; + if (!loc->parent) { + if (!uuid_is_null (loc->pargfid)) + loc->parent = inode_find (loc->inode->table, + loc->pargfid); + if (!loc->parent) + loc->parent = inode_parent (loc->inode, loc->pargfid, + loc->name); + if (!loc->parent) + goto out; + } + list_for_each_entry (contribution, &ctx->contribution_head, contri_list) { - if (uuid_compare (contribution->gfid, loc->parent->gfid) == 0) { + if (loc->parent && + uuid_compare (contribution->gfid, loc->parent->gfid) == 0) { goto out; } } @@ -178,6 +185,9 @@ __add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) uuid_copy (contribution->gfid, loc->parent->gfid); + LOCK_INIT (&contribution->lock); + INIT_LIST_HEAD (&contribution->contri_list); + list_add_tail (&contribution->contri_list, &ctx->contribution_head); out: @@ -186,7 +196,7 @@ out: inode_contribution_t * -add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) +mq_add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) { inode_contribution_t *contribution = NULL; @@ -198,7 +208,7 @@ add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) LOCK (&ctx->lock); { - contribution = __add_new_contribution_node (this, ctx, loc); + contribution = __mq_add_new_contribution_node (this, ctx, loc); } UNLOCK (&ctx->lock); @@ -207,8 +217,8 @@ add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) int32_t -dict_set_contribution (xlator_t *this, dict_t *dict, - loc_t *loc) +mq_dict_set_contribution (xlator_t *this, dict_t *dict, + loc_t *loc) { int32_t ret = -1; char contri_key [512] = {0, }; @@ -216,6 +226,7 @@ dict_set_contribution (xlator_t *this, dict_t *dict, GF_VALIDATE_OR_GOTO ("marker", this, out); GF_VALIDATE_OR_GOTO ("marker", dict, out); GF_VALIDATE_OR_GOTO ("marker", loc, out); + GF_VALIDATE_OR_GOTO ("marker", loc->parent, out); GET_CONTRI_KEY (contri_key, loc->parent->gfid, ret); if (ret < 0) { @@ -238,8 +249,8 @@ out: int32_t -quota_inode_ctx_get (inode_t *inode, xlator_t *this, - quota_inode_ctx_t **ctx) +mq_inode_ctx_get (inode_t *inode, xlator_t *this, + quota_inode_ctx_t **ctx) { int32_t ret = -1; uint64_t ctx_int = 0; @@ -272,7 +283,7 @@ out: quota_inode_ctx_t * -__quota_inode_ctx_new (inode_t *inode, xlator_t *this) +__mq_inode_ctx_new (inode_t *inode, xlator_t *this) { int32_t ret = -1; quota_inode_ctx_t *quota_ctx = NULL; @@ -288,7 +299,7 @@ __quota_inode_ctx_new (inode_t *inode, xlator_t *this) LOCK (&inode->lock); { if (mark_ctx->quota_ctx == NULL) { - quota_ctx = quota_alloc_inode_ctx (); + quota_ctx = mq_alloc_inode_ctx (); if (quota_ctx == NULL) { ret = -1; goto unlock; @@ -308,29 +319,23 @@ out: quota_inode_ctx_t * -quota_inode_ctx_new (inode_t * inode, xlator_t *this) +mq_inode_ctx_new (inode_t * inode, xlator_t *this) { - return __quota_inode_ctx_new (inode, this); + return __mq_inode_ctx_new (inode, this); } quota_local_t * -quota_local_new () +mq_local_new () { - int32_t ret = -1; quota_local_t *local = NULL; - QUOTA_ALLOC (local, quota_local_t, ret); - if (ret < 0) + local = mem_get0 (THIS->local_pool); + if (!local) goto out; local->ref = 1; - local->delta = 0; - local->err = 0; LOCK_INIT (&local->lock); - memset (&local->loc, 0, sizeof (loc_t)); - memset (&local->parent_loc, 0, sizeof (loc_t)); - local->ctx = NULL; local->contri = NULL; @@ -339,7 +344,7 @@ out: } quota_local_t * -quota_local_ref (quota_local_t *local) +mq_local_ref (quota_local_t *local) { LOCK (&local->lock); { @@ -352,14 +357,15 @@ quota_local_ref (quota_local_t *local) int32_t -quota_local_unref (xlator_t *this, quota_local_t *local) +mq_local_unref (xlator_t *this, quota_local_t *local) { + int32_t ref = 0; if (local == NULL) goto out; - QUOTA_SAFE_DECREMENT (&local->lock, local->ref); + QUOTA_SAFE_DECREMENT (&local->lock, local->ref, ref); - if (local->ref > 0) + if (ref != 0) goto out; if (local->fd != NULL) @@ -370,6 +376,39 @@ quota_local_unref (xlator_t *this, quota_local_t *local) loc_wipe (&local->parent_loc); LOCK_DESTROY (&local->lock); + + mem_put (local); out: return 0; } + + +inode_contribution_t * +mq_get_contribution_from_loc (xlator_t *this, loc_t *loc) +{ + int32_t ret = 0; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; + + ret = mq_inode_ctx_get (loc->inode, this, &ctx); + if (ret < 0) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "cannot get marker-quota context from inode " + "(gfid:%s, path:%s)", + uuid_utoa (loc->inode->gfid), loc->path); + goto err; + } + + contribution = mq_get_contribution_node (loc->parent, ctx); + if (contribution == NULL) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "inode (gfid:%s, path:%s) has " + "no contribution towards parent (gfid:%s)", + uuid_utoa (loc->inode->gfid), + loc->path, uuid_utoa (loc->parent->gfid)); + goto err; + } + +err: + return contribution; +} diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h index 9a24c8c3d..6cdd14881 100644 --- a/xlators/features/marker/src/marker-quota-helper.h +++ b/xlators/features/marker/src/marker-quota-helper.h @@ -1,20 +1,13 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ + #ifndef _MARKER_QUOTA_HELPER_H #define _MARKER_QUOTA_HELPER @@ -22,7 +15,8 @@ #define _CONFIG_H #include "config.h" #endif -#include "marker-quota.h" + +#include "marker.h" #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution) \ do { \ @@ -37,40 +31,46 @@ UNLOCK (lock); \ } while (0) -#define QUOTA_SAFE_DECREMENT(lock, var) \ - do { \ - LOCK (lock); \ - var --; \ - UNLOCK (lock); \ +#define QUOTA_SAFE_DECREMENT(lock, var, value) \ + do { \ + LOCK (lock); \ + { \ + value = --var; \ + } \ + UNLOCK (lock); \ } while (0) inode_contribution_t * -add_new_contribution_node (xlator_t *, quota_inode_ctx_t *, loc_t *); +mq_add_new_contribution_node (xlator_t *, quota_inode_ctx_t *, loc_t *); int32_t -dict_set_contribution (xlator_t *, dict_t *, loc_t *); +mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *); quota_inode_ctx_t * -quota_inode_ctx_new (inode_t *, xlator_t *); +mq_inode_ctx_new (inode_t *, xlator_t *); int32_t -quota_inode_ctx_get (inode_t *, xlator_t *, quota_inode_ctx_t **); +mq_inode_ctx_get (inode_t *, xlator_t *, quota_inode_ctx_t **); int32_t -delete_contribution_node (dict_t *, char *, inode_contribution_t *); +mq_delete_contribution_node (dict_t *, char *, inode_contribution_t *); int32_t -quota_inode_loc_fill (const char *, inode_t *, loc_t *); +mq_inode_loc_fill (const char *, inode_t *, loc_t *); quota_local_t * -quota_local_new (); +mq_local_new (); quota_local_t * -quota_local_ref (quota_local_t *); +mq_local_ref (quota_local_t *); int32_t -quota_local_unref (xlator_t *, quota_local_t *); +mq_local_unref (xlator_t *, quota_local_t *); + +inode_contribution_t * +mq_get_contribution_node (inode_t *, quota_inode_ctx_t *); inode_contribution_t * -get_contribution_node (inode_t *, quota_inode_ctx_t *); +mq_get_contribution_from_loc (xlator_t *this, loc_t *loc); + #endif diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index 68a8a26c6..6f9af6e13 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -30,11 +21,115 @@ #include "marker-quota.h" #include "marker-quota-helper.h" +int +mq_loc_copy (loc_t *dst, loc_t *src) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("marker", dst, out); + GF_VALIDATE_OR_GOTO ("marker", src, out); + + if (src->inode == NULL || + src->path == NULL) { + gf_log ("marker", GF_LOG_WARNING, + "src loc is not valid"); + goto out; + } + + ret = loc_copy (dst, src); +out: + return ret; +} + +int32_t +mq_get_local_err (quota_local_t *local, + int32_t *val) +{ + int32_t ret = -1; + + GF_VALIDATE_OR_GOTO ("marker", local, out); + GF_VALIDATE_OR_GOTO ("marker", val, out); + + LOCK (&local->lock); + { + *val = local->err; + } + UNLOCK (&local->lock); + + ret = 0; +out: + return ret; +} + +int32_t +mq_get_ctx_updation_status (quota_inode_ctx_t *ctx, + gf_boolean_t *status) +{ + int32_t ret = -1; + + GF_VALIDATE_OR_GOTO ("marker", ctx, out); + GF_VALIDATE_OR_GOTO ("marker", status, out); + + LOCK (&ctx->lock); + { + *status = ctx->updation_status; + } + UNLOCK (&ctx->lock); + + ret = 0; +out: + return ret; +} + + +int32_t +mq_set_ctx_updation_status (quota_inode_ctx_t *ctx, + gf_boolean_t status) +{ + int32_t ret = -1; + + if (ctx == NULL) + goto out; + + LOCK (&ctx->lock); + { + ctx->updation_status = status; + } + UNLOCK (&ctx->lock); + + ret = 0; +out: + return ret; +} + +int32_t +mq_test_and_set_ctx_updation_status (quota_inode_ctx_t *ctx, + gf_boolean_t *status) +{ + int32_t ret = -1; + gf_boolean_t temp = _gf_false; + + GF_VALIDATE_OR_GOTO ("marker", ctx, out); + GF_VALIDATE_OR_GOTO ("marker", status, out); + + LOCK (&ctx->lock); + { + temp = *status; + *status = ctx->updation_status; + ctx->updation_status = temp; + } + UNLOCK (&ctx->lock); + + ret = 0; +out: + return ret; +} + void mq_assign_lk_owner (xlator_t *this, call_frame_t *frame) { marker_conf_t *conf = NULL; - uint64_t lk_owner = 0; + uint64_t lk_owner = 0; conf = this->private; @@ -48,15 +143,15 @@ mq_assign_lk_owner (xlator_t *this, call_frame_t *frame) } UNLOCK (&conf->lock); - frame->root->lk_owner = lk_owner; + set_lk_owner_from_uint64 (&frame->root->lk_owner, lk_owner); return; } int32_t -loc_fill_from_name (xlator_t *this, loc_t *newloc, loc_t *oldloc, - uint64_t ino, char *name) +mq_loc_fill_from_name (xlator_t *this, loc_t *newloc, loc_t *oldloc, + uint64_t ino, char *name) { int32_t ret = -1; int32_t len = 0; @@ -67,8 +162,6 @@ loc_fill_from_name (xlator_t *this, loc_t *newloc, loc_t *oldloc, GF_VALIDATE_OR_GOTO ("marker", oldloc, out); GF_VALIDATE_OR_GOTO ("marker", name, out); - newloc->ino = ino; - newloc->inode = inode_new (oldloc->inode->table); if (!newloc->inode) { @@ -77,6 +170,7 @@ loc_fill_from_name (xlator_t *this, loc_t *newloc, loc_t *oldloc, } newloc->parent = inode_ref (oldloc->inode); + uuid_copy (newloc->pargfid, oldloc->inode->gfid); len = strlen (oldloc->path); @@ -104,36 +198,29 @@ out: } int32_t -dirty_inode_updation_done (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +mq_dirty_inode_updation_done (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - quota_local_t *local = NULL; - - local = frame->local; - - if (!local->err) - QUOTA_SAFE_DECREMENT (&local->lock, local->ref); - else - frame->local = NULL; - QUOTA_STACK_DESTROY (frame, this); return 0; } int32_t -release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +mq_release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - struct gf_flock lock; + struct gf_flock lock = {0, }; quota_local_t *local = NULL; + loc_t loc = {0, }; + int ret = -1; local = frame->local; if (op_ret == -1) { local->err = -1; - dirty_inode_updation_done (frame, NULL, this, 0, 0); + mq_dirty_inode_updation_done (frame, NULL, this, 0, 0, NULL); return 0; } @@ -147,32 +234,50 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this, lock.l_len = 0; lock.l_pid = 0; + ret = loc_copy (&loc, &local->loc); + if (ret == -1) { + local->err = -1; + frame->local = NULL; + mq_dirty_inode_updation_done (frame, NULL, this, 0, 0, NULL); + return 0; + } + + if (local->loc.inode == NULL) { + gf_log (this->name, GF_LOG_WARNING, + "Inode is NULL, so can't stackwind."); + goto out; + } + STACK_WIND (frame, - dirty_inode_updation_done, + mq_dirty_inode_updation_done, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, - this->name, &local->loc, F_SETLKW, &lock); + this->name, &loc, F_SETLKW, &lock, NULL); + + loc_wipe (&loc); + + return 0; +out: + mq_dirty_inode_updation_done (frame, NULL, this, -1, 0, NULL); return 0; } int32_t -mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) +mq_mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) { - int32_t ret = -1; - int64_t *size = NULL; - dict_t *newdict = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; + int32_t ret = -1; + int64_t *size = NULL; + dict_t *newdict = NULL; + quota_local_t *local = NULL; local = (quota_local_t *) frame->local; if (op_ret == -1) goto err; - priv = (marker_conf_t *) this->private; - if (!dict) goto wind; @@ -180,7 +285,11 @@ mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this, if (ret) goto wind; - local->ctx->size = ntoh64 (*size); + LOCK (&local->ctx->lock); + { + local->ctx->size = ntoh64 (*size); + } + UNLOCK (&local->ctx->lock); wind: newdict = dict_new (); @@ -191,17 +300,21 @@ wind: if (ret) goto err; - STACK_WIND (frame, release_lock_on_dirty_inode, + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + STACK_WIND (frame, mq_release_lock_on_dirty_inode, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, - &local->loc, newdict, 0); + &local->loc, newdict, 0, NULL); ret = 0; err: if (op_ret == -1 || ret == -1) { local->err = -1; - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); } if (newdict) @@ -211,24 +324,21 @@ err: } int32_t -update_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, dict_t *dict, struct iatt *postparent) +mq_update_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, dict_t *dict, struct iatt *postparent) { int32_t ret = -1; dict_t *new_dict = NULL; int64_t *size = NULL; int64_t *delta = NULL; quota_local_t *local = NULL; - marker_conf_t *priv = NULL; local = frame->local; if (op_ret == -1) goto err; - priv = this->private; - if (dict == NULL) { gf_log (this->name, GF_LOG_WARNING, "Dict is null while updating the size xattr %s", @@ -260,9 +370,14 @@ update_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this, if (ret) goto err; - STACK_WIND (frame, mark_inode_undirty, FIRST_CHILD(this), + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + + GF_UUID_ASSERT (local->loc.gfid); + + STACK_WIND (frame, mq_mark_inode_undirty, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, &local->loc, - GF_XATTROP_ADD_ARRAY64, new_dict); + GF_XATTROP_ADD_ARRAY64, new_dict, NULL); ret = 0; @@ -270,7 +385,7 @@ err: if (op_ret == -1 || ret == -1) { local->err = -1; - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); } if (new_dict) @@ -280,16 +395,36 @@ err: } int32_t -get_dirty_inode_size (call_frame_t *frame, xlator_t *this) +mq_test_and_set_local_err(quota_local_t *local, + int32_t *val) { - int32_t ret = -1; - dict_t *dict = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; + int tmp = 0; + int32_t ret = -1; - local = (quota_local_t *) frame->local; + GF_VALIDATE_OR_GOTO ("marker", local, out); + GF_VALIDATE_OR_GOTO ("marker", val, out); + + LOCK (&local->lock); + { + tmp = local->err; + local->err = *val; + *val = tmp; + } + UNLOCK (&local->lock); + + ret = 0; +out: + return ret; +} + +int32_t +mq_get_dirty_inode_size (call_frame_t *frame, xlator_t *this) +{ + int32_t ret = -1; + dict_t *dict = NULL; + quota_local_t *local = NULL; - priv = (marker_conf_t *) this->private; + local = (quota_local_t *) frame->local; dict = dict_new (); if (!dict) { @@ -301,7 +436,12 @@ get_dirty_inode_size (call_frame_t *frame, xlator_t *this) if (ret) goto err; - STACK_WIND (frame, update_size_xattr, FIRST_CHILD(this), + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + + STACK_WIND (frame, mq_update_size_xattr, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, &local->loc, dict); ret =0; @@ -309,28 +449,31 @@ err: if (ret) { local->err = -1; - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); } + if (dict) + dict_unref (dict); + return 0; } int32_t -get_child_contribution (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - inode_t *inode, - struct iatt *buf, - dict_t *dict, - struct iatt *postparent) +mq_get_child_contribution (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct iatt *buf, + dict_t *dict, + struct iatt *postparent) { - int32_t ret = -1; - int32_t val = 0; - char contri_key [512] = {0, }; - int64_t *contri = NULL; - quota_local_t *local = NULL; + int32_t ret = -1; + int32_t val = 0; + char contri_key [512] = {0, }; + int64_t *contri = NULL; + quota_local_t *local = NULL; local = frame->local; @@ -339,17 +482,20 @@ get_child_contribution (call_frame_t *frame, QUOTA_STACK_DESTROY (frame, this); if (op_ret == -1) { - gf_log (this->name, GF_LOG_ERROR, "%s", strerror (op_errno)); - - local->err = -2; - - release_lock_on_dirty_inode (local->frame, NULL, this, 0, 0); + gf_log (this->name, GF_LOG_ERROR, "%s", + strerror (op_errno)); + val = -2; + if (!mq_test_and_set_local_err (local, &val) && + val != -2) + mq_release_lock_on_dirty_inode (local->frame, NULL, + this, 0, 0, NULL); - goto out; + goto exit; } - if (local->err) - goto out; + ret = mq_get_local_err (local, &val); + if (!ret && val == -2) + goto exit; GET_CONTRI_KEY (contri_key, local->loc.inode->gfid, ret); if (ret < 0) @@ -368,76 +514,83 @@ out: } UNLOCK (&local->lock); - if (val== 0) { - if (local->err) { - QUOTA_SAFE_DECREMENT (&local->lock, local->ref); - - quota_local_unref (this, local); - } else - quota_dirty_inode_readdir (local->frame, NULL, this, - 0, 0, NULL); + if (val == 0) { + mq_dirty_inode_readdir (local->frame, NULL, this, + 0, 0, NULL, NULL); } + mq_local_unref (this, local); return 0; +exit: + mq_local_unref (this, local); + return 0; } int32_t -quota_readdir_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - gf_dirent_t *entries) +mq_readdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + gf_dirent_t *entries, dict_t *xdata) { - char contri_key [512] = {0, }; - loc_t loc; - int32_t ret = 0; - off_t offset = 0; - int32_t count = 0; - dict_t *dict = NULL; - quota_local_t *local = NULL; - gf_dirent_t *entry = NULL; - call_frame_t *newframe = NULL; - - local = frame->local; + char contri_key [512] = {0, }; + int32_t ret = 0; + int32_t val = 0; + off_t offset = 0; + int32_t count = 0; + dict_t *dict = NULL; + quota_local_t *local = NULL; + gf_dirent_t *entry = NULL; + call_frame_t *newframe = NULL; + loc_t loc = {0, }; + + local = mq_local_ref (frame->local); if (op_ret == -1) { gf_log (this->name, GF_LOG_DEBUG, "readdir failed %s", strerror (op_errno)); local->err = -1; - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); - return 0; + goto end; } else if (op_ret == 0) { - get_dirty_inode_size (frame, this); + mq_get_dirty_inode_size (frame, this); - return 0; + goto end; } local->dentry_child_count = 0; list_for_each_entry (entry, (&entries->list), list) { - gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name); + gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name); + + if ((!strcmp (entry->d_name, ".")) || (!strcmp (entry->d_name, + ".."))) { + gf_log (this->name, GF_LOG_DEBUG, "entry = %s", + entry->d_name); + continue; + } + + offset = entry->d_off; + count++; + } + + if (count == 0) { + mq_get_dirty_inode_size (frame, this); + goto end; - if ((!strcmp (entry->d_name, ".")) || (!strcmp (entry->d_name, - ".."))) { - gf_log (this->name, GF_LOG_DEBUG, "entry = %s", - entry->d_name); - continue; - } - count++; } local->frame = frame; - if (count > 0) { - LOCK (&local->lock); - { - local->dentry_child_count = count; - } - UNLOCK (&local->lock); + LOCK (&local->lock); + { + local->dentry_child_count = count; + local->d_off = offset; } + UNLOCK (&local->lock); list_for_each_entry (entry, (&entries->list), list) { @@ -447,22 +600,32 @@ quota_readdir_cbk (call_frame_t *frame, ".."))) { gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name); - offset = entry->d_off; continue; } - ret = loc_fill_from_name (this, &loc, &local->loc, - entry->d_ino, entry->d_name); + ret = mq_loc_fill_from_name (this, &loc, &local->loc, + entry->d_ino, entry->d_name); if (ret < 0) goto out; - newframe = copy_frame (frame); - if (!newframe) { - ret = -1; - goto out; + ret = 0; + + LOCK (&local->lock); + { + if (local->err != -2) { + newframe = copy_frame (frame); + if (!newframe) { + ret = -1; + } + } else + ret = -1; } + UNLOCK (&local->lock); - newframe->local = local; + if (ret == -1) + goto out; + + newframe->local = mq_local_ref (local); dict = dict_new (); if (!dict) { @@ -479,7 +642,7 @@ quota_readdir_cbk (call_frame_t *frame, goto out; STACK_WIND (newframe, - get_child_contribution, + mq_get_child_contribution, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, &loc, dict); @@ -488,6 +651,8 @@ quota_readdir_cbk (call_frame_t *frame, loc_wipe (&loc); + newframe = NULL; + out: if (dict) { dict_unref (dict); @@ -495,46 +660,35 @@ quota_readdir_cbk (call_frame_t *frame, } if (ret) { - LOCK (&local->lock); - { - if (local->dentry_child_count == 0) - local->err = -1; - else - local->err = -2; - } - UNLOCK (&local->lock); + val = -2; + mq_test_and_set_local_err (local, &val); if (newframe) { newframe->local = NULL; - + mq_local_unref(this, local); QUOTA_STACK_DESTROY (newframe, this); } break; } } - gf_log (this->name, GF_LOG_DEBUG, "offset before =%"PRIu64, - local->d_off); - local->d_off +=offset; - gf_log (this->name, GF_LOG_DEBUG, "offset after = %"PRIu64, - local->d_off); - if (ret) - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); - - else if (count == 0 ) - get_dirty_inode_size (frame, this); + if (ret && val != -2) { + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); + } +end: + mq_local_unref (this, local); return 0; } int32_t -quota_dirty_inode_readdir (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - fd_t *fd) +mq_dirty_inode_readdir (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd, dict_t *xdata) { quota_local_t *local = NULL; @@ -542,7 +696,7 @@ quota_dirty_inode_readdir (call_frame_t *frame, if (op_ret == -1) { local->err = -1; - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); return 0; } @@ -550,30 +704,29 @@ quota_dirty_inode_readdir (call_frame_t *frame, local->fd = fd_ref (fd); STACK_WIND (frame, - quota_readdir_cbk, + mq_readdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdir, - local->fd, READDIR_BUF, local->d_off); + local->fd, READDIR_BUF, local->d_off, xdata); return 0; } int32_t -check_if_still_dirty (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - inode_t *inode, - struct iatt *buf, - dict_t *dict, - struct iatt *postparent) +mq_check_if_still_dirty (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct iatt *buf, + dict_t *dict, + struct iatt *postparent) { int8_t dirty = -1; int32_t ret = -1; fd_t *fd = NULL; quota_local_t *local = NULL; - marker_conf_t *priv = NULL; local = frame->local; @@ -583,11 +736,9 @@ check_if_still_dirty (call_frame_t *frame, goto err; } - priv = this->private; - if (!dict) { - ret = -1; - goto err; + ret = -1; + goto err; } ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); @@ -596,7 +747,7 @@ check_if_still_dirty (call_frame_t *frame, //the inode is not dirty anymore if (dirty == 0) { - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); return 0; } @@ -605,39 +756,44 @@ check_if_still_dirty (call_frame_t *frame, local->d_off = 0; + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + + GF_UUID_ASSERT (local->loc.gfid); STACK_WIND(frame, - quota_dirty_inode_readdir, + mq_dirty_inode_readdir, FIRST_CHILD(this), FIRST_CHILD(this)->fops->opendir, - &local->loc, fd); + &local->loc, fd, NULL); ret = 0; err: if (op_ret == -1 || ret == -1) { local->err = -1; - release_lock_on_dirty_inode (frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode (frame, NULL, this, 0, 0, NULL); + } + + if (fd != NULL) { + fd_unref (fd); } return 0; } int32_t -get_dirty_xattr (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno) +mq_get_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - int32_t ret = -1; - dict_t *xattr_req = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; + int32_t ret = -1; + dict_t *xattr_req = NULL; + quota_local_t *local = NULL; if (op_ret == -1) { - dirty_inode_updation_done (frame, NULL, this, 0, 0); + mq_dirty_inode_updation_done (frame, NULL, this, 0, 0, NULL); return 0; } - priv = (marker_conf_t *) this->private; - local = frame->local; xattr_req = dict_new (); @@ -650,8 +806,13 @@ get_dirty_xattr (call_frame_t *frame, void *cookie, if (ret) goto err; + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + STACK_WIND (frame, - check_if_still_dirty, + mq_check_if_still_dirty, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, &local->loc, @@ -661,7 +822,7 @@ get_dirty_xattr (call_frame_t *frame, void *cookie, err: if (ret) { local->err = -1; - release_lock_on_dirty_inode(frame, NULL, this, 0, 0); + mq_release_lock_on_dirty_inode(frame, NULL, this, 0, 0, NULL); } if (xattr_req) @@ -670,17 +831,27 @@ err: return 0; } +/* return 1 when dirty updation started + * 0 other wise + */ int32_t -update_dirty_inode (xlator_t *this, - loc_t *loc, - quota_inode_ctx_t *ctx, - inode_contribution_t *contribution) +mq_update_dirty_inode (xlator_t *this, + loc_t *loc, + quota_inode_ctx_t *ctx, + inode_contribution_t *contribution) { int32_t ret = -1; quota_local_t *local = NULL; + gf_boolean_t status = _gf_false; struct gf_flock lock = {0, }; call_frame_t *frame = NULL; + ret = mq_get_ctx_updation_status (ctx, &status); + if (ret == -1 || status == _gf_true) { + ret = 0; + goto out; + } + frame = create_frame (this, this->ctx->pool); if (frame == NULL) { ret = -1; @@ -689,13 +860,12 @@ update_dirty_inode (xlator_t *this, mq_assign_lk_owner (this, frame); - local = quota_local_new (); + local = mq_local_new (); if (local == NULL) goto fr_destroy; frame->local = local; - - ret = loc_copy (&local->loc, loc); + ret = mq_loc_copy (&local->loc, loc); if (ret < 0) goto fr_destroy; @@ -708,15 +878,22 @@ update_dirty_inode (xlator_t *this, lock.l_start = 0; lock.l_len = 0; + if (local->loc.inode == NULL) { + ret = -1; + gf_log (this->name, GF_LOG_WARNING, + "Inode is NULL, so can't stackwind."); + goto fr_destroy; + } + STACK_WIND (frame, - get_dirty_xattr, + mq_get_dirty_xattr, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, - this->name, &local->loc, F_SETLKW, &lock); - return 0; + this->name, &local->loc, F_SETLKW, &lock, NULL); + return 1; fr_destroy: - QUOTA_STACK_DESTROY (frame, this); + QUOTA_STACK_DESTROY (frame, this); out: return 0; @@ -724,8 +901,8 @@ out: int32_t -quota_inode_creation_done (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +mq_inode_creation_done (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { quota_local_t *local = NULL; @@ -734,50 +911,88 @@ quota_inode_creation_done (call_frame_t *frame, void *cookie, xlator_t *this, local = frame->local; + if (local != NULL) { + mq_initiate_quota_txn (this, &local->loc); + } + QUOTA_STACK_DESTROY (frame, this); return 0; } + +int32_t +mq_xattr_creation_release_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + struct gf_flock lock = {0, }; + quota_local_t *local = NULL; + + local = frame->local; + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + mq_inode_creation_done, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->loc, + F_SETLKW, &lock, NULL); + + return 0; +} + + int32_t -create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) +mq_create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) { int32_t ret = -1; dict_t *newdict = NULL; quota_local_t *local = NULL; - marker_conf_t *priv = NULL; - if (op_ret == -1 && op_errno == ENOTCONN) + if (op_ret < 0) { goto err; + } local = frame->local; - priv = (marker_conf_t *) this->private; - if (local->loc.inode->ia_type == IA_IFDIR) { newdict = dict_new (); - if (!newdict) + if (!newdict) { goto err; + } ret = dict_set_int8 (newdict, QUOTA_DIRTY_KEY, 0); - if (ret == -1) + if (ret == -1) { goto err; + } + + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + GF_UUID_ASSERT (local->loc.gfid); - STACK_WIND (frame, quota_inode_creation_done, + STACK_WIND (frame, mq_xattr_creation_release_lock, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, - &local->loc, newdict, 0); - } else - quota_inode_creation_done (frame, NULL, this, 0, 0); + &local->loc, newdict, 0, NULL); + } else { + mq_xattr_creation_release_lock (frame, NULL, this, 0, 0, NULL); + } ret = 0; err: - if (ret == -1) - quota_inode_creation_done (frame, NULL, this, -1, 0); + if (ret < 0) { + mq_xattr_creation_release_lock (frame, NULL, this, 0, 0, NULL); + } - if (newdict) + if (newdict != NULL) dict_unref (newdict); return 0; @@ -785,30 +1000,28 @@ err: int32_t -quota_set_inode_xattr (xlator_t *this, loc_t *loc) +mq_create_xattr (xlator_t *this, call_frame_t *frame) { int32_t ret = 0; int64_t *value = NULL; int64_t *size = NULL; dict_t *dict = NULL; char key[512] = {0, }; - call_frame_t *frame = NULL; quota_local_t *local = NULL; - marker_conf_t *priv = NULL; quota_inode_ctx_t *ctx = NULL; inode_contribution_t *contri = NULL; - if (loc == NULL || this == NULL) + if (frame == NULL || this == NULL) return 0; - priv = (marker_conf_t *) this->private; + local = frame->local; - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + ret = mq_inode_ctx_get (local->loc.inode, this, &ctx); if (ret < 0) { - ctx = quota_inode_ctx_new (loc->inode, this); + ctx = mq_inode_ctx_new (local->loc.inode, this); if (ctx == NULL) { gf_log (this->name, GF_LOG_WARNING, - "quota_inode_ctx_new failed"); + "mq_inode_ctx_new failed"); ret = -1; goto out; } @@ -818,106 +1031,267 @@ quota_set_inode_xattr (xlator_t *this, loc_t *loc) if (!dict) goto out; - if (loc->inode->ia_type == IA_IFDIR) { + if (local->loc.inode->ia_type == IA_IFDIR) { QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err); ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8); if (ret < 0) goto free_size; } - //if '/' then dont set contribution xattr - if (strcmp (loc->path, "/") == 0) - goto wind; + if (strcmp (local->loc.path, "/") != 0) { + contri = mq_add_new_contribution_node (this, ctx, &local->loc); + if (contri == NULL) + goto err; - contri = add_new_contribution_node (this, ctx, loc); - if (contri == NULL) - goto err; + QUOTA_ALLOC_OR_GOTO (value, int64_t, ret, err); + GET_CONTRI_KEY (key, local->loc.parent->gfid, ret); + + ret = dict_set_bin (dict, key, value, 8); + if (ret < 0) + goto free_value; + } + + GF_UUID_ASSERT (local->loc.gfid); + + STACK_WIND (frame, mq_create_dirty_xattr, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->xattrop, &local->loc, + GF_XATTROP_ADD_ARRAY64, dict, NULL); + ret = 0; + +free_size: + if (ret < 0) { + GF_FREE (size); + } + +free_value: + if (ret < 0) { + GF_FREE (value); + } + +err: + dict_unref (dict); + +out: + if (ret < 0) { + mq_xattr_creation_release_lock (frame, NULL, this, 0, 0, NULL); + } - QUOTA_ALLOC_OR_GOTO (value, int64_t, ret, err); - GET_CONTRI_KEY (key, loc->parent->gfid, ret); + return 0; +} + + +int32_t +mq_check_n_set_inode_xattr (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + inode_t *inode, struct iatt *buf, dict_t *dict, + struct iatt *postparent) +{ + quota_local_t *local = NULL; + int64_t *size = NULL, *contri = NULL; + int8_t dirty = 0; + int32_t ret = 0; + char contri_key[512] = {0, }; + + if (op_ret < 0) { + goto out; + } - ret = dict_set_bin (dict, key, value, 8); + local = frame->local; + + ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); if (ret < 0) - goto free_value; + goto create_xattr; + + ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); + if (ret < 0) + goto create_xattr; + + //check contribution xattr if not root + if (strcmp (local->loc.path, "/") != 0) { + GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); + if (ret < 0) + goto out; + + ret = dict_get_bin (dict, contri_key, (void **) &contri); + if (ret < 0) + goto create_xattr; + } + +out: + mq_xattr_creation_release_lock (frame, NULL, this, 0, 0, NULL); + return 0; + +create_xattr: + if (uuid_is_null (local->loc.gfid)) { + uuid_copy (local->loc.gfid, buf->ia_gfid); + } + + mq_create_xattr (this, frame); + return 0; +} + + +int32_t +mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + dict_t *xattr_req = NULL; + quota_local_t *local = NULL; + int32_t ret = 0; + + if (op_ret < 0) { + goto lock_err; + } + + local = frame->local; + + xattr_req = dict_new (); + if (xattr_req == NULL) { + goto err; + } + + ret = mq_req_xattr (this, &local->loc, xattr_req); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, "cannot request xattr"); + goto err; + } + + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + + STACK_WIND (frame, mq_check_n_set_inode_xattr, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req); + + dict_unref (xattr_req); + + return 0; + +err: + mq_xattr_creation_release_lock (frame, NULL, this, 0, 0, NULL); + + if (xattr_req) + dict_unref (xattr_req); + return 0; + +lock_err: + mq_inode_creation_done (frame, NULL, this, 0, 0, NULL); + return 0; +} + + +int32_t +mq_set_inode_xattr (xlator_t *this, loc_t *loc) +{ + struct gf_flock lock = {0, }; + quota_local_t *local = NULL; + int32_t ret = 0; + call_frame_t *frame = NULL; -wind: frame = create_frame (this, this->ctx->pool); if (!frame) { ret = -1; goto err; } - local = quota_local_new (); - if (local == NULL) - goto free_size; - - local->ctx = ctx; + local = mq_local_new (); + if (local == NULL) { + goto err; + } - local->contri = contri; + frame->local = local; ret = loc_copy (&local->loc, loc); - if (ret < 0) - quota_local_unref (this, local); + if (ret < 0) { + goto err; + } frame->local = local; - STACK_WIND (frame, create_dirty_xattr, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->xattrop, &local->loc, - GF_XATTROP_ADD_ARRAY64, dict); - ret = 0; + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; -free_size: - if (ret < 0) - GF_FREE (size); + STACK_WIND (frame, + mq_get_xattr, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->loc, F_SETLKW, &lock, NULL); -free_value: - if (ret < 0) - GF_FREE (value); + return 0; err: - dict_unref (dict); - -out: - if (ret < 0) - quota_inode_creation_done (NULL, NULL, this, -1, 0); + QUOTA_STACK_DESTROY (frame, this); return 0; } int32_t -get_parent_inode_local (xlator_t *this, quota_local_t *local) +mq_get_parent_inode_local (xlator_t *this, quota_local_t *local) { - int32_t ret; + int32_t ret = -1; quota_inode_ctx_t *ctx = NULL; + GF_VALIDATE_OR_GOTO ("marker", this, out); + GF_VALIDATE_OR_GOTO ("marker", local, out); + + local->contri = NULL; + loc_wipe (&local->loc); - loc_copy (&local->loc, &local->parent_loc); + ret = mq_loc_copy (&local->loc, &local->parent_loc); + if (ret < 0) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "loc copy failed"); + goto out; + } loc_wipe (&local->parent_loc); - quota_inode_loc_fill (NULL, local->loc.parent, &local->parent_loc); + ret = mq_inode_loc_fill (NULL, local->loc.parent, + &local->parent_loc); + if (ret < 0) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "failed to build parent loc of %s", + local->loc.path); + goto out; + } - ret = quota_inode_ctx_get (local->loc.inode, this, &ctx); - if (ret < 0) - return -1; + ret = mq_inode_ctx_get (local->loc.inode, this, &ctx); + if (ret < 0) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "inode ctx get failed"); + goto out; + } local->ctx = ctx; + if (list_empty (&ctx->contribution_head)) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "contribution node list is empty which " + "is an error"); + ret = -1; + goto out; + } + local->contri = (inode_contribution_t *) ctx->contribution_head.next; - return 0; + ret = 0; +out: + return ret; } int32_t -xattr_updation_done (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - dict_t *dict) +mq_xattr_updation_done (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dict_t *dict, dict_t *xdata) { QUOTA_STACK_DESTROY (frame, this); return 0; @@ -925,19 +1299,22 @@ xattr_updation_done (call_frame_t *frame, int32_t -quota_inodelk_cbk (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno) +mq_inodelk_cbk (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { - int32_t ret = 0; - quota_local_t *local = NULL; + int32_t ret = 0; + gf_boolean_t status = _gf_false; + quota_local_t *local = NULL; local = frame->local; if (op_ret == -1 || local->err) { - gf_log (this->name, ((op_errno == ENOENT) ? GF_LOG_DEBUG : - GF_LOG_INFO), - "lock setting failed (%s)", strerror (op_errno)); - xattr_updation_done (frame, NULL, this, 0, 0, NULL); + if (op_ret == -1) { + gf_log (this->name, GF_LOG_DEBUG, + "unlocking failed on path (%s)(%s)", + local->parent_loc.path, strerror (op_errno)); + } + mq_xattr_updation_done (frame, NULL, this, 0, 0, NULL, NULL); return 0; } @@ -945,16 +1322,25 @@ quota_inodelk_cbk (call_frame_t *frame, void *cookie, gf_log (this->name, GF_LOG_DEBUG, "inodelk released on %s", local->parent_loc.path); - if (strcmp (local->parent_loc.path, "/") == 0) { - xattr_updation_done (frame, NULL, this, 0, 0, NULL); + if ((strcmp (local->parent_loc.path, "/") == 0) + || (local->delta == 0)) { + mq_xattr_updation_done (frame, NULL, this, 0, 0, NULL, NULL); } else { - ret = get_parent_inode_local (this, local); + ret = mq_get_parent_inode_local (this, local); if (ret < 0) { - xattr_updation_done (frame, NULL, this, 0, 0, NULL); + mq_xattr_updation_done (frame, NULL, this, 0, 0, NULL, + NULL); goto out; } - - get_lock_on_parent (frame, this); + status = _gf_true; + + ret = mq_test_and_set_ctx_updation_status (local->ctx, &status); + if (ret == 0 && status == _gf_false) { + mq_get_lock_on_parent (frame, this); + } else { + mq_xattr_updation_done (frame, NULL, this, 0, 0, NULL, + NULL); + } } out: return 0; @@ -963,18 +1349,27 @@ out: //now release lock on the parent inode int32_t -quota_release_parent_lock (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, - int32_t op_errno) +mq_release_parent_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) { int32_t ret = 0; - struct gf_flock lock; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; + struct gf_flock lock = {0, }; local = frame->local; - ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); + if (local->err != 0) { + gf_log_callingfn (this->name, + (local->err == ENOENT) ? GF_LOG_DEBUG + : GF_LOG_WARNING, + "An operation during quota updation " + "of path (%s) failed (%s)", local->loc.path, + strerror (local->err)); + } + + ret = mq_inode_ctx_get (local->parent_loc.inode, this, &ctx); if (ret < 0) goto wind; @@ -984,6 +1379,12 @@ quota_release_parent_lock (call_frame_t *frame, void *cookie, } UNLOCK (&ctx->lock); + if (local->parent_loc.inode == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "Invalid parent inode."); + goto err; + } + wind: lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; @@ -992,30 +1393,33 @@ wind: lock.l_pid = 0; STACK_WIND (frame, - quota_inodelk_cbk, + mq_inodelk_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, this->name, &local->parent_loc, - F_SETLKW, &lock); + F_SETLKW, &lock, NULL); return 0; +err: + mq_xattr_updation_done (frame, NULL, this, + 0, 0 , NULL, NULL); + return 0; } int32_t -quota_mark_undirty (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - dict_t *dict) +mq_mark_undirty (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dict_t *dict, dict_t *xdata) { int32_t ret = -1; int64_t *size = NULL; dict_t *newdict = NULL; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; - marker_conf_t *priv = NULL; local = frame->local; @@ -1027,17 +1431,19 @@ quota_mark_undirty (call_frame_t *frame, goto err; } - priv = this->private; - - //update the size of the parent inode + //update the size of the parent inode if (dict != NULL) { - ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); - if (ret < 0) + ret = mq_inode_ctx_get (local->parent_loc.inode, this, &ctx); + if (ret < 0) { + op_errno = EINVAL; goto err; + } ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); - if (ret < 0) + if (ret < 0) { + op_errno = EINVAL; goto err; + } LOCK (&ctx->lock); { @@ -1050,26 +1456,32 @@ quota_mark_undirty (call_frame_t *frame, } newdict = dict_new (); - - if (!newdict) + if (!newdict) { + op_errno = ENOMEM; goto err; + } ret = dict_set_int8 (newdict, QUOTA_DIRTY_KEY, 0); - if (ret == -1) + if (ret == -1) { + op_errno = -ret; goto err; + } + + uuid_copy (local->parent_loc.gfid, local->parent_loc.inode->gfid); + GF_UUID_ASSERT (local->parent_loc.gfid); - STACK_WIND (frame, quota_release_parent_lock, + STACK_WIND (frame, mq_release_parent_lock, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, - &local->parent_loc, newdict, 0); + &local->parent_loc, newdict, 0, NULL); ret = 0; err: if (op_ret == -1 || ret == -1) { - local->err = 1; + local->err = op_errno; - quota_release_parent_lock (frame, NULL, this, 0, 0); + mq_release_parent_lock (frame, NULL, this, 0, 0, NULL); } if (newdict) @@ -1080,17 +1492,16 @@ err: int32_t -quota_update_parent_size (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - dict_t *dict) +mq_update_parent_size (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dict_t *dict, dict_t *xdata) { int64_t *size = NULL; int32_t ret = -1; dict_t *newdict = NULL; - marker_conf_t *priv = NULL; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; @@ -1098,29 +1509,36 @@ quota_update_parent_size (call_frame_t *frame, if (op_ret == -1) { gf_log (this->name, ((op_errno == ENOENT) ? GF_LOG_DEBUG : - GF_LOG_ERROR), + GF_LOG_WARNING), "xattrop call failed: %s", strerror (op_errno)); goto err; } - local->contri->contribution += local->delta; + LOCK (&local->contri->lock); + { + local->contri->contribution += local->delta; + } + UNLOCK (&local->contri->lock); gf_log (this->name, GF_LOG_DEBUG, "%s %"PRId64 "%"PRId64, local->loc.path, local->ctx->size, local->contri->contribution); - priv = this->private; - - if (dict == NULL) + if (dict == NULL) { + op_errno = EINVAL; goto err; + } - ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); - if (ret < 0) + ret = mq_inode_ctx_get (local->parent_loc.inode, this, &ctx); + if (ret < 0) { + op_errno = EINVAL; goto err; + } newdict = dict_new (); if (!newdict) { + op_errno = ENOMEM; ret = -1; goto err; } @@ -1130,21 +1548,28 @@ quota_update_parent_size (call_frame_t *frame, *size = hton64 (local->delta); ret = dict_set_bin (newdict, QUOTA_SIZE_KEY, size, 8); - if (ret < 0) + if (ret < 0) { + op_errno = -ret; goto err; + } + + if (uuid_is_null (local->parent_loc.gfid)) + uuid_copy (local->parent_loc.gfid, + local->parent_loc.inode->gfid); + GF_UUID_ASSERT (local->parent_loc.gfid); STACK_WIND (frame, - quota_mark_undirty, + mq_mark_undirty, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, &local->parent_loc, GF_XATTROP_ADD_ARRAY64, - newdict); + newdict, NULL); ret = 0; err: if (op_ret == -1 || ret < 0) { - local->err = 1; - quota_release_parent_lock (frame, NULL, this, 0, 0); + local->err = op_errno; + mq_release_parent_lock (frame, NULL, this, 0, 0, NULL); } if (newdict) @@ -1154,103 +1579,125 @@ err: } int32_t -quota_update_inode_contribution (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, - int32_t op_errno, inode_t *inode, - struct iatt *buf, dict_t *dict, - struct iatt *postparent) +mq_update_inode_contribution (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, inode_t *inode, + struct iatt *buf, dict_t *dict, + struct iatt *postparent) { - int32_t ret = -1; - int64_t *size = NULL; - int64_t *contri = NULL; - int64_t *delta = NULL; + int32_t ret = -1; + int64_t *size = NULL, size_int = 0, contri_int = 0; + int64_t *contri = NULL; + int64_t *delta = NULL; char contri_key [512] = {0, }; - dict_t *newdict = NULL; - quota_local_t *local = NULL; - quota_inode_ctx_t *ctx = NULL; - marker_conf_t *priv = NULL; - inode_contribution_t *contribution = NULL; + dict_t *newdict = NULL; + quota_local_t *local = NULL; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; local = frame->local; if (op_ret == -1) { gf_log (this->name, ((op_errno == ENOENT) ? GF_LOG_DEBUG : GF_LOG_WARNING), - "failed to get size and contribution with %s error", - strerror (op_errno)); + "failed to get size and contribution of path (%s)(%s)", + local->loc.path, strerror (op_errno)); goto err; } - priv = this->private; - ctx = local->ctx; contribution = local->contri; //prepare to update size & contribution of the inode GET_CONTRI_KEY (contri_key, contribution->gfid, ret); - if (ret == -1) + if (ret == -1) { + op_errno = ENOMEM; goto err; + } LOCK (&ctx->lock); { if (local->loc.inode->ia_type == IA_IFDIR ) { ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); - if (ret < 0) + if (ret < 0) { + op_errno = EINVAL; goto unlock; + } ctx->size = ntoh64 (*size); } else ctx->size = buf->ia_blocks * 512; - ret = dict_get_bin (dict, contri_key, (void **) &contri); + size_int = ctx->size; + } +unlock: + UNLOCK (&ctx->lock); + + if (ret < 0) { + goto err; + } + + ret = dict_get_bin (dict, contri_key, (void **) &contri); + + LOCK (&contribution->lock); + { if (ret < 0) contribution->contribution = 0; else contribution->contribution = ntoh64 (*contri); - ret = 0; + contri_int = contribution->contribution; } -unlock: - UNLOCK (&ctx->lock); - - if (ret < 0) - goto err; + UNLOCK (&contribution->lock); gf_log (this->name, GF_LOG_DEBUG, "%s %"PRId64 "%"PRId64, - local->loc.path, ctx->size, contribution->contribution); + local->loc.path, size_int, contri_int); + + local->delta = size_int - contri_int; + + if (local->delta == 0) { + mq_mark_undirty (frame, NULL, this, 0, 0, NULL, NULL); + return 0; + } + newdict = dict_new (); if (newdict == NULL) { + op_errno = ENOMEM; ret = -1; goto err; } - local->delta = ctx->size - contribution->contribution; - QUOTA_ALLOC_OR_GOTO (delta, int64_t, ret, err); *delta = hton64 (local->delta); ret = dict_set_bin (newdict, contri_key, delta, 8); if (ret < 0) { + op_errno = -ret; ret = -1; goto err; } + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + + GF_UUID_ASSERT (local->loc.gfid); + STACK_WIND (frame, - quota_update_parent_size, + mq_update_parent_size, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, &local->loc, GF_XATTROP_ADD_ARRAY64, - newdict); + newdict, NULL); ret = 0; err: if (op_ret == -1 || ret < 0) { - local->err = 1; + local->err = op_errno; - quota_release_parent_lock (frame, NULL, this, 0, 0); + mq_release_parent_lock (frame, NULL, this, 0, 0, NULL); } if (newdict) @@ -1260,22 +1707,23 @@ err: } int32_t -quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, - int32_t op_errno) +mq_fetch_child_size_and_contri (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) { - int32_t ret = -1; + int32_t ret = -1; char contri_key [512] = {0, }; - dict_t *newdict = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; - quota_inode_ctx_t *ctx = NULL; + dict_t *newdict = NULL; + quota_local_t *local = NULL; + quota_inode_ctx_t *ctx = NULL; local = frame->local; if (op_ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "%s couldnt mark dirty", local->parent_loc.path); + gf_log (this->name, (op_errno == ENOENT) ? GF_LOG_DEBUG + : GF_LOG_WARNING, + "couldnt mark inode corresponding to path (%s) dirty " + "(%s)", local->parent_loc.path, strerror (op_errno)); goto err; } @@ -1284,12 +1732,12 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie, gf_log (this->name, GF_LOG_DEBUG, "%s marked dirty", local->parent_loc.path); - priv = this->private; - //update parent ctx - ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); - if (ret == -1) + ret = mq_inode_ctx_get (local->parent_loc.inode, this, &ctx); + if (ret == -1) { + op_errno = EINVAL; goto err; + } LOCK (&ctx->lock); { @@ -1298,29 +1746,52 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie, UNLOCK (&ctx->lock); newdict = dict_new (); - if (newdict == NULL) + if (newdict == NULL) { + op_errno = ENOMEM; goto err; + } if (local->loc.inode->ia_type == IA_IFDIR) { ret = dict_set_int64 (newdict, QUOTA_SIZE_KEY, 0); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "dict_set failed."); + goto err; + } } GET_CONTRI_KEY (contri_key, local->contri->gfid, ret); - if (ret < 0) + if (ret < 0) { + op_errno = ENOMEM; goto err; + } ret = dict_set_int64 (newdict, contri_key, 0); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "dict_set failed."); + goto err; + } - STACK_WIND (frame, quota_update_inode_contribution, FIRST_CHILD(this), + mq_set_ctx_updation_status (local->ctx, _gf_false); + + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + + STACK_WIND (frame, mq_update_inode_contribution, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, &local->loc, newdict); ret = 0; err: - if (op_ret == -1 || ret == -1) { - local->err = 1; + if ((op_ret == -1) || (ret < 0)) { + local->err = op_errno; + + mq_set_ctx_updation_status (local->ctx, _gf_false); - quota_release_parent_lock (frame, NULL, this, 0, 0); + mq_release_parent_lock (frame, NULL, this, 0, 0, NULL); } if (newdict) @@ -1330,24 +1801,25 @@ err: } int32_t -quota_markdirty (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno) +mq_markdirty (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { int32_t ret = -1; dict_t *dict = NULL; quota_local_t *local = NULL; - marker_conf_t *priv = NULL; local = frame->local; if (op_ret == -1){ - gf_log (this->name, GF_LOG_ERROR, - "lock setting failed on %s (%s)", + gf_log (this->name, (op_errno == ENOENT) ? GF_LOG_DEBUG + : GF_LOG_WARNING, "acquiring locks failed on %s (%s)", local->parent_loc.path, strerror (op_errno)); - local->err = 1; + local->err = op_errno; - quota_inodelk_cbk (frame, NULL, this, 0, 0); + mq_set_ctx_updation_status (local->ctx, _gf_false); + + mq_inodelk_cbk (frame, NULL, this, 0, 0, NULL); return 0; } @@ -1355,8 +1827,6 @@ quota_markdirty (call_frame_t *frame, void *cookie, gf_log (this->name, GF_LOG_TRACE, "inodelk succeeded on %s", local->parent_loc.path); - priv = this->private; - dict = dict_new (); if (!dict) { ret = -1; @@ -1367,17 +1837,23 @@ quota_markdirty (call_frame_t *frame, void *cookie, if (ret == -1) goto err; - STACK_WIND (frame, quota_fetch_child_size_and_contri, + uuid_copy (local->parent_loc.gfid, + local->parent_loc.inode->gfid); + GF_UUID_ASSERT (local->parent_loc.gfid); + + STACK_WIND (frame, mq_fetch_child_size_and_contri, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, - &local->parent_loc, dict, 0); + &local->parent_loc, dict, 0, NULL); ret = 0; err: if (ret == -1) { local->err = 1; - quota_release_parent_lock (frame, NULL, this, 0, 0); + mq_set_ctx_updation_status (local->ctx, _gf_false); + + mq_release_parent_lock (frame, NULL, this, 0, 0, NULL); } if (dict) @@ -1388,9 +1864,9 @@ err: int32_t -get_lock_on_parent (call_frame_t *frame, xlator_t *this) +mq_get_lock_on_parent (call_frame_t *frame, xlator_t *this) { - struct gf_flock lock; + struct gf_flock lock = {0, }; quota_local_t *local = NULL; GF_VALIDATE_OR_GOTO ("marker", frame, fr_destroy); @@ -1399,30 +1875,37 @@ get_lock_on_parent (call_frame_t *frame, xlator_t *this) gf_log (this->name, GF_LOG_DEBUG, "taking lock on %s", local->parent_loc.path); + if (local->parent_loc.inode == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "parent inode is not valid, aborting " + "transaction."); + goto fr_destroy; + } + lock.l_len = 0; lock.l_start = 0; lock.l_type = F_WRLCK; lock.l_whence = SEEK_SET; STACK_WIND (frame, - quota_markdirty, + mq_markdirty, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, - this->name, &local->parent_loc, F_SETLKW, &lock); + this->name, &local->parent_loc, F_SETLKW, &lock, NULL); return 0; fr_destroy: QUOTA_STACK_DESTROY (frame, this); - return 0; + return -1; } int -start_quota_txn (xlator_t *this, loc_t *loc, - quota_inode_ctx_t *ctx, - inode_contribution_t *contri) +mq_start_quota_txn (xlator_t *this, loc_t *loc, + quota_inode_ctx_t *ctx, + inode_contribution_t *contri) { int32_t ret = -1; call_frame_t *frame = NULL; @@ -1434,46 +1917,52 @@ start_quota_txn (xlator_t *this, loc_t *loc, mq_assign_lk_owner (this, frame); - local = quota_local_new (); + local = mq_local_new (); if (local == NULL) goto fr_destroy; frame->local = local; - ret = loc_copy (&local->loc, loc); + ret = mq_loc_copy (&local->loc, loc); if (ret < 0) goto fr_destroy; - ret = quota_inode_loc_fill (NULL, local->loc.parent, - &local->parent_loc); + ret = mq_inode_loc_fill (NULL, local->loc.parent, + &local->parent_loc); if (ret < 0) goto fr_destroy; local->ctx = ctx; local->contri = contri; - get_lock_on_parent (frame, this); + ret = mq_get_lock_on_parent (frame, this); + if (ret == -1) + goto err; return 0; fr_destroy: QUOTA_STACK_DESTROY (frame, this); - err: + mq_set_ctx_updation_status (ctx, _gf_false); + return -1; } int -initiate_quota_txn (xlator_t *this, loc_t *loc) +mq_initiate_quota_txn (xlator_t *this, loc_t *loc) { - int32_t ret = -1; - quota_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + gf_boolean_t status = _gf_false; + quota_inode_ctx_t *ctx = NULL; inode_contribution_t *contribution = NULL; - VALIDATE_OR_GOTO (loc, out); + GF_VALIDATE_OR_GOTO ("marker", this, out); + GF_VALIDATE_OR_GOTO ("marker", loc, out); + GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + ret = mq_inode_ctx_get (loc->inode, this, &ctx); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, "inode ctx get failed, aborting quota txn"); @@ -1481,55 +1970,75 @@ initiate_quota_txn (xlator_t *this, loc_t *loc) goto out; } - contribution = get_contribution_node (loc->parent, ctx); + contribution = mq_get_contribution_node (loc->parent, ctx); if (contribution == NULL) goto out; - start_quota_txn (this, loc, ctx, contribution); + /* To improve performance, donot start another transaction + * if one is already in progress for same inode + */ + status = _gf_true; + + ret = mq_test_and_set_ctx_updation_status (ctx, &status); + if (ret < 0) + goto out; + + if (status == _gf_false) { + mq_start_quota_txn (this, loc, ctx, contribution); + } + + ret = 0; out: - return 0; + return ret; } -int32_t -validate_inode_size_contribution (xlator_t *this, - loc_t *loc, - quota_inode_ctx_t *ctx, - inode_contribution_t *contribution) -{ - if (ctx->size != contribution->contribution) - initiate_quota_txn (this, loc); +/* int32_t */ +/* validate_inode_size_contribution (xlator_t *this, loc_t *loc, int64_t size, */ +/* int64_t contribution) */ +/* { */ +/* if (size != contribution) { */ +/* mq_initiate_quota_txn (this, loc); */ +/* } */ - return 0; -} +/* return 0; */ +/* } */ int32_t -inspect_directory_xattr (xlator_t *this, - loc_t *loc, - dict_t *dict, - struct iatt buf) +mq_inspect_directory_xattr (xlator_t *this, + loc_t *loc, + dict_t *dict, + struct iatt buf) { - int32_t ret = 0; - int8_t dirty = -1; - int64_t *size = NULL; - int64_t *contri = NULL; - char contri_key [512] = {0, }; - marker_conf_t *priv = NULL; - gf_boolean_t not_root = _gf_false; - quota_inode_ctx_t *ctx = NULL; - inode_contribution_t *contribution = NULL; - - priv = this->private; - - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + int32_t ret = 0; + int8_t dirty = -1; + int64_t *size = NULL, size_int = 0; + int64_t *contri = NULL, contri_int = 0; + char contri_key [512] = {0, }; + gf_boolean_t not_root = _gf_false; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; + + ret = mq_inode_ctx_get (loc->inode, this, &ctx); if (ret < 0) { - ctx = quota_inode_ctx_new (loc->inode, this); + ctx = mq_inode_ctx_new (loc->inode, this); if (ctx == NULL) { gf_log (this->name, GF_LOG_WARNING, - "quota_inode_ctx_new failed"); + "mq_inode_ctx_new failed"); ret = -1; - goto out; + goto err; + } + } + + if (strcmp (loc->path, "/") != 0) { + contribution = mq_add_new_contribution_node (this, ctx, loc); + if (contribution == NULL) { + if (!uuid_is_null (loc->inode->gfid)) + gf_log (this->name, GF_LOG_WARNING, + "cannot add a new contribution node"); + ret = -1; + goto err; } } @@ -1544,13 +2053,6 @@ inspect_directory_xattr (xlator_t *this, if (strcmp (loc->path, "/") != 0) { not_root = _gf_true; - contribution = add_new_contribution_node (this, ctx, loc); - if (contribution == NULL) { - gf_log (this->name, GF_LOG_DEBUG, - "cannot add a new contributio node"); - goto out; - } - GET_CONTRI_KEY (contri_key, contribution->gfid, ret); if (ret < 0) goto out; @@ -1559,69 +2061,79 @@ inspect_directory_xattr (xlator_t *this, if (ret < 0) goto out; - contribution->contribution = ntoh64 (*contri); + LOCK (&contribution->lock); + { + contribution->contribution = ntoh64 (*contri); + contri_int = contribution->contribution; + } + UNLOCK (&contribution->lock); } - ctx->size = ntoh64 (*size); + LOCK (&ctx->lock); + { + ctx->size = ntoh64 (*size); + ctx->dirty = dirty; + size_int = ctx->size; + } + UNLOCK (&ctx->lock); gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64 - " contri=%"PRId64, ctx->size, - contribution?contribution->contribution:0); + " contri=%"PRId64, size_int, contri_int); - ctx->dirty = dirty; - if (ctx->dirty == 1) { - update_dirty_inode (this, loc, ctx, contribution); - } else if (not_root == _gf_true && - ctx->size != contribution->contribution) { - initiate_quota_txn (this, loc); + if (dirty) { + ret = mq_update_dirty_inode (this, loc, ctx, contribution); + } + + if ((!dirty || ret == 0) && (not_root == _gf_true) && + (size_int != contri_int)) { + mq_initiate_quota_txn (this, loc); } ret = 0; out: if (ret) - quota_set_inode_xattr (this, loc); - - return 0; + mq_set_inode_xattr (this, loc); +err: + return ret; } int32_t -inspect_file_xattr (xlator_t *this, - loc_t *loc, - dict_t *dict, - struct iatt buf) +mq_inspect_file_xattr (xlator_t *this, + loc_t *loc, + dict_t *dict, + struct iatt buf) { - int32_t ret = -1; - uint64_t contri_int = 0; - int64_t *contri_ptr = NULL; + int32_t ret = -1; + uint64_t contri_int = 0, size = 0; + int64_t *contri_ptr = NULL; char contri_key [512] = {0, }; - marker_conf_t *priv = NULL; - quota_inode_ctx_t *ctx = NULL; - inode_contribution_t *contribution = NULL; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; - priv = this->private; - - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + ret = mq_inode_ctx_get (loc->inode, this, &ctx); if (ret < 0) { - ctx = quota_inode_ctx_new (loc->inode, this); + ctx = mq_inode_ctx_new (loc->inode, this); if (ctx == NULL) { gf_log (this->name, GF_LOG_WARNING, - "quota_inode_ctx_new failed"); + "mq_inode_ctx_new failed"); ret = -1; goto out; } } - contribution = add_new_contribution_node (this, ctx, loc); + contribution = mq_add_new_contribution_node (this, ctx, loc); if (contribution == NULL) goto out; LOCK (&ctx->lock); { ctx->size = 512 * buf.ia_blocks; + size = ctx->size; } UNLOCK (&ctx->lock); - list_for_each_entry (contribution, &ctx->contribution_head, contri_list) { + list_for_each_entry (contribution, &ctx->contribution_head, + contri_list) { GET_CONTRI_KEY (contri_key, contribution->gfid, ret); if (ret < 0) continue; @@ -1630,16 +2142,21 @@ inspect_file_xattr (xlator_t *this, if (ret == 0) { contri_ptr = (int64_t *)(unsigned long)contri_int; - contribution->contribution = ntoh64 (*contri_ptr); + LOCK (&contribution->lock); + { + contribution->contribution = ntoh64 (*contri_ptr); + contri_int = contribution->contribution; + } + UNLOCK (&contribution->lock); gf_log (this->name, GF_LOG_DEBUG, - "size=%"PRId64 " contri=%"PRId64, ctx->size, - contribution->contribution); + "size=%"PRId64 " contri=%"PRId64, size, contri_int); - ret = validate_inode_size_contribution - (this, loc, ctx, contribution); + if (size != contri_int) { + mq_initiate_quota_txn (this, loc); + } } else - initiate_quota_txn (this, loc); + mq_initiate_quota_txn (this, loc); } out: @@ -1647,40 +2164,38 @@ out: } int32_t -quota_xattr_state (xlator_t *this, - loc_t *loc, - dict_t *dict, - struct iatt buf) +mq_xattr_state (xlator_t *this, + loc_t *loc, + dict_t *dict, + struct iatt buf) { if (buf.ia_type == IA_IFREG || buf.ia_type == IA_IFLNK) { - k ++; - inspect_file_xattr (this, loc, dict, buf); + mq_inspect_file_xattr (this, loc, dict, buf); } else if (buf.ia_type == IA_IFDIR) - inspect_directory_xattr (this, loc, dict, buf); + mq_inspect_directory_xattr (this, loc, dict, buf); return 0; } int32_t -quota_req_xattr (xlator_t *this, - loc_t *loc, - dict_t *dict) +mq_req_xattr (xlator_t *this, + loc_t *loc, + dict_t *dict) { int32_t ret = -1; - marker_conf_t *priv = NULL; GF_VALIDATE_OR_GOTO ("marker", this, out); - GF_VALIDATE_OR_GOTO ("marker", loc, out); GF_VALIDATE_OR_GOTO ("marker", dict, out); - priv = this->private; + if (!loc) + goto set_size; //if not "/" then request contribution if (strcmp (loc->path, "/") == 0) goto set_size; - ret = dict_set_contribution (this, dict, loc); + ret = mq_dict_set_contribution (this, dict, loc); if (ret == -1) goto out; @@ -1705,8 +2220,8 @@ out: int32_t -quota_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +mq_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { QUOTA_STACK_DESTROY (frame, this); @@ -1714,58 +2229,84 @@ quota_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } int32_t -quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +_mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - int32_t ret = 0; - char contri_key [512] = {0, }; - quota_local_t *local = NULL; + int32_t ret = 0; + char contri_key [512] = {0, }; + quota_local_t *local = NULL; local = (quota_local_t *) frame->local; if (op_ret == -1 || local->err == -1) { - quota_removexattr_cbk (frame, NULL, this, -1, 0); + mq_removexattr_cbk (frame, NULL, this, -1, 0, NULL); return 0; } + frame->local = NULL; + if (local->hl_count > 1) { GET_CONTRI_KEY (contri_key, local->contri->gfid, ret); - STACK_WIND (frame, quota_removexattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, - &local->loc, contri_key); + STACK_WIND (frame, mq_removexattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, + &local->loc, contri_key, NULL); ret = 0; + } else { + mq_removexattr_cbk (frame, NULL, this, 0, 0, NULL); } if (strcmp (local->parent_loc.path, "/") != 0) { - get_parent_inode_local (this, local); + ret = mq_get_parent_inode_local (this, local); + if (ret < 0) + goto out; - start_quota_txn (this, &local->loc, local->ctx, local->contri); + mq_start_quota_txn (this, &local->loc, local->ctx, local->contri); } - - quota_local_unref (this, local); +out: + mq_local_unref (this, local); return 0; } int32_t mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) { - int32_t ret; - struct gf_flock lock; - quota_inode_ctx_t *ctx; - quota_local_t *local = NULL; + int32_t ret = -1; + struct gf_flock lock = {0, }; + quota_inode_ctx_t *ctx = NULL; + quota_local_t *local = NULL; + int64_t contribution = 0; local = frame->local; if (op_ret == -1) local->err = -1; - ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); - if (ret == 0) - ctx->size -= local->contri->contribution; + ret = mq_inode_ctx_get (local->parent_loc.inode, this, &ctx); + + LOCK (&local->contri->lock); + { + contribution = local->contri->contribution; + } + UNLOCK (&local->contri->lock); + + if (contribution == local->size) { + if (ret == 0) { + LOCK (&ctx->lock); + { + ctx->size -= contribution; + } + UNLOCK (&ctx->lock); - local->contri->contribution = 0; + LOCK (&local->contri->lock); + { + local->contri->contribution = 0; + } + UNLOCK (&local->contri->lock); + } + } lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; @@ -1774,24 +2315,22 @@ mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, lock.l_pid = 0; STACK_WIND (frame, - quota_inode_remove_done, + _mq_inode_remove_done, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, this->name, &local->parent_loc, - F_SETLKW, &lock); + F_SETLKW, &lock, NULL); return 0; } -static int32_t -mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno) +int32_t +mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { int32_t ret = -1; int64_t *size = NULL; dict_t *dict = NULL; - marker_conf_t *priv = NULL; quota_local_t *local = NULL; - inode_contribution_t *contribution = NULL; local = frame->local; if (op_ret == -1) { @@ -1803,10 +2342,6 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, VALIDATE_OR_GOTO (local->contri, err); - priv = this->private; - - contribution = local->contri; - dict = dict_new (); if (dict == NULL) { ret = -1; @@ -1815,35 +2350,36 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err); - *size = hton64 (-contribution->contribution); - + *size = hton64 (-local->size); ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8); if (ret < 0) goto err; + uuid_copy (local->parent_loc.gfid, + local->parent_loc.inode->gfid); + GF_UUID_ASSERT (local->parent_loc.gfid); STACK_WIND (frame, mq_inode_remove_done, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, &local->parent_loc, - GF_XATTROP_ADD_ARRAY64, dict); + GF_XATTROP_ADD_ARRAY64, dict, NULL); dict_unref (dict); return 0; err: local->err = 1; - mq_inode_remove_done (frame, NULL, this, -1, 0, NULL); + mq_inode_remove_done (frame, NULL, this, -1, 0, NULL, NULL); if (dict) dict_unref (dict); return 0; } int32_t -reduce_parent_size (xlator_t *this, loc_t *loc) +mq_reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri) { int32_t ret = -1; struct gf_flock lock = {0,}; call_frame_t *frame = NULL; - marker_conf_t *priv = NULL; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; inode_contribution_t *contribution = NULL; @@ -1851,30 +2387,43 @@ reduce_parent_size (xlator_t *this, loc_t *loc) GF_VALIDATE_OR_GOTO ("marker", this, out); GF_VALIDATE_OR_GOTO ("marker", loc, out); - priv = this->private; - - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + ret = mq_inode_ctx_get (loc->inode, this, &ctx); if (ret < 0) goto out; - contribution = get_contribution_node (loc->parent, ctx); + contribution = mq_get_contribution_node (loc->parent, ctx); if (contribution == NULL) goto out; - local = quota_local_new (); + local = mq_local_new (); if (local == NULL) { ret = -1; goto out; } - ret = loc_copy (&local->loc, loc); + if (contri >= 0) { + local->size = contri; + } else { + LOCK (&contribution->lock); + { + local->size = contribution->contribution; + } + UNLOCK (&contribution->lock); + } + + if (local->size == 0) { + ret = 0; + goto out; + } + + ret = mq_loc_copy (&local->loc, loc); if (ret < 0) goto out; local->ctx = ctx; local->contri = contribution; - ret = quota_inode_loc_fill (NULL, loc->parent, &local->parent_loc); + ret = mq_inode_loc_fill (NULL, loc->parent, &local->parent_loc); if (ret < 0) goto out; @@ -1893,18 +2442,25 @@ reduce_parent_size (xlator_t *this, loc_t *loc) lock.l_type = F_WRLCK; lock.l_whence = SEEK_SET; + if (local->parent_loc.inode == NULL) { + ret = -1; + gf_log (this->name, GF_LOG_DEBUG, + "Inode is NULL, so can't stackwind."); + goto out; + } + STACK_WIND (frame, mq_reduce_parent_size_xattr, FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, - this->name, &local->parent_loc, F_SETLKW, &lock); + this->name, &local->parent_loc, F_SETLKW, &lock, NULL); + local = NULL; ret = 0; out: - if (ret < 0) { - quota_local_unref (this, local); - GF_FREE (local); - } + if (local != NULL) + mq_local_unref (this, local); + return ret; } @@ -1912,14 +2468,12 @@ out: int32_t init_quota_priv (xlator_t *this) { - strcpy (volname, "quota"); - return 0; } int32_t -quota_rename_update_newpath (xlator_t *this, loc_t *loc) +mq_rename_update_newpath (xlator_t *this, loc_t *loc) { int32_t ret = -1; quota_inode_ctx_t *ctx = NULL; @@ -1929,23 +2483,23 @@ quota_rename_update_newpath (xlator_t *this, loc_t *loc) GF_VALIDATE_OR_GOTO ("marker", loc, out); GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + ret = mq_inode_ctx_get (loc->inode, this, &ctx); if (ret < 0) goto out; - contribution = add_new_contribution_node (this, ctx, loc); + contribution = mq_add_new_contribution_node (this, ctx, loc); if (contribution == NULL) { ret = -1; goto out; } - initiate_quota_txn (this, loc); + mq_initiate_quota_txn (this, loc); out: return ret; } int32_t -quota_forget (xlator_t *this, quota_inode_ctx_t *ctx) +mq_forget (xlator_t *this, quota_inode_ctx_t *ctx) { inode_contribution_t *contri = NULL; inode_contribution_t *next = NULL; diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index ea54bb631..385760ac4 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef _MARKER_QUOTA_H #define _MARKER_QUOTA_H @@ -24,7 +15,6 @@ #include "config.h" #endif -#include "marker.h" #include "xlator.h" #include "marker-mem-types.h" @@ -32,11 +22,9 @@ #define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty" #define CONTRIBUTION "contri" -#define VOL_NAME volname #define CONTRI_KEY_MAX 512 #define READDIR_BUF 4096 -char volname [40]; #define QUOTA_STACK_DESTROY(_frame, _this) \ do { \ @@ -44,8 +32,7 @@ char volname [40]; _local = _frame->local; \ _frame->local = NULL; \ STACK_DESTROY (_frame->root); \ - quota_local_unref (_this, _local); \ - GF_FREE (_local); \ + mq_local_unref (_this, _local); \ } while (0) @@ -53,7 +40,7 @@ char volname [40]; do { \ ret = 0; \ var = GF_CALLOC (sizeof (type), 1, \ - gf_marker_mt_##type); \ + gf_marker_mt_##type); \ if (!var) { \ gf_log ("", GF_LOG_ERROR, \ "out of memory"); \ @@ -64,7 +51,7 @@ char volname [40]; #define QUOTA_ALLOC_OR_GOTO(var, type, ret, label) \ do { \ var = GF_CALLOC (sizeof (type), 1, \ - gf_marker_mt_##type); \ + gf_marker_mt_##type); \ if (!var) { \ gf_log ("", GF_LOG_ERROR, \ "out of memory"); \ @@ -74,97 +61,70 @@ char volname [40]; ret = 0; \ } while (0); -#define GET_CONTRI_KEY(var, _gfid, _ret) \ - do { \ - char _gfid_unparsed[40]; \ - uuid_unparse (_gfid, _gfid_unparsed); \ +#define GET_CONTRI_KEY(var, _gfid, _ret) \ + do { \ + char _gfid_unparsed[40]; \ + uuid_unparse (_gfid, _gfid_unparsed); \ _ret = snprintf (var, CONTRI_KEY_MAX, QUOTA_XATTR_PREFIX \ - ".%s.%s." CONTRIBUTION, VOL_NAME, \ - _gfid_unparsed); \ + ".%s.%s." CONTRIBUTION, "quota", \ + _gfid_unparsed); \ } while (0); -#define QUOTA_SAFE_INCREMENT(lock, var) \ - do { \ - LOCK (lock); \ - var ++; \ - UNLOCK (lock); \ - } while (0) - -#define QUOTA_SAFE_DECREMENT(lock, var) \ - do { \ - LOCK (lock); \ - var --; \ - UNLOCK (lock); \ +#define QUOTA_SAFE_INCREMENT(lock, var) \ + do { \ + LOCK (lock); \ + var ++; \ + UNLOCK (lock); \ } while (0) - struct quota_inode_ctx { - int64_t size; - int8_t dirty; - gf_lock_t lock; - struct list_head contribution_head; + int64_t size; + int8_t dirty; + gf_boolean_t updation_status; + gf_lock_t lock; + struct list_head contribution_head; }; typedef struct quota_inode_ctx quota_inode_ctx_t; struct inode_contribution { struct list_head contri_list; - int64_t contribution; - uuid_t gfid; + int64_t contribution; + uuid_t gfid; + gf_lock_t lock; }; typedef struct inode_contribution inode_contribution_t; -struct quota_local { - int64_t delta; - int64_t d_off; - int32_t err; - int32_t ref; - int64_t sum; - int32_t hl_count; - int32_t dentry_child_count; - - fd_t *fd; - call_frame_t *frame; - gf_lock_t lock; - - loc_t loc; - loc_t parent_loc; - - quota_inode_ctx_t *ctx; - inode_contribution_t *contri; -}; -typedef struct quota_local quota_local_t; - int32_t -get_lock_on_parent (call_frame_t *, xlator_t *); +mq_get_lock_on_parent (call_frame_t *, xlator_t *); int32_t -quota_req_xattr (xlator_t *, loc_t *, dict_t *); +mq_req_xattr (xlator_t *, loc_t *, dict_t *); int32_t init_quota_priv (xlator_t *); int32_t -quota_xattr_state (xlator_t *, loc_t *, dict_t *, struct iatt); +mq_xattr_state (xlator_t *, loc_t *, dict_t *, struct iatt); int32_t -quota_set_inode_xattr (xlator_t *, loc_t *); +mq_set_inode_xattr (xlator_t *, loc_t *); int -initiate_quota_txn (xlator_t *, loc_t *); +mq_initiate_quota_txn (xlator_t *, loc_t *); int32_t -quota_dirty_inode_readdir (call_frame_t *, void *, xlator_t *, - int32_t, int32_t, fd_t *); +mq_dirty_inode_readdir (call_frame_t *, void *, xlator_t *, + int32_t, int32_t, fd_t *, dict_t *); int32_t -reduce_parent_size (xlator_t *, loc_t *); +mq_reduce_parent_size (xlator_t *, loc_t *, int64_t); int32_t -quota_rename_update_newpath (xlator_t *, loc_t *); +mq_rename_update_newpath (xlator_t *, loc_t *); int32_t -inspect_file_xattr (xlator_t *this, loc_t *loc, dict_t *dict, struct iatt buf); +mq_inspect_file_xattr (xlator_t *this, loc_t *loc, dict_t *dict, struct iatt buf); int32_t -quota_forget (xlator_t *, quota_inode_ctx_t *); +mq_forget (xlator_t *, quota_inode_ctx_t *); #endif diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index d7b792bc4..6a2c85691 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -27,7 +18,11 @@ #include "marker.h" #include "marker-mem-types.h" #include "marker-quota.h" +#include "marker-quota-helper.h" #include "marker-common.h" +#include "byte-order.h" + +#define _GF_UID_GID_CHANGED 1 void fini (xlator_t *this); @@ -61,23 +56,25 @@ marker_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path) if (inode) { loc->inode = inode_ref (inode); - loc->ino = inode->ino; + if (uuid_is_null (loc->gfid)) { + uuid_copy (loc->gfid, loc->inode->gfid); + } } if (parent) loc->parent = inode_ref (parent); - loc->path = gf_strdup (path); - if (!loc->path) { - gf_log ("loc fill", GF_LOG_ERROR, "strdup failed"); - goto loc_wipe; - } + if (path) { + loc->path = gf_strdup (path); + if (!loc->path) { + gf_log ("loc fill", GF_LOG_ERROR, "strdup failed"); + goto loc_wipe; + } - loc->name = strrchr (loc->path, '/'); - if (loc->name) - loc->name++; - else - goto loc_wipe; + loc->name = strrchr (loc->path, '/'); + if (loc->name) + loc->name++; + } ret = 0; loc_wipe: @@ -91,23 +88,14 @@ int marker_inode_loc_fill (inode_t *inode, loc_t *loc) { char *resolvedpath = NULL; - inode_t *parent = NULL; int ret = -1; + inode_t *parent = NULL; if ((!inode) || (!loc)) return ret; - if ((inode) && (inode->ino == 1)) { - loc->parent = NULL; - goto ignore_parent; - } + parent = inode_parent (inode, NULL, NULL); - parent = inode_parent (inode, 0, NULL); - if (!parent) { - goto err; - } - -ignore_parent: ret = inode_path (inode, NULL, &resolvedpath); if (ret < 0) goto err; @@ -117,11 +105,10 @@ ignore_parent: goto err; err: - if (parent) - inode_unref (parent); + if (parent) + inode_unref (parent); - if (resolvedpath) - GF_FREE (resolvedpath); + GF_FREE (resolvedpath); return ret; } @@ -131,8 +118,17 @@ marker_trav_parent (marker_local_t *local) { int32_t ret = 0; loc_t loc = {0, }; + inode_t *parent = NULL; + int8_t need_unref = 0; + + if (!local->loc.parent) { + parent = inode_parent (local->loc.inode, NULL, NULL); + if (parent) + need_unref = 1; + } else + parent = local->loc.parent; - ret = marker_inode_loc_fill (local->loc.parent, &loc); + ret = marker_inode_loc_fill (parent, &loc); if (ret < 0) { ret = -1; @@ -143,16 +139,28 @@ marker_trav_parent (marker_local_t *local) local->loc = loc; out: + if (need_unref) + inode_unref (parent); + return ret; } int32_t -marker_error_handler (xlator_t *this) +marker_error_handler (xlator_t *this, marker_local_t *local, int32_t op_errno) { - marker_conf_t *priv = NULL; - - priv = (marker_conf_t *) this->private; - + marker_conf_t *priv = NULL; + const char *path = NULL; + + priv = (marker_conf_t *) this->private; + path = local + ? (local->loc.path + ? local->loc.path : uuid_utoa(local->loc.gfid)) + : "<nul>"; + + gf_log (this->name, GF_LOG_CRITICAL, + "Indexing gone corrupt at %s (reason: %s)." + " Geo-replication slave content needs to be revalidated", + path, strerror (op_errno)); unlink (priv->timestamp_file); return 0; @@ -176,24 +184,28 @@ marker_local_unref (marker_local_t *local) goto out; loc_wipe (&local->loc); + loc_wipe (&local->parent_loc); + if (local->xdata) + dict_unref (local->xdata); if (local->oplocal) { - loc_wipe (&local->oplocal->loc); - GF_FREE (local->oplocal); + marker_local_unref (local->oplocal); + local->oplocal = NULL; } - GF_FREE (local); + mem_put (local); out: return 0; } int32_t -stat_stampfile (xlator_t *this, marker_conf_t *priv, struct volume_mark **status) +stat_stampfile (xlator_t *this, marker_conf_t *priv, + struct volume_mark **status) { - struct stat buf; - struct volume_mark *vol_mark; + struct stat buf = {0, }; + struct volume_mark *vol_mark = NULL; vol_mark = GF_CALLOC (sizeof (struct volume_mark), 1, - gf_marker_mt_volume_mark); + gf_marker_mt_volume_mark); vol_mark->major = 1; vol_mark->minor = 0; @@ -215,13 +227,14 @@ stat_stampfile (xlator_t *this, marker_conf_t *priv, struct volume_mark **status int32_t marker_getxattr_stampfile_cbk (call_frame_t *frame, xlator_t *this, - const char *name, struct volume_mark *vol_mark) + const char *name, struct volume_mark *vol_mark, + dict_t *xdata) { - int32_t ret; + int32_t ret = -1; dict_t *dict = NULL; if (vol_mark == NULL){ - STACK_UNWIND_STRICT (getxattr, frame, -1, ENOMEM, NULL); + STACK_UNWIND_STRICT (getxattr, frame, -1, ENOMEM, NULL, NULL); goto out; } @@ -229,9 +242,12 @@ marker_getxattr_stampfile_cbk (call_frame_t *frame, xlator_t *this, dict = dict_new (); ret = dict_set_bin (dict, (char *)name, vol_mark, - sizeof (struct volume_mark)); + sizeof (struct volume_mark)); + if (ret) + gf_log (this->name, GF_LOG_WARNING, "failed to set key %s", + name); - STACK_UNWIND_STRICT (getxattr, frame, 0, 0, dict); + STACK_UNWIND_STRICT (getxattr, frame, 0, 0, dict, xdata); dict_unref (dict); out: @@ -247,7 +263,7 @@ call_from_special_client (call_frame_t *frame, xlator_t *this, const char *name) priv = (marker_conf_t *)this->private; - if (frame->root->pid != -1 || name == NULL || + if (frame->root->pid != GF_CLIENT_PID_GSYNCD || name == NULL || strcmp (name, MARKER_XATTR_PREFIX "." VOLUME_MARK) != 0) { ret = _gf_false; goto out; @@ -255,28 +271,31 @@ call_from_special_client (call_frame_t *frame, xlator_t *this, const char *name) stat_stampfile (this, priv, &vol_mark); - marker_getxattr_stampfile_cbk (frame, this, name, vol_mark); + marker_getxattr_stampfile_cbk (frame, this, name, vol_mark, NULL); out: return ret; } int32_t marker_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) { - if (cookie) { - gf_log (this->name, GF_LOG_DEBUG, - "Filtering the quota extended attributes"); + if (cookie) { + gf_log (this->name, GF_LOG_DEBUG, + "Filtering the quota extended attributes"); - dict_foreach (dict, marker_filter_quota_xattr, NULL); - } - STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict); + dict_foreach_fnmatch (dict, "trusted.glusterfs.quota*", + marker_filter_quota_xattr, NULL); + } + + STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict, xdata); return 0; } int32_t marker_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) + const char *name, dict_t *xdata) { gf_boolean_t ret = _gf_false; marker_conf_t *priv = NULL; @@ -292,19 +311,19 @@ marker_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, ret = call_from_special_client (frame, this, name); wind: if (ret == _gf_false) { - if (name == NULL) { + if (name == NULL) { /* Signifies that marker translator * has to filter the quota's xattr's, * this is to prevent afr from performing * self healing on marker-quota xattrs' */ - cookie = 1; + cookie = 1; } STACK_WIND_COOKIE (frame, marker_getxattr_cbk, (void *)cookie, FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, loc, - name); - } + name, xdata); + } return 0; } @@ -328,7 +347,7 @@ marker_setxattr_done (call_frame_t *frame) int marker_specific_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno, dict_t *xdata) { int32_t ret = 0; int32_t done = 0; @@ -337,20 +356,26 @@ marker_specific_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, local = (marker_local_t*) frame->local; if (op_ret == -1 && op_errno == ENOSPC) { - marker_error_handler (this); + marker_error_handler (this, local, op_errno); done = 1; goto out; } - if (strcmp (local->loc.path, "/") == 0) { - done = 1; - goto out; + if (local) { + if (local->loc.path && strcmp (local->loc.path, "/") == 0) { + done = 1; + goto out; + } + if (__is_root_gfid (local->loc.gfid)) { + done = 1; + goto out; + } } ret = marker_trav_parent (local); if (ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, "Error occured " + gf_log (this->name, GF_LOG_DEBUG, "Error occurred " "while traversing to the parent, stopping marker"); done = 1; @@ -371,7 +396,7 @@ out: int32_t marker_start_setxattr (call_frame_t *frame, xlator_t *this) { - int32_t ret = 0; + int32_t ret = -1; dict_t *dict = NULL; marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -380,25 +405,43 @@ marker_start_setxattr (call_frame_t *frame, xlator_t *this) local = (marker_local_t*) frame->local; + if (!local) + goto out; + dict = dict_new (); + if (!dict) + goto out; + + if (local->loc.inode && uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + ret = dict_set_static_bin (dict, priv->marker_xattr, (void *)local->timebuf, 8); - - gf_log (this->name, GF_LOG_DEBUG, "path = %s", local->loc.path); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "failed to set marker xattr (%s)", local->loc.path); + goto out; + } STACK_WIND (frame, marker_specific_setxattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->setxattr, &local->loc, dict, 0); + FIRST_CHILD(this)->fops->setxattr, &local->loc, dict, 0, + NULL); - dict_unref (dict); + ret = 0; +out: + if (dict) + dict_unref (dict); - return 0; + return ret; } void marker_gettimeofday (marker_local_t *local) { - struct timeval tv; + struct timeval tv = {0, }; gettimeofday (&tv, NULL); @@ -425,21 +468,33 @@ marker_create_frame (xlator_t *this, marker_local_t *local) int32_t marker_xtime_update_marks (xlator_t *this, marker_local_t *local) { + marker_conf_t *priv = NULL; + + GF_VALIDATE_OR_GOTO ("marker", this, out); + GF_VALIDATE_OR_GOTO (this->name, local, out); + + priv = this->private; + + if ((local->pid == GF_CLIENT_PID_GSYNCD + && !(priv->feature_enabled & GF_XTIME_GSYNC_FORCE)) + || (local->pid == GF_CLIENT_PID_DEFRAG)) + goto out; + marker_gettimeofday (local); marker_local_ref (local); marker_create_frame (this, local); - +out: return 0; } int32_t marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -454,15 +509,18 @@ marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (mkdir, frame, op_ret, op_errno, inode, - buf, preparent, postparent); + buf, preparent, postparent, xdata); if (op_ret == -1 || local == NULL) goto out; + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + priv = this->private; if (priv->feature_enabled & GF_QUOTA) - quota_set_inode_xattr (this, &local->loc); + mq_set_inode_xattr (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -475,7 +533,7 @@ out: int marker_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, - dict_t *params) + mode_t umask, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -486,7 +544,7 @@ marker_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -496,21 +554,21 @@ marker_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, goto err; wind: STACK_WIND (frame, marker_mkdir_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->mkdir, loc, mode, params); + FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); return 0; err: STACK_UNWIND_STRICT (mkdir, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); return 0; } int32_t marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -525,15 +583,18 @@ marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, - preparent, postparent); + preparent, postparent, xdata); if (op_ret == -1 || local == NULL) goto out; + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + priv = this->private; if (priv->feature_enabled & GF_QUOTA) - inspect_file_xattr (this, &local->loc, NULL, *buf); + mq_set_inode_xattr (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -546,7 +607,7 @@ out: int32_t marker_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, - mode_t mode, fd_t *fd, dict_t *params) + mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -557,7 +618,7 @@ marker_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -567,11 +628,12 @@ marker_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, goto err; wind: STACK_WIND (frame, marker_create_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->create, loc, flags, mode, fd, - params); + FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, + fd, xdata); return 0; err: - STACK_UNWIND_STRICT (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL); + STACK_UNWIND_STRICT (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, + NULL, NULL); return 0; } @@ -580,7 +642,7 @@ err: int32_t marker_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf) + struct iatt *postbuf, dict_t *xdata) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -594,7 +656,8 @@ marker_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; - STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf); + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, + xdata); if (op_ret == -1 || local == NULL) goto out; @@ -602,7 +665,7 @@ marker_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - initiate_quota_txn (this, &local->loc); + mq_initiate_quota_txn (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -615,12 +678,12 @@ out: int32_t marker_writev (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - struct iovec *vector, - int32_t count, - off_t offset, - struct iobref *iobref) + xlator_t *this, + fd_t *fd, + struct iovec *vector, + int32_t count, + off_t offset, uint32_t flags, + struct iobref *iobref, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -631,7 +694,7 @@ marker_writev (call_frame_t *frame, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -642,10 +705,10 @@ marker_writev (call_frame_t *frame, wind: STACK_WIND (frame, marker_writev_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, offset, - iobref); + flags, iobref, xdata); return 0; err: - STACK_UNWIND_STRICT (writev, frame, -1, ENOMEM, NULL, NULL); + STACK_UNWIND_STRICT (writev, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; } @@ -653,8 +716,8 @@ err: int32_t marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -669,7 +732,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (rmdir, frame, op_ret, op_errno, preparent, - postparent); + postparent, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -677,7 +740,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - reduce_parent_size (this, &local->loc); + mq_reduce_parent_size (this, &local->loc, -1); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -688,7 +751,8 @@ out: } int32_t -marker_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags) +marker_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags, + dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -699,7 +763,7 @@ marker_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags) if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -709,10 +773,10 @@ marker_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags) goto err; wind: STACK_WIND (frame, marker_rmdir_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->rmdir, loc, flags); + FIRST_CHILD(this)->fops->rmdir, loc, flags, xdata); return 0; err: - STACK_UNWIND_STRICT (rmdir, frame, -1, ENOMEM, NULL, NULL); + STACK_UNWIND_STRICT (rmdir, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; } @@ -720,8 +784,8 @@ err: int32_t marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -736,7 +800,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, preparent, - postparent); + postparent, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -744,7 +808,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if ((priv->feature_enabled & GF_QUOTA) && (local->ia_nlink == 1)) - reduce_parent_size (this, &local->loc); + mq_reduce_parent_size (this, &local->loc, -1); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -757,33 +821,38 @@ out: int32_t marker_unlink_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *buf) + int32_t op_ret, int32_t op_errno, struct iatt *buf, + dict_t *xdata) { marker_local_t *local = NULL; + local = frame->local; if (op_ret < 0) { goto err; } - local = frame->local; if (local == NULL) { + op_errno = EINVAL; goto err; } local->ia_nlink = buf->ia_nlink; STACK_WIND (frame, marker_unlink_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->unlink, &local->loc); + FIRST_CHILD(this)->fops->unlink, &local->loc, local->xflag, + local->xdata); return 0; err: - STACK_UNWIND_STRICT (unlink, frame, -1, ENOMEM, NULL, NULL); - + frame->local = NULL; + STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL, NULL); + marker_local_unref (local); return 0; } int32_t -marker_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc) +marker_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag, + dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -794,8 +863,10 @@ marker_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc) if (priv->feature_enabled == 0) goto unlink_wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); - + local = mem_get0 (this->local_pool); + local->xflag = xflag; + if (xdata) + local->xdata = dict_ref (xdata); MARKER_INIT_LOCAL (frame, local); ret = loc_copy (&local->loc, loc); @@ -803,32 +874,36 @@ marker_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc) if (ret == -1) goto err; + if (uuid_is_null (loc->gfid) && loc->inode) + uuid_copy (loc->gfid, loc->inode->gfid); + STACK_WIND (frame, marker_unlink_stat_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, loc); + FIRST_CHILD(this)->fops->stat, loc, xdata); return 0; unlink_wind: STACK_WIND (frame, marker_unlink_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->unlink, loc); + FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); return 0; err: - STACK_UNWIND_STRICT (unlink, frame, -1, ENOMEM, NULL, NULL); - + frame->local = NULL; + STACK_UNWIND_STRICT (unlink, frame, -1, ENOMEM, NULL, NULL, NULL); + marker_local_unref (local); return 0; } int32_t marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " "linking a file ", strerror (op_errno)); } @@ -837,7 +912,7 @@ marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (link, frame, op_ret, op_errno, inode, buf, - preparent, postparent); + preparent, postparent, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -845,7 +920,7 @@ marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - initiate_quota_txn (this, &local->loc); + mq_initiate_quota_txn (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -856,7 +931,8 @@ out: } int32_t -marker_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc) +marker_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, + dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -867,7 +943,7 @@ marker_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc) if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -877,161 +953,579 @@ marker_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc) goto err; wind: STACK_WIND (frame, marker_link_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->link, oldloc, newloc); + FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; err: - STACK_UNWIND_STRICT (link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL); + STACK_UNWIND_STRICT (link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, + NULL); return 0; } int32_t -marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *buf, - struct iatt *preoldparent, struct iatt *postoldparent, - struct iatt *prenewparent, struct iatt *postnewparent) +marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - marker_conf_t *priv = NULL; - marker_local_t *local = NULL; - marker_local_t *oplocal = NULL; - loc_t newloc = {0, }; + marker_local_t *local = NULL, *oplocal = NULL; + loc_t newloc = {0, }; + marker_conf_t *priv = NULL; - if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " - "renaming a file ", strerror (op_errno)); - } + local = frame->local; + oplocal = local->oplocal; - local = (marker_local_t *) frame->local; + priv = this->private; frame->local = NULL; - STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent, - postoldparent, prenewparent, postnewparent); + if (op_ret < 0) { + if (local->err == 0) { + local->err = op_errno; + } - if (op_ret == -1 || local == NULL) + gf_log (this->name, GF_LOG_WARNING, + "inodelk (UNLOCK) failed on path:%s (gfid:%s) (%s)", + local->parent_loc.path, + uuid_utoa (local->parent_loc.inode->gfid), + strerror (op_errno)); + } + + if (local->stub != NULL) { + call_resume (local->stub); + local->stub = NULL; + } else if (local->err != 0) { + STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, + NULL, NULL, NULL, NULL); + } + + mq_reduce_parent_size (this, &oplocal->loc, oplocal->contribution); + + if (local->loc.inode != NULL) { + mq_reduce_parent_size (this, &local->loc, local->contribution); + } + + newloc.inode = inode_ref (oplocal->loc.inode); + newloc.path = gf_strdup (local->loc.path); + newloc.name = strrchr (newloc.path, '/'); + if (newloc.name) + newloc.name++; + newloc.parent = inode_ref (local->loc.parent); + + mq_rename_update_newpath (this, &newloc); + + loc_wipe (&newloc); + + if (priv->feature_enabled & GF_XTIME) { + //update marks on oldpath + uuid_copy (local->loc.gfid, oplocal->loc.inode->gfid); + marker_xtime_update_marks (this, oplocal); + marker_xtime_update_marks (this, local); + } + + marker_local_unref (local); + marker_local_unref (oplocal); + return 0; +} + + +int32_t +marker_rename_release_newp_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + marker_local_t *local = NULL, *oplocal = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + if (local->err == 0) { + local->err = op_errno; + } + + gf_log (this->name, GF_LOG_WARNING, + "inodelk (UNLOCK) failed on %s (gfid:%s) (%s)", + oplocal->parent_loc.path, + uuid_utoa (oplocal->parent_loc.inode->gfid), + strerror (op_errno)); + } + + if (local->next_lock_on == NULL) { + marker_rename_done (frame, NULL, this, 0, 0, NULL); goto out; + } + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + marker_rename_done, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->parent_loc, F_SETLKW, &lock, NULL); +out: + return 0; +} + + +int32_t +marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + marker_local_t *local = NULL, *oplocal = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; oplocal = local->oplocal; - local->oplocal = NULL; + + if ((op_ret < 0) && (op_errno != ENOATTR)) { + local->err = op_errno; + } + + //Reset frame uid and gid if set. + if (cookie == (void *) _GF_UID_GID_CHANGED) + MARKER_RESET_UID_GID (frame, frame->root, local); + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + marker_rename_release_newp_lock, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL); + return 0; +} + + +int32_t +marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + struct iatt *preoldparent, struct iatt *postoldparent, + struct iatt *prenewparent, struct iatt *postnewparent, + dict_t *xdata) +{ + marker_conf_t *priv = NULL; + marker_local_t *local = NULL; + marker_local_t *oplocal = NULL; + call_stub_t *stub = NULL; + int32_t ret = 0; + char contri_key [512] = {0, }; + loc_t newloc = {0, }; + + local = (marker_local_t *) frame->local; + + if (local != NULL) { + oplocal = local->oplocal; + } priv = this->private; + if (op_ret < 0) { + if (local != NULL) { + local->err = op_errno; + } + + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " + "renaming a file ", strerror (op_errno)); + } + if (priv->feature_enabled & GF_QUOTA) { - reduce_parent_size (this, &oplocal->loc); + if ((op_ret < 0) || (local == NULL)) { + goto quota_err; + } - if (local->loc.inode != NULL) { - reduce_parent_size (this, &local->loc); + stub = fop_rename_cbk_stub (frame, default_rename_cbk, op_ret, + op_errno, buf, preoldparent, + postoldparent, prenewparent, + postnewparent, xdata); + if (stub == NULL) { + local->err = ENOMEM; + goto quota_err; } + local->stub = stub; + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = ENOMEM; + goto quota_err; + } + + /* Removexattr requires uid and gid to be 0, + * reset them in the callback. + */ + MARKER_SET_UID_GID (frame, local, frame->root); + newloc.inode = inode_ref (oplocal->loc.inode); newloc.path = gf_strdup (local->loc.path); - newloc.name = gf_strdup (local->loc.name); + newloc.name = strrchr (newloc.path, '/'); + if (newloc.name) + newloc.name++; newloc.parent = inode_ref (local->loc.parent); - newloc.ino = oplocal->loc.inode->ino; + uuid_copy (newloc.gfid, oplocal->loc.inode->gfid); - quota_rename_update_newpath (this, &newloc); + STACK_WIND_COOKIE (frame, marker_rename_release_oldp_lock, + frame->cookie, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, + &newloc, contri_key, NULL); loc_wipe (&newloc); - } + } else { + frame->local = NULL; - if (priv->feature_enabled & GF_XTIME) { - //update marks on oldpath - marker_xtime_update_marks (this, oplocal); - marker_xtime_update_marks (this, local); + STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, + preoldparent, postoldparent, prenewparent, + postnewparent, xdata); + + if ((op_ret < 0) || (local == NULL)) { + goto out; + } + + if (priv->feature_enabled & GF_XTIME) { + //update marks on oldpath + uuid_copy (local->loc.gfid, oplocal->loc.inode->gfid); + marker_xtime_update_marks (this, oplocal); + marker_xtime_update_marks (this, local); + } } + out: - marker_local_unref (local); - marker_local_unref (oplocal); + if (!(priv->feature_enabled & GF_QUOTA)) { + marker_local_unref (local); + marker_local_unref (oplocal); + } + + return 0; + +quota_err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); return 0; } int32_t -marker_quota_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata) { - marker_local_t *local = NULL, *oplocal = NULL; + marker_local_t *local = NULL, *oplocal = NULL; + char contri_key[512] = {0, }; + int32_t ret = 0; + int64_t *contribution = 0; + + local = frame->local; + oplocal = local->oplocal; + + //Reset frame uid and gid if set. + if (cookie == (void *) _GF_UID_GID_CHANGED) + MARKER_RESET_UID_GID (frame, frame->root, local); if ((op_ret < 0) && (op_errno != ENOATTR)) { - goto unwind; + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "fetching contribution values from %s (gfid:%s) " + "failed (%s)", local->loc.path, + uuid_utoa (local->loc.inode->gfid), + strerror (op_errno)); + goto err; } - local = frame->local; - oplocal = local->oplocal; + if (local->loc.inode != NULL) { + GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto err; + } + + if (dict_get_bin (dict, contri_key, + (void **) &contribution) == 0) { + local->contribution = ntoh64 (*contribution); + } + } STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, &oplocal->loc, - &local->loc); + &local->loc, NULL); + return 0; -unwind: - STACK_UNWIND_STRICT (rename, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL, NULL); - if (local) { - local->oplocal = NULL; - marker_local_unref (local); - GF_FREE (local); +err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); + return 0; +} + + +int32_t +marker_get_newpath_contribution (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *dict, dict_t *xdata) +{ + marker_local_t *local = NULL, *oplocal = NULL; + char contri_key[512] = {0, }; + int32_t ret = 0; + int64_t *contribution = 0; + + local = frame->local; + oplocal = local->oplocal; + + //Reset frame uid and gid if set. + if (cookie == (void *) _GF_UID_GID_CHANGED) + MARKER_RESET_UID_GID (frame, frame->root, local); + + if ((op_ret < 0) && (op_errno != ENOATTR)) { + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "fetching contribution values from %s (gfid:%s) " + "failed (%s)", oplocal->loc.path, + uuid_utoa (oplocal->loc.inode->gfid), + strerror (op_errno)); + goto err; } - if (oplocal) { - marker_local_unref (oplocal); - GF_FREE (oplocal); + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto err; } + + if (dict_get_bin (dict, contri_key, (void **) &contribution) == 0) + oplocal->contribution = ntoh64 (*contribution); + + if (local->loc.inode != NULL) { + GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto err; + } + + /* getxattr requires uid and gid to be 0, + * reset them in the callback. + */ + MARKER_SET_UID_GID (frame, local, frame->root); + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, local->loc.inode->gfid); + + GF_UUID_ASSERT (local->loc.gfid); + + STACK_WIND_COOKIE (frame, marker_do_rename, + frame->cookie, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->getxattr, + &local->loc, contri_key, NULL); + } else { + marker_do_rename (frame, NULL, this, 0, 0, NULL, NULL); + } + + return 0; +err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); + return 0; +} + + +int32_t +marker_get_oldpath_contribution (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + marker_local_t *local = NULL, *oplocal = NULL; + char contri_key[512] = {0, }; + int32_t ret = 0; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "cannot hold inodelk on %s (gfid:%s) (%s)", + local->next_lock_on->path, + uuid_utoa (local->next_lock_on->inode->gfid), + strerror (op_errno)); + goto lock_err; + } + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto quota_err; + } + + /* getxattr requires uid and gid to be 0, + * reset them in the callback. + */ + MARKER_SET_UID_GID (frame, local, frame->root); + + if (uuid_is_null (oplocal->loc.gfid)) + uuid_copy (oplocal->loc.gfid, + oplocal->loc.inode->gfid); + + GF_UUID_ASSERT (oplocal->loc.gfid); + + STACK_WIND_COOKIE (frame, marker_get_newpath_contribution, + frame->cookie, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->getxattr, + &oplocal->loc, contri_key, NULL); + return 0; + +quota_err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); + return 0; + +lock_err: + if ((local->next_lock_on == NULL) + || (local->next_lock_on == &local->parent_loc)) { + local->next_lock_on = NULL; + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL); + } else { + marker_rename_release_newp_lock (frame, NULL, this, 0, 0, NULL); + } + + return 0; +} + + +int32_t +marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + marker_local_t *local = NULL, *oplocal = NULL; + loc_t *loc = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + if (local->next_lock_on != &oplocal->parent_loc) { + loc = &oplocal->parent_loc; + } else { + loc = &local->parent_loc; + } + + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "cannot hold inodelk on %s (gfid:%s) (%s)", + loc->path, uuid_utoa (loc->inode->gfid), + strerror (op_errno)); + goto err; + } + + if (local->next_lock_on != NULL) { + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + + STACK_WIND (frame, + marker_get_oldpath_contribution, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, local->next_lock_on, + F_SETLKW, &lock, NULL); + } else { + marker_get_oldpath_contribution (frame, 0, this, 0, 0, NULL); + } + + return 0; + +err: + marker_rename_done (frame, NULL, this, 0, 0, NULL); return 0; } int32_t marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, - loc_t *newloc) + loc_t *newloc, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; marker_local_t *oplocal = NULL; marker_conf_t *priv = NULL; - char contri_key[512] = {0,}; + struct gf_flock lock = {0, }; + loc_t *lock_on = NULL; priv = this->private; if (priv->feature_enabled == 0) goto rename_wind; - GET_CONTRI_KEY (contri_key, oldloc->parent->gfid, ret); - if (ret < 0) - goto err; - - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); - ALLOCATE_OR_GOTO (oplocal, marker_local_t, err); + oplocal = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, oplocal); frame->local = local; - local->oplocal = oplocal; + local->oplocal = marker_local_ref (oplocal); ret = loc_copy (&local->loc, newloc); - if (ret == -1) + if (ret < 0) goto err; ret = loc_copy (&oplocal->loc, oldloc); - if (ret == -1) + if (ret < 0) goto err; - STACK_WIND (frame, marker_quota_removexattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, oldloc, contri_key); + if (!(priv->feature_enabled & GF_QUOTA)) { + goto rename_wind; + } + + ret = mq_inode_loc_fill (NULL, newloc->parent, &local->parent_loc); + if (ret < 0) + goto err; + + ret = mq_inode_loc_fill (NULL, oldloc->parent, &oplocal->parent_loc); + if (ret < 0) + goto err; + + if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent) + && (uuid_compare (newloc->parent->gfid, + oldloc->parent->gfid) < 0)) { + lock_on = &local->parent_loc; + local->next_lock_on = &oplocal->parent_loc; + } else { + lock_on = &oplocal->parent_loc; + if ((newloc->inode != NULL) && (newloc->parent + != oldloc->parent)) { + local->next_lock_on = &local->parent_loc; + } + } + + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + + STACK_WIND (frame, + marker_rename_inodelk_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, lock_on, + F_SETLKW, &lock, NULL); + return 0; rename_wind: STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->rename, oldloc, newloc); + FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); return 0; err: STACK_UNWIND_STRICT (rename, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL); return 0; } @@ -1039,14 +1533,14 @@ err: int32_t marker_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf) + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " "truncating a file ", strerror (op_errno)); } @@ -1055,7 +1549,7 @@ marker_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf, - postbuf); + postbuf, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1063,7 +1557,7 @@ marker_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - initiate_quota_txn (this, &local->loc); + mq_initiate_quota_txn (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -1075,7 +1569,8 @@ out: } int32_t -marker_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset) +marker_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1086,7 +1581,7 @@ marker_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset) if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1096,10 +1591,10 @@ marker_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset) goto err; wind: STACK_WIND (frame, marker_truncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, loc, offset); + FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; err: - STACK_UNWIND_STRICT (truncate, frame, -1, ENOMEM, NULL, NULL); + STACK_UNWIND_STRICT (truncate, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; } @@ -1107,14 +1602,14 @@ err: int32_t marker_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf) + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " "truncating a file ", strerror (op_errno)); } @@ -1123,7 +1618,7 @@ marker_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf, - postbuf); + postbuf, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1131,7 +1626,7 @@ marker_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - initiate_quota_txn (this, &local->loc); + mq_initiate_quota_txn (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -1142,7 +1637,8 @@ out: } int32_t -marker_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset) +marker_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1153,7 +1649,7 @@ marker_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset) if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1163,10 +1659,10 @@ marker_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset) goto err; wind: STACK_WIND (frame, marker_ftruncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, fd, offset); + FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); return 0; err: - STACK_UNWIND_STRICT (ftruncate, frame, -1, ENOMEM, NULL, NULL); + STACK_UNWIND_STRICT (ftruncate, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; } @@ -1174,15 +1670,15 @@ err: int32_t marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " "creating symlinks ", strerror (op_errno)); } @@ -1191,15 +1687,18 @@ marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (symlink, frame, op_ret, op_errno, inode, buf, - preparent, postparent); + preparent, postparent, xdata); if (op_ret == -1 || local == NULL) goto out; + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + priv = this->private; if (priv->feature_enabled & GF_QUOTA) - inspect_file_xattr (this, &local->loc, NULL, *buf); + mq_set_inode_xattr (this, &local->loc); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -1211,7 +1710,7 @@ out: int marker_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, - loc_t *loc, dict_t *params) + loc_t *loc, mode_t umask, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1222,7 +1721,7 @@ marker_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1232,26 +1731,27 @@ marker_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, goto err; wind: STACK_WIND (frame, marker_symlink_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->symlink, linkpath, loc, params); + FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, + xdata); return 0; err: STACK_UNWIND_STRICT (symlink, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); return 0; } int32_t marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " "creating symlinks ", strerror (op_errno)); } @@ -1260,15 +1760,18 @@ marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno, inode, - buf, preparent, postparent); + buf, preparent, postparent, xdata); if (op_ret == -1 || local == NULL) goto out; + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + priv = this->private; if ((priv->feature_enabled & GF_QUOTA) && (S_ISREG (local->mode))) { - inspect_file_xattr (this, &local->loc, NULL, *buf); + mq_set_inode_xattr (this, &local->loc); } if (priv->feature_enabled & GF_XTIME) @@ -1281,7 +1784,7 @@ out: int marker_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, - dev_t rdev, dict_t *parms) + dev_t rdev, mode_t umask, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1292,7 +1795,7 @@ marker_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1304,11 +1807,216 @@ marker_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, goto err; wind: STACK_WIND (frame, marker_mknod_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, parms); + FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, + xdata); return 0; err: STACK_UNWIND_STRICT (mknod, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); + return 0; +} + + +int32_t +marker_fallocate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + marker_local_t *local = NULL; + marker_conf_t *priv = NULL; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " + "fallocating a file ", strerror (op_errno)); + } + + local = (marker_local_t *) frame->local; + + frame->local = NULL; + + STACK_UNWIND_STRICT (fallocate, frame, op_ret, op_errno, prebuf, + postbuf, xdata); + + if (op_ret == -1 || local == NULL) + goto out; + + priv = this->private; + + if (priv->feature_enabled & GF_QUOTA) + mq_initiate_quota_txn (this, &local->loc); + + if (priv->feature_enabled & GF_XTIME) + marker_xtime_update_marks (this, local); +out: + marker_local_unref (local); + + return 0; +} + +int32_t +marker_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, + off_t offset, size_t len, dict_t *xdata) +{ + int32_t ret = 0; + marker_local_t *local = NULL; + marker_conf_t *priv = NULL; + + priv = this->private; + + if (priv->feature_enabled == 0) + goto wind; + + local = mem_get0 (this->local_pool); + + MARKER_INIT_LOCAL (frame, local); + + ret = marker_inode_loc_fill (fd->inode, &local->loc); + + if (ret == -1) + goto err; +wind: + STACK_WIND (frame, marker_fallocate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len, + xdata); + return 0; +err: + STACK_UNWIND_STRICT (fallocate, frame, -1, ENOMEM, NULL, NULL, NULL); + + return 0; +} + + +int32_t +marker_discard_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + marker_local_t *local = NULL; + marker_conf_t *priv = NULL; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_TRACE, "%s occurred during discard", + strerror (op_errno)); + } + + local = (marker_local_t *) frame->local; + + frame->local = NULL; + + STACK_UNWIND_STRICT (discard, frame, op_ret, op_errno, prebuf, + postbuf, xdata); + + if (op_ret == -1 || local == NULL) + goto out; + + priv = this->private; + + if (priv->feature_enabled & GF_QUOTA) + mq_initiate_quota_txn (this, &local->loc); + + if (priv->feature_enabled & GF_XTIME) + marker_xtime_update_marks (this, local); +out: + marker_local_unref (local); + + return 0; +} + +int32_t +marker_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) +{ + int32_t ret = 0; + marker_local_t *local = NULL; + marker_conf_t *priv = NULL; + + priv = this->private; + + if (priv->feature_enabled == 0) + goto wind; + + local = mem_get0 (this->local_pool); + + MARKER_INIT_LOCAL (frame, local); + + ret = marker_inode_loc_fill (fd->inode, &local->loc); + + if (ret == -1) + goto err; +wind: + STACK_WIND (frame, marker_discard_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->discard, fd, offset, len, xdata); + return 0; +err: + STACK_UNWIND_STRICT (discard, frame, -1, ENOMEM, NULL, NULL, NULL); + + return 0; +} + +int32_t +marker_zerofill_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + marker_local_t *local = NULL; + marker_conf_t *priv = NULL; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_TRACE, "%s occurred during zerofill", + strerror (op_errno)); + } + + local = (marker_local_t *) frame->local; + + frame->local = NULL; + + STACK_UNWIND_STRICT (zerofill, frame, op_ret, op_errno, prebuf, + postbuf, xdata); + + if (op_ret == -1 || local == NULL) + goto out; + + priv = this->private; + + if (priv->feature_enabled & GF_QUOTA) + mq_initiate_quota_txn (this, &local->loc); + + if (priv->feature_enabled & GF_XTIME) + marker_xtime_update_marks (this, local); +out: + marker_local_unref (local); + + return 0; +} + +int32_t +marker_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) +{ + int32_t ret = 0; + marker_local_t *local = NULL; + marker_conf_t *priv = NULL; + + priv = this->private; + + if (priv->feature_enabled == 0) + goto wind; + + local = mem_get0 (this->local_pool); + + MARKER_INIT_LOCAL (frame, local); + + ret = marker_inode_loc_fill (fd->inode, &local->loc); + + if (ret == -1) + goto err; +wind: + STACK_WIND (frame, marker_zerofill_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->zerofill, fd, offset, len, xdata); + return 0; +err: + STACK_UNWIND_STRICT (zerofill, frame, -1, ENOMEM, NULL, NULL, NULL); + return 0; } @@ -1339,7 +2047,7 @@ call_from_sp_client_to_reset_tmfile (call_frame_t *frame, if (data == NULL) return -1; - if (frame->root->pid != -1) { + if (frame->root->pid != GF_CLIENT_PID_GSYNCD) { op_ret = -1; op_errno = EPERM; @@ -1347,7 +2055,7 @@ call_from_sp_client_to_reset_tmfile (call_frame_t *frame, } if (data->len == 0 || (data->len == 5 && - memcmp (data->data, "RESET", 5) == 0)) { + memcmp (data->data, "RESET", 5) == 0)) { fd = open (priv->timestamp_file, O_WRONLY|O_TRUNC); if (fd != -1) { /* TODO check whether the O_TRUNC would update the @@ -1368,7 +2076,7 @@ call_from_sp_client_to_reset_tmfile (call_frame_t *frame, op_errno = EINVAL; } out: - STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno); + STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno, NULL); return 0; } @@ -1376,21 +2084,21 @@ out: int32_t marker_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " - "creating symlinks ", strerror (op_errno)); + gf_log (this->name, GF_LOG_TRACE, "%s occurred in " + "setxattr ", strerror (op_errno)); } local = (marker_local_t *) frame->local; frame->local = NULL; - STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno); + STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1407,7 +2115,7 @@ out: int32_t marker_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, - int32_t flags) + int32_t flags, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1422,7 +2130,7 @@ marker_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, if (ret == 0) return 0; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1432,10 +2140,10 @@ marker_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, goto err; wind: STACK_WIND (frame, marker_setxattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->setxattr, loc, dict, flags); + FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata); return 0; err: - STACK_UNWIND_STRICT (setxattr, frame, -1, ENOMEM); + STACK_UNWIND_STRICT (setxattr, frame, -1, ENOMEM, NULL); return 0; } @@ -1443,13 +2151,13 @@ err: int32_t marker_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " + gf_log (this->name, GF_LOG_TRACE, "%s occurred while " "creating symlinks ", strerror (op_errno)); } @@ -1457,7 +2165,7 @@ marker_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; - STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno); + STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1474,7 +2182,7 @@ out: int32_t marker_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, - int32_t flags) + int32_t flags, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1489,7 +2197,7 @@ marker_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, if (ret == 0) return 0; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1499,10 +2207,10 @@ marker_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, goto err; wind: STACK_WIND (frame, marker_fsetxattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags); + FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); return 0; err: - STACK_UNWIND_STRICT (fsetxattr, frame, -1, ENOMEM); + STACK_UNWIND_STRICT (fsetxattr, frame, -1, ENOMEM, NULL); return 0; } @@ -1510,14 +2218,14 @@ err: int32_t marker_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *statpre, - struct iatt *statpost) + int32_t op_ret, int32_t op_errno, struct iatt *statpre, + struct iatt *statpost, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_ERROR, "%s occured while " + gf_log (this->name, GF_LOG_ERROR, "%s occurred while " "creating symlinks ", strerror (op_errno)); } @@ -1526,7 +2234,7 @@ marker_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno, statpre, - statpost); + statpost, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1544,7 +2252,7 @@ out: int32_t marker_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, - struct iatt *stbuf, int32_t valid) + struct iatt *stbuf, int32_t valid, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1555,7 +2263,7 @@ marker_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1565,10 +2273,10 @@ marker_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, goto err; wind: STACK_WIND (frame, marker_fsetattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid); + FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; err: - STACK_UNWIND_STRICT (fsetattr, frame, -1, ENOMEM, NULL, NULL); + STACK_UNWIND_STRICT (fsetattr, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; } @@ -1576,8 +2284,8 @@ err: int32_t marker_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *statpre, - struct iatt *statpost) + int32_t op_ret, int32_t op_errno, struct iatt *statpre, + struct iatt *statpost, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1589,13 +2297,13 @@ marker_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if (op_ret == -1) { gf_log (this->name, ((op_errno == ENOENT) ? GF_LOG_DEBUG : GF_LOG_ERROR), - "%s occured during setattr of %s", + "%s occurred during setattr of %s", strerror (op_errno), (local ? local->loc.path : "<nul>")); } STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, - statpost); + statpost, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1612,7 +2320,7 @@ out: int32_t marker_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - struct iatt *stbuf, int32_t valid) + struct iatt *stbuf, int32_t valid, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1623,7 +2331,7 @@ marker_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1633,10 +2341,10 @@ marker_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, goto err; wind: STACK_WIND (frame, marker_setattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid); + FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); return 0; err: - STACK_UNWIND_STRICT (setattr, frame, -1, ENOMEM, NULL, NULL); + STACK_UNWIND_STRICT (setattr, frame, -1, ENOMEM, NULL, NULL, NULL); return 0; } @@ -1644,13 +2352,13 @@ err: int32_t marker_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno, dict_t *xdata) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; if (op_ret == -1) { - gf_log (this->name, GF_LOG_ERROR, "%s occured while " + gf_log (this->name, GF_LOG_ERROR, "%s occurred while " "creating symlinks ", strerror (op_errno)); } @@ -1658,7 +2366,7 @@ marker_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; - STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno); + STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno, xdata); if (op_ret == -1 || local == NULL) goto out; @@ -1675,7 +2383,7 @@ out: int32_t marker_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) + const char *name, dict_t *xdata) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1686,7 +2394,7 @@ marker_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1696,10 +2404,10 @@ marker_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, goto err; wind: STACK_WIND (frame, marker_removexattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, loc, name); + FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); return 0; err: - STACK_UNWIND_STRICT (removexattr, frame, -1, ENOMEM); + STACK_UNWIND_STRICT (removexattr, frame, -1, ENOMEM, NULL); return 0; } @@ -1728,10 +2436,19 @@ marker_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if (op_ret == -1 || local == NULL) goto out; + /* copy the gfid from the stat structure instead of inode, + * since if the lookup is fresh lookup, then the inode + * would have not yet linked to the inode table which happens + * in protocol/server. + */ + if (uuid_is_null (local->loc.gfid)) + uuid_copy (local->loc.gfid, buf->ia_gfid); + + priv = this->private; if (priv->feature_enabled & GF_QUOTA) { - quota_xattr_state (this, &local->loc, dict, *buf); + mq_xattr_state (this, &local->loc, dict, *buf); } out: @@ -1753,7 +2470,7 @@ marker_lookup (call_frame_t *frame, xlator_t *this, if (priv->feature_enabled == 0) goto wind; - ALLOCATE_OR_GOTO (local, marker_local_t, err); + local = mem_get0 (this->local_pool); MARKER_INIT_LOCAL (frame, local); @@ -1762,7 +2479,7 @@ marker_lookup (call_frame_t *frame, xlator_t *this, goto err; if ((priv->feature_enabled & GF_QUOTA) && xattr_req) - quota_req_xattr (this, loc, xattr_req); + mq_req_xattr (this, loc, xattr_req); wind: STACK_WIND (frame, marker_lookup_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, xattr_req); @@ -1773,6 +2490,49 @@ err: return 0; } +int +marker_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, gf_dirent_t *entries, + dict_t *xdata) +{ + gf_dirent_t *entry = NULL; + + if (op_ret <= 0) + goto unwind; + + list_for_each_entry (entry, &entries->list, list) { + /* TODO: fill things */ + } + +unwind: + STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries, xdata); + + return 0; +} + +int +marker_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, dict_t *dict) +{ + marker_conf_t *priv = NULL; + + priv = this->private; + + if (priv->feature_enabled == 0) + goto wind; + + if ((priv->feature_enabled & GF_QUOTA) && dict) + mq_req_xattr (this, NULL, dict); + +wind: + STACK_WIND (frame, marker_readdirp_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, + fd, size, offset, dict); + + return 0; +} + + int32_t mem_acct_init (xlator_t *this) { @@ -1785,7 +2545,7 @@ mem_acct_init (xlator_t *this) if (ret != 0) { gf_log(this->name, GF_LOG_ERROR, "Memory accounting init" - "failed"); + "failed"); return ret; } @@ -1811,12 +2571,14 @@ init_xtime_priv (xlator_t *this, dict_t *options) ret = uuid_parse (priv->volume_uuid, priv->volume_uuid_bin); if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, "invalid volume uuid %s", priv->volume_uuid); + gf_log (this->name, GF_LOG_ERROR, + "invalid volume uuid %s", priv->volume_uuid); goto out; } ret = gf_asprintf (& (priv->marker_xattr), "%s.%s.%s", - MARKER_XATTR_PREFIX, priv->volume_uuid, XTIME); + MARKER_XATTR_PREFIX, priv->volume_uuid, + XTIME); if (ret == -1){ priv->marker_xattr = NULL; @@ -1871,14 +2633,11 @@ marker_xtime_priv_cleanup (xlator_t *this) GF_VALIDATE_OR_GOTO (this->name, priv, out); - if (priv->volume_uuid != NULL) - GF_FREE (priv->volume_uuid); + GF_FREE (priv->volume_uuid); - if (priv->timestamp_file != NULL) - GF_FREE (priv->timestamp_file); + GF_FREE (priv->timestamp_file); - if (priv->marker_xattr != NULL) - GF_FREE (priv->marker_xattr); + GF_FREE (priv->marker_xattr); out: return; } @@ -1906,7 +2665,7 @@ out: int32_t reconfigure (xlator_t *this, dict_t *options) { - int32_t ret = -1; + int32_t ret = 0; data_t *data = NULL; gf_boolean_t flag = _gf_false; marker_conf_t *priv = NULL; @@ -1947,11 +2706,17 @@ reconfigure (xlator_t *this, dict_t *options) "xtime updation will fail"); } else { priv->feature_enabled |= GF_XTIME; + data = dict_get (options, "gsync-force-xtime"); + if (!data) + goto out; + ret = gf_string2boolean (data->data, &flag); + if (ret == 0 && flag) + priv->feature_enabled |= GF_XTIME_GSYNC_FORCE; } } } out: - return 0; + return ret; } @@ -2007,9 +2772,23 @@ init (xlator_t *this) goto err; priv->feature_enabled |= GF_XTIME; + data = dict_get (options, "gsync-force-xtime"); + if (!data) + goto cont; + ret = gf_string2boolean (data->data, &flag); + if (ret == 0 && flag) + priv->feature_enabled |= GF_XTIME_GSYNC_FORCE; } } + cont: + this->local_pool = mem_pool_new (marker_local_t, 128); + if (!this->local_pool) { + gf_log (this->name, GF_LOG_ERROR, + "failed to create local_t's memory pool"); + goto err; + } + return 0; err: marker_priv_cleanup (this); @@ -2031,7 +2810,7 @@ marker_forget (xlator_t *this, inode_t *inode) goto out; } - quota_forget (this, ctx->quota_ctx); + mq_forget (this, ctx->quota_ctx); GF_FREE (ctx); out: @@ -2062,11 +2841,15 @@ struct xlator_fops fops = { .setattr = marker_setattr, .fsetattr = marker_fsetattr, .removexattr = marker_removexattr, - .getxattr = marker_getxattr + .getxattr = marker_getxattr, + .readdirp = marker_readdirp, + .fallocate = marker_fallocate, + .discard = marker_discard, + .zerofill = marker_zerofill, }; struct xlator_cbks cbks = { - .forget = marker_forget + .forget = marker_forget }; struct volume_options options[] = { @@ -2074,5 +2857,6 @@ struct volume_options options[] = { {.key = {"timestamp-file"}}, {.key = {"quota"}}, {.key = {"xtime"}}, + {.key = {"gsync-force-xtime"}}, {.key = {NULL}} }; diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h index ea1f5cc0a..1a58f8cfc 100644 --- a/xlators/features/marker/src/marker.h +++ b/xlators/features/marker/src/marker.h @@ -1,21 +1,12 @@ -/*Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ #ifndef _MARKER_H #define _MARKER_H @@ -28,6 +19,7 @@ #include "xlator.h" #include "defaults.h" #include "uuid.h" +#include "call-stub.h" #define MARKER_XATTR_PREFIX "trusted.glusterfs" #define XTIME "xtime" @@ -36,8 +28,9 @@ #define TIMESTAMP_FILE "timestamp-file" enum { - GF_QUOTA=1, - GF_XTIME=2 + GF_QUOTA = 1, + GF_XTIME = 2, + GF_XTIME_GSYNC_FORCE = 4, }; /*initialize the local variable*/ @@ -46,6 +39,8 @@ enum { _local->pid = _frame->root->pid; \ memset (&_local->loc, 0, sizeof (loc_t)); \ _local->ref = 1; \ + _local->uid = -1; \ + _local->gid = -1; \ LOCK_INIT (&_local->lock); \ _local->oplocal = NULL; \ } while (0) @@ -61,18 +56,67 @@ enum { } \ } while (0) +#define _MARKER_SET_UID_GID(dest, src) \ + do { \ + if (src->uid != -1 && \ + src->gid != -1) { \ + dest->uid = src->uid; \ + dest->gid = src->gid; \ + } \ + } while (0) + +#define MARKER_SET_UID_GID(frame, dest, src) \ + do { \ + _MARKER_SET_UID_GID (dest, src); \ + frame->root->uid = 0; \ + frame->root->gid = 0; \ + frame->cookie = (void *) _GF_UID_GID_CHANGED; \ + } while (0) + +#define MARKER_RESET_UID_GID(frame, dest, src) \ + do { \ + _MARKER_SET_UID_GID (dest, src); \ + frame->cookie = NULL; \ + } while (0) + struct marker_local{ uint32_t timebuf[2]; pid_t pid; loc_t loc; + loc_t parent_loc; + loc_t *next_lock_on; + uid_t uid; + gid_t gid; int32_t ref; int32_t ia_nlink; gf_lock_t lock; mode_t mode; + int32_t err; + call_stub_t *stub; + int64_t contribution; struct marker_local *oplocal; + + /* marker quota specific */ + int64_t delta; + int64_t d_off; + int64_t sum; + int64_t size; + int32_t hl_count; + int32_t dentry_child_count; + + fd_t *fd; + call_frame_t *frame; + + quota_inode_ctx_t *ctx; + inode_contribution_t *contri; + + int xflag; + dict_t *xdata; }; typedef struct marker_local marker_local_t; +#define quota_local_t marker_local_t + struct marker_inode_ctx { struct quota_inode_ctx *quota_ctx; }; @@ -91,5 +135,4 @@ struct marker_conf{ }; typedef struct marker_conf marker_conf_t; -int32_t k; #endif |
