diff options
-rw-r--r-- | libglusterfs/src/call-stub.c | 2 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-common.c | 18 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 42 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota-helper.h | 17 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota.c | 609 | ||||
-rw-r--r-- | xlators/features/marker/src/marker-quota.h | 58 | ||||
-rw-r--r-- | xlators/features/marker/src/marker.c | 643 | ||||
-rw-r--r-- | xlators/features/marker/src/marker.h | 6 |
8 files changed, 1019 insertions, 376 deletions
diff --git a/libglusterfs/src/call-stub.c b/libglusterfs/src/call-stub.c index 3715dfffa..4fe1caf33 100644 --- a/libglusterfs/src/call-stub.c +++ b/libglusterfs/src/call-stub.c @@ -2695,7 +2695,6 @@ call_resume_unwind (call_stub_t *stub) case GF_FOP_RENAME: { -#if 0 if (!stub->args.rename_cbk.fn) STACK_UNWIND (stub->frame, stub->args.rename_cbk.op_ret, @@ -2716,7 +2715,6 @@ call_resume_unwind (call_stub_t *stub) &stub->args.rename_cbk.postoldparent, &stub->args.rename_cbk.prenewparent, &stub->args.rename_cbk.postnewparent); -#endif break; } diff --git a/xlators/features/marker/src/marker-common.c b/xlators/features/marker/src/marker-common.c index 9d27167fc..dac08ec55 100644 --- a/xlators/features/marker/src/marker-common.c +++ b/xlators/features/marker/src/marker-common.c @@ -42,7 +42,7 @@ int32_t marker_force_inode_ctx_get (inode_t *inode, xlator_t *this, marker_inode_ctx_t **ctx) { - int32_t ret = -1; + int32_t ret = -1; uint64_t ctx_int = 0; LOCK (&inode->lock); @@ -71,16 +71,16 @@ unlock: UNLOCK (&inode->lock); void marker_filter_quota_xattr (dict_t *dict, char *key, - data_t *value, void *data) + data_t *value, void *data) { - int ret = -1; + int ret = -1; - GF_VALIDATE_OR_GOTO ("marker", dict, out); - GF_VALIDATE_OR_GOTO ("marker", key, out); + GF_VALIDATE_OR_GOTO ("marker", dict, out); + GF_VALIDATE_OR_GOTO ("marker", key, out); - ret = fnmatch ("trusted.glusterfs.quota*", key, 0); - if (ret == 0) - dict_del (dict, key); + ret = fnmatch ("trusted.glusterfs.quota*", key, 0); + if (ret == 0) + dict_del (dict, key); out: - return; + return; } diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index fba2cdd3f..a559da808 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -178,6 +178,8 @@ __add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc) uuid_copy (contribution->gfid, loc->parent->gfid); + LOCK_INIT (&contribution->lock); + list_add_tail (&contribution->contri_list, &ctx->contribution_head); out: @@ -354,12 +356,13 @@ quota_local_ref (quota_local_t *local) int32_t quota_local_unref (xlator_t *this, quota_local_t *local) { + int32_t ref = 0; if (local == NULL) goto out; - QUOTA_SAFE_DECREMENT (&local->lock, local->ref); + QUOTA_SAFE_DECREMENT (&local->lock, local->ref, ref); - if (local->ref > 0) + if (ref > 0) goto out; if (local->fd != NULL) @@ -373,3 +376,38 @@ quota_local_unref (xlator_t *this, quota_local_t *local) out: return 0; } + + +inode_contribution_t * +get_contribution_from_loc (xlator_t *this, loc_t *loc) +{ + int32_t ret = 0; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; + + ret = quota_inode_ctx_get (loc->inode, this, &ctx); + if (ret < 0) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "cannot get marker-quota context from inode " + "(ino: %"PRId64", gfid:%s, path:%s)", + loc->inode->ino, + uuid_utoa (loc->inode->gfid), + loc->path); + goto err; + } + + contribution = get_contribution_node (loc->parent, ctx); + if (contribution == NULL) { + gf_log_callingfn (this->name, GF_LOG_WARNING, + "inode (ino:%"PRId64", gfid:%s, path:%s ) has" + " no contribution towards parent (ino:%"PRId64 + ", gfid:%s)", loc->inode->ino, + uuid_utoa (loc->inode->gfid), + loc->path, loc->parent->ino, + uuid_utoa (loc->parent->gfid)); + goto err; + } + +err: + return contribution; +} diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h index 9a24c8c3d..1c925ec7c 100644 --- a/xlators/features/marker/src/marker-quota-helper.h +++ b/xlators/features/marker/src/marker-quota-helper.h @@ -22,6 +22,7 @@ #define _CONFIG_H #include "config.h" #endif + #include "marker-quota.h" #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution) \ @@ -37,11 +38,13 @@ UNLOCK (lock); \ } while (0) -#define QUOTA_SAFE_DECREMENT(lock, var) \ - do { \ - LOCK (lock); \ - var --; \ - UNLOCK (lock); \ +#define QUOTA_SAFE_DECREMENT(lock, var, value) \ + do { \ + LOCK (lock); \ + { \ + value = --var; \ + } \ + UNLOCK (lock); \ } while (0) inode_contribution_t * @@ -73,4 +76,8 @@ quota_local_unref (xlator_t *, quota_local_t *); inode_contribution_t * get_contribution_node (inode_t *, quota_inode_ctx_t *); + +inode_contribution_t * +get_contribution_from_loc (xlator_t *this, loc_t *loc); + #endif diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index 8d4f17ea5..16171bfb8 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -34,7 +34,7 @@ void mq_assign_lk_owner (xlator_t *this, call_frame_t *frame) { marker_conf_t *conf = NULL; - uint64_t lk_owner = 0; + uint64_t lk_owner = 0; conf = this->private; @@ -108,11 +108,12 @@ dirty_inode_updation_done (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { quota_local_t *local = NULL; + int32_t value = 0; local = frame->local; if (!local->err) - QUOTA_SAFE_DECREMENT (&local->lock, local->ref); + QUOTA_SAFE_DECREMENT (&local->lock, local->ref, value); else frame->local = NULL; @@ -125,7 +126,7 @@ int32_t release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { - struct gf_flock lock; + struct gf_flock lock = {0, }; quota_local_t *local = NULL; local = frame->local; @@ -158,13 +159,13 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this, int32_t mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) + int32_t op_ret, int32_t op_errno, dict_t *dict) { - int32_t ret = -1; - int64_t *size = NULL; - dict_t *newdict = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; + int32_t ret = -1; + int64_t *size = NULL; + dict_t *newdict = NULL; + quota_local_t *local = NULL; + marker_conf_t *priv = NULL; local = (quota_local_t *) frame->local; @@ -180,7 +181,11 @@ mark_inode_undirty (call_frame_t *frame, void *cookie, xlator_t *this, if (ret) goto wind; - local->ctx->size = ntoh64 (*size); + LOCK (&local->ctx->lock); + { + local->ctx->size = ntoh64 (*size); + } + UNLOCK (&local->ctx->lock); wind: newdict = dict_new (); @@ -282,10 +287,10 @@ err: int32_t get_dirty_inode_size (call_frame_t *frame, xlator_t *this) { - int32_t ret = -1; - dict_t *dict = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; + int32_t ret = -1; + dict_t *dict = NULL; + quota_local_t *local = NULL; + marker_conf_t *priv = NULL; local = (quota_local_t *) frame->local; @@ -326,11 +331,11 @@ get_child_contribution (call_frame_t *frame, dict_t *dict, struct iatt *postparent) { - int32_t ret = -1; - int32_t val = 0; - char contri_key [512] = {0, }; - int64_t *contri = NULL; - quota_local_t *local = NULL; + int32_t ret = -1; + int32_t val = 0; + char contri_key [512] = {0, }; + int64_t *contri = NULL; + quota_local_t *local = NULL; local = frame->local; @@ -370,7 +375,7 @@ out: if (val== 0) { if (local->err) { - QUOTA_SAFE_DECREMENT (&local->lock, local->ref); + QUOTA_SAFE_DECREMENT (&local->lock, local->ref, val); quota_local_unref (this, local); } else @@ -389,15 +394,15 @@ quota_readdir_cbk (call_frame_t *frame, int32_t op_errno, gf_dirent_t *entries) { - char contri_key [512] = {0, }; - loc_t loc; - int32_t ret = 0; - off_t offset = 0; - int32_t count = 0; - dict_t *dict = NULL; - quota_local_t *local = NULL; - gf_dirent_t *entry = NULL; - call_frame_t *newframe = NULL; + char contri_key [512] = {0, }; + int32_t ret = 0; + off_t offset = 0; + int32_t count = 0; + dict_t *dict = NULL; + quota_local_t *local = NULL; + gf_dirent_t *entry = NULL; + call_frame_t *newframe = NULL; + loc_t loc = {0, }; local = frame->local; @@ -418,15 +423,17 @@ quota_readdir_cbk (call_frame_t *frame, local->dentry_child_count = 0; list_for_each_entry (entry, (&entries->list), list) { - gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name); + gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name); - if ((!strcmp (entry->d_name, ".")) || (!strcmp (entry->d_name, - ".."))) { - gf_log (this->name, GF_LOG_DEBUG, "entry = %s", - entry->d_name); - continue; - } - count++; + if ((!strcmp (entry->d_name, ".")) || (!strcmp (entry->d_name, + ".."))) { + gf_log (this->name, GF_LOG_DEBUG, "entry = %s", + entry->d_name); + continue; + } + + offset = entry->d_off; + count++; } local->frame = frame; @@ -435,6 +442,7 @@ quota_readdir_cbk (call_frame_t *frame, LOCK (&local->lock); { local->dentry_child_count = count; + local->d_off = offset; } UNLOCK (&local->lock); } @@ -447,7 +455,6 @@ quota_readdir_cbk (call_frame_t *frame, ".."))) { gf_log (this->name, GF_LOG_DEBUG, "entry = %s", entry->d_name); - offset = entry->d_off; continue; } @@ -513,17 +520,12 @@ quota_readdir_cbk (call_frame_t *frame, break; } } - gf_log (this->name, GF_LOG_DEBUG, "offset before =%"PRIu64, - local->d_off); - local->d_off +=offset; - gf_log (this->name, GF_LOG_DEBUG, "offset after = %"PRIu64, - local->d_off); - if (ret) + if (ret) { release_lock_on_dirty_inode (frame, NULL, this, 0, 0); - - else if (count == 0 ) + } else if (count == 0 ) { get_dirty_inode_size (frame, this); + } return 0; } @@ -586,8 +588,8 @@ check_if_still_dirty (call_frame_t *frame, priv = this->private; if (!dict) { - ret = -1; - goto err; + ret = -1; + goto err; } ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); @@ -619,6 +621,10 @@ err: release_lock_on_dirty_inode (frame, NULL, this, 0, 0); } + if (fd != NULL) { + fd_unref (fd); + } + return 0; } @@ -626,10 +632,10 @@ int32_t get_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { - int32_t ret = -1; - dict_t *xattr_req = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; + int32_t ret = -1; + dict_t *xattr_req = NULL; + quota_local_t *local = NULL; + marker_conf_t *priv = NULL; if (op_ret == -1) { dirty_inode_updation_done (frame, NULL, this, 0, 0); @@ -716,7 +722,7 @@ update_dirty_inode (xlator_t *this, return 0; fr_destroy: - QUOTA_STACK_DESTROY (frame, this); + QUOTA_STACK_DESTROY (frame, this); out: return 0; @@ -727,18 +733,42 @@ int32_t quota_inode_creation_done (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { - quota_local_t *local = NULL; - if (frame == NULL) return 0; + QUOTA_STACK_DESTROY (frame, this); + + return 0; +} + + +int32_t +quota_xattr_creation_release_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno) +{ + struct gf_flock lock = {0, }; + quota_local_t *local = NULL; + local = frame->local; - QUOTA_STACK_DESTROY (frame, this); + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + quota_inode_creation_done, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->loc, + F_SETLKW, &lock); return 0; } + int32_t create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *dict) @@ -748,8 +778,9 @@ create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, quota_local_t *local = NULL; marker_conf_t *priv = NULL; - if (op_ret == -1 && op_errno == ENOTCONN) + if (op_ret < 0) { goto err; + } local = frame->local; @@ -757,27 +788,31 @@ create_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this, if (local->loc.inode->ia_type == IA_IFDIR) { newdict = dict_new (); - if (!newdict) + if (!newdict) { goto err; + } ret = dict_set_int8 (newdict, QUOTA_DIRTY_KEY, 0); - if (ret == -1) + if (ret == -1) { goto err; + } - STACK_WIND (frame, quota_inode_creation_done, + STACK_WIND (frame, quota_xattr_creation_release_lock, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, &local->loc, newdict, 0); - } else - quota_inode_creation_done (frame, NULL, this, 0, 0); + } else { + quota_xattr_creation_release_lock (frame, NULL, this, 0, 0); + } ret = 0; err: - if (ret == -1) - quota_inode_creation_done (frame, NULL, this, -1, 0); + if (ret < 0) { + quota_xattr_creation_release_lock (frame, NULL, this, 0, 0); + } - if (newdict) + if (newdict != NULL) dict_unref (newdict); return 0; @@ -785,27 +820,28 @@ err: int32_t -quota_set_inode_xattr (xlator_t *this, loc_t *loc) +quota_create_xattr (xlator_t *this, call_frame_t *frame) { int32_t ret = 0; int64_t *value = NULL; int64_t *size = NULL; dict_t *dict = NULL; char key[512] = {0, }; - call_frame_t *frame = NULL; quota_local_t *local = NULL; marker_conf_t *priv = NULL; quota_inode_ctx_t *ctx = NULL; inode_contribution_t *contri = NULL; - if (loc == NULL || this == NULL) + if (frame == NULL || this == NULL) return 0; + local = frame->local; + priv = (marker_conf_t *) this->private; - ret = quota_inode_ctx_get (loc->inode, this, &ctx); + ret = quota_inode_ctx_get (local->loc.inode, this, &ctx); if (ret < 0) { - ctx = quota_inode_ctx_new (loc->inode, this); + ctx = quota_inode_ctx_new (local->loc.inode, this); if (ctx == NULL) { gf_log (this->name, GF_LOG_WARNING, "quota_inode_ctx_new failed"); @@ -818,29 +854,150 @@ quota_set_inode_xattr (xlator_t *this, loc_t *loc) if (!dict) goto out; - if (loc->inode->ia_type == IA_IFDIR) { + if (local->loc.inode->ia_type == IA_IFDIR) { QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err); ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8); if (ret < 0) goto free_size; } - //if '/' then dont set contribution xattr - if (strcmp (loc->path, "/") == 0) - goto wind; + if (strcmp (local->loc.path, "/") != 0) { + contri = add_new_contribution_node (this, ctx, &local->loc); + if (contri == NULL) + goto err; - contri = add_new_contribution_node (this, ctx, loc); - if (contri == NULL) - goto err; + QUOTA_ALLOC_OR_GOTO (value, int64_t, ret, err); + GET_CONTRI_KEY (key, local->loc.parent->gfid, ret); + + ret = dict_set_bin (dict, key, value, 8); + if (ret < 0) + goto free_value; + } + + STACK_WIND (frame, create_dirty_xattr, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->xattrop, &local->loc, + GF_XATTROP_ADD_ARRAY64, dict); + ret = 0; + +free_size: + if (ret < 0) { + GF_FREE (size); + } + +free_value: + if (ret < 0) { + GF_FREE (value); + } + +err: + dict_unref (dict); + +out: + if (ret < 0) { + quota_xattr_creation_release_lock (frame, NULL, this, 0, 0); + } + + return 0; +} + + +int32_t +quota_check_n_set_inode_xattr (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + inode_t *inode, struct iatt *buf, dict_t *dict, + struct iatt *postparent) +{ + quota_local_t *local = NULL; + int64_t *size = NULL, *contri = NULL; + int8_t dirty = 0; + marker_conf_t *priv = NULL; + int32_t ret = 0; + char contri_key[512] = {0, }; + + if (op_ret < 0) { + goto out; + } - QUOTA_ALLOC_OR_GOTO (value, int64_t, ret, err); - GET_CONTRI_KEY (key, loc->parent->gfid, ret); + local = frame->local; + priv = this->private; - ret = dict_set_bin (dict, key, value, 8); + ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); if (ret < 0) - goto free_value; + goto create_xattr; + + ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); + if (ret < 0) + goto create_xattr; + + //check contribution xattr if not root + if (strcmp (local->loc.path, "/") != 0) { + GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); + if (ret < 0) + goto out; + + ret = dict_get_bin (dict, contri_key, (void **) &contri); + if (ret < 0) + goto create_xattr; + } + +out: + quota_xattr_creation_release_lock (frame, NULL, this, 0, 0); + return 0; + +create_xattr: + quota_create_xattr (this, frame); + return 0; +} + + +int32_t +quota_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + dict_t *xattr_req = NULL; + quota_local_t *local = NULL; + int32_t ret = 0; + + if (op_ret < 0) { + goto lock_err; + } + + local = frame->local; + + xattr_req = dict_new (); + if (xattr_req == NULL) { + goto err; + } + + ret = quota_req_xattr (this, &local->loc, xattr_req); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, "cannot request xattr"); + goto err; + } + + STACK_WIND (frame, quota_check_n_set_inode_xattr, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req); + + return 0; + +err: + quota_xattr_creation_release_lock (frame, NULL, this, 0, 0); + return 0; + +lock_err: + quota_inode_creation_done (frame, NULL, this, 0, 0); + return 0; +} + + +int32_t +quota_set_inode_xattr (xlator_t *this, loc_t *loc) +{ + struct gf_flock lock = {0, }; + quota_local_t *local = NULL; + int32_t ret = 0; + call_frame_t *frame = NULL; -wind: frame = create_frame (this, this->ctx->pool); if (!frame) { ret = -1; @@ -848,38 +1005,34 @@ wind: } local = quota_local_new (); - if (local == NULL) - goto free_size; - - local->ctx = ctx; + if (local == NULL) { + goto err; + } - local->contri = contri; + frame->local = local; ret = loc_copy (&local->loc, loc); - if (ret < 0) - quota_local_unref (this, local); + if (ret < 0) { + goto err; + } frame->local = local; - STACK_WIND (frame, create_dirty_xattr, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->xattrop, &local->loc, - GF_XATTROP_ADD_ARRAY64, dict); - ret = 0; + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; -free_size: - if (ret < 0) - GF_FREE (size); + STACK_WIND (frame, + quota_get_xattr, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->loc, F_SETLKW, &lock); -free_value: - if (ret < 0) - GF_FREE (value); + return 0; err: - dict_unref (dict); - -out: - if (ret < 0) - quota_inode_creation_done (NULL, NULL, this, -1, 0); + QUOTA_STACK_DESTROY (frame, this); return 0; } @@ -945,7 +1098,8 @@ quota_inodelk_cbk (call_frame_t *frame, void *cookie, gf_log (this->name, GF_LOG_DEBUG, "inodelk released on %s", local->parent_loc.path); - if (strcmp (local->parent_loc.path, "/") == 0) { + if ((strcmp (local->parent_loc.path, "/") == 0) + || (local->delta == 0)) { xattr_updation_done (frame, NULL, this, 0, 0, NULL); } else { ret = get_parent_inode_local (this, local); @@ -968,9 +1122,9 @@ quota_release_parent_lock (call_frame_t *frame, void *cookie, int32_t op_errno) { int32_t ret = 0; - struct gf_flock lock; quota_local_t *local = NULL; quota_inode_ctx_t *ctx = NULL; + struct gf_flock lock = {0, }; local = frame->local; @@ -1029,7 +1183,7 @@ quota_mark_undirty (call_frame_t *frame, priv = this->private; - //update the size of the parent inode + //update the size of the parent inode if (dict != NULL) { ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); if (ret < 0) @@ -1104,7 +1258,11 @@ quota_update_parent_size (call_frame_t *frame, goto err; } - local->contri->contribution += local->delta; + LOCK (&local->contri->lock); + { + local->contri->contribution += local->delta; + } + UNLOCK (&local->contri->lock); gf_log (this->name, GF_LOG_DEBUG, "%s %"PRId64 "%"PRId64, local->loc.path, local->ctx->size, @@ -1160,16 +1318,16 @@ quota_update_inode_contribution (call_frame_t *frame, void *cookie, struct iatt *buf, dict_t *dict, struct iatt *postparent) { - int32_t ret = -1; - int64_t *size = NULL; - int64_t *contri = NULL; - int64_t *delta = NULL; + int32_t ret = -1; + int64_t *size = NULL, size_int = 0, contri_int = 0; + int64_t *contri = NULL; + int64_t *delta = NULL; char contri_key [512] = {0, }; - dict_t *newdict = NULL; - quota_local_t *local = NULL; - quota_inode_ctx_t *ctx = NULL; - marker_conf_t *priv = NULL; - inode_contribution_t *contribution = NULL; + dict_t *newdict = NULL; + quota_local_t *local = NULL; + quota_inode_ctx_t *ctx = NULL; + marker_conf_t *priv = NULL; + inode_contribution_t *contribution = NULL; local = frame->local; @@ -1203,30 +1361,44 @@ quota_update_inode_contribution (call_frame_t *frame, void *cookie, } else ctx->size = buf->ia_blocks * 512; - ret = dict_get_bin (dict, contri_key, (void **) &contri); + size_int = ctx->size; + } +unlock: + UNLOCK (&ctx->lock); + + if (ret < 0) { + goto err; + } + + ret = dict_get_bin (dict, contri_key, (void **) &contri); + + LOCK (&contribution->lock); + { if (ret < 0) contribution->contribution = 0; else contribution->contribution = ntoh64 (*contri); - ret = 0; + contri_int = contribution->contribution; } -unlock: - UNLOCK (&ctx->lock); - - if (ret < 0) - goto err; + UNLOCK (&contribution->lock); gf_log (this->name, GF_LOG_DEBUG, "%s %"PRId64 "%"PRId64, - local->loc.path, ctx->size, contribution->contribution); + local->loc.path, size_int, contri_int); + + local->delta = size_int - contri_int; + + if (local->delta == 0) { + quota_mark_undirty (frame, NULL, this, 0, 0, NULL); + return 0; + } + newdict = dict_new (); if (newdict == NULL) { ret = -1; goto err; } - local->delta = ctx->size - contribution->contribution; - QUOTA_ALLOC_OR_GOTO (delta, int64_t, ret, err); *delta = hton64 (local->delta); @@ -1264,12 +1436,12 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { - int32_t ret = -1; + int32_t ret = -1; char contri_key [512] = {0, }; - dict_t *newdict = NULL; - quota_local_t *local = NULL; - marker_conf_t *priv = NULL; - quota_inode_ctx_t *ctx = NULL; + dict_t *newdict = NULL; + quota_local_t *local = NULL; + marker_conf_t *priv = NULL; + quota_inode_ctx_t *ctx = NULL; local = frame->local; @@ -1390,7 +1562,7 @@ err: int32_t get_lock_on_parent (call_frame_t *frame, xlator_t *this) { - struct gf_flock lock; + struct gf_flock lock = {0, }; quota_local_t *local = NULL; GF_VALIDATE_OR_GOTO ("marker", frame, fr_destroy); @@ -1467,8 +1639,8 @@ err: int initiate_quota_txn (xlator_t *this, loc_t *loc) { - int32_t ret = -1; - quota_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + quota_inode_ctx_t *ctx = NULL; inode_contribution_t *contribution = NULL; VALIDATE_OR_GOTO (loc, out); @@ -1491,34 +1663,33 @@ out: } -int32_t -validate_inode_size_contribution (xlator_t *this, - loc_t *loc, - quota_inode_ctx_t *ctx, - inode_contribution_t *contribution) -{ - if (ctx->size != contribution->contribution) - initiate_quota_txn (this, loc); +/* int32_t */ +/* validate_inode_size_contribution (xlator_t *this, loc_t *loc, int64_t size, */ +/* int64_t contribution) */ +/* { */ +/* if (size != contribution) { */ +/* initiate_quota_txn (this, loc); */ +/* } */ - return 0; -} +/* return 0; */ +/* } */ int32_t inspect_directory_xattr (xlator_t *this, - loc_t *loc, - dict_t *dict, - struct iatt buf) + loc_t *loc, + dict_t *dict, + struct iatt buf) { - int32_t ret = 0; - int8_t dirty = -1; - int64_t *size = NULL; - int64_t *contri = NULL; - char contri_key [512] = {0, }; - marker_conf_t *priv = NULL; - gf_boolean_t not_root = _gf_false; - quota_inode_ctx_t *ctx = NULL; - inode_contribution_t *contribution = NULL; + int32_t ret = 0; + int8_t dirty = -1; + int64_t *size = NULL, size_int = 0; + int64_t *contri = NULL, contri_int = 0; + char contri_key [512] = {0, }; + marker_conf_t *priv = NULL; + gf_boolean_t not_root = _gf_false; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; priv = this->private; @@ -1559,20 +1730,28 @@ inspect_directory_xattr (xlator_t *this, if (ret < 0) goto out; - contribution->contribution = ntoh64 (*contri); + LOCK (&contribution->lock); + { + contribution->contribution = ntoh64 (*contri); + contri_int = contribution->contribution; + } + UNLOCK (&contribution->lock); } - ctx->size = ntoh64 (*size); + LOCK (&ctx->lock); + { + ctx->size = ntoh64 (*size); + ctx->dirty = dirty; + size_int = ctx->size; + } + UNLOCK (&ctx->lock); gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64 - " contri=%"PRId64, ctx->size, - contribution?contribution->contribution:0); + " contri=%"PRId64, size_int, contri_int); - ctx->dirty = dirty; - if (ctx->dirty == 1) { + if (dirty) { update_dirty_inode (this, loc, ctx, contribution); - } else if (not_root == _gf_true && - ctx->size != contribution->contribution) { + } else if ((not_root == _gf_true) && (size_int != contri_int)) { initiate_quota_txn (this, loc); } @@ -1590,13 +1769,13 @@ inspect_file_xattr (xlator_t *this, dict_t *dict, struct iatt buf) { - int32_t ret = -1; - uint64_t contri_int = 0; - int64_t *contri_ptr = NULL; + int32_t ret = -1; + uint64_t contri_int = 0, size = 0; + int64_t *contri_ptr = NULL; char contri_key [512] = {0, }; - marker_conf_t *priv = NULL; - quota_inode_ctx_t *ctx = NULL; - inode_contribution_t *contribution = NULL; + marker_conf_t *priv = NULL; + quota_inode_ctx_t *ctx = NULL; + inode_contribution_t *contribution = NULL; priv = this->private; @@ -1618,10 +1797,12 @@ inspect_file_xattr (xlator_t *this, LOCK (&ctx->lock); { ctx->size = 512 * buf.ia_blocks; + size = ctx->size; } UNLOCK (&ctx->lock); - list_for_each_entry (contribution, &ctx->contribution_head, contri_list) { + list_for_each_entry (contribution, &ctx->contribution_head, + contri_list) { GET_CONTRI_KEY (contri_key, contribution->gfid, ret); if (ret < 0) continue; @@ -1630,14 +1811,19 @@ inspect_file_xattr (xlator_t *this, if (ret == 0) { contri_ptr = (int64_t *)(unsigned long)contri_int; - contribution->contribution = ntoh64 (*contri_ptr); + LOCK (&contribution->lock); + { + contribution->contribution = ntoh64 (*contri_ptr); + contri_int = contribution->contribution; + } + UNLOCK (&contribution->lock); gf_log (this->name, GF_LOG_DEBUG, - "size=%"PRId64 " contri=%"PRId64, ctx->size, - contribution->contribution); + "size=%"PRId64 " contri=%"PRId64, size, contri_int); - ret = validate_inode_size_contribution - (this, loc, ctx, contribution); + if (size != contri_int) { + initiate_quota_txn (this, loc); + } } else initiate_quota_txn (this, loc); } @@ -1717,9 +1903,9 @@ int32_t quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { - int32_t ret = 0; - char contri_key [512] = {0, }; - quota_local_t *local = NULL; + int32_t ret = 0; + char contri_key [512] = {0, }; + quota_local_t *local = NULL; local = (quota_local_t *) frame->local; @@ -1734,8 +1920,8 @@ quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, GET_CONTRI_KEY (contri_key, local->contri->gfid, ret); STACK_WIND (frame, quota_removexattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, - &local->loc, contri_key); + FIRST_CHILD(this)->fops->removexattr, + &local->loc, contri_key); ret = 0; } else { quota_removexattr_cbk (frame, NULL, this, 0, 0); @@ -1758,20 +1944,39 @@ int32_t mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *dict) { - int32_t ret; - struct gf_flock lock; - quota_inode_ctx_t *ctx; - quota_local_t *local = NULL; + int32_t ret = -1; + struct gf_flock lock = {0, }; + quota_inode_ctx_t *ctx = NULL; + quota_local_t *local = NULL; + int64_t contribution = 0; local = frame->local; if (op_ret == -1) local->err = -1; ret = quota_inode_ctx_get (local->parent_loc.inode, this, &ctx); - if (ret == 0) - ctx->size -= local->contri->contribution; - local->contri->contribution = 0; + LOCK (&local->contri->lock); + { + contribution = local->contri->contribution; + } + UNLOCK (&local->contri->lock); + + if (contribution == local->size) { + if (ret == 0) { + LOCK (&ctx->lock); + { + ctx->size -= contribution; + } + UNLOCK (&ctx->lock); + + LOCK (&local->contri->lock); + { + local->contri->contribution = 0; + } + UNLOCK (&local->contri->lock); + } + } lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; @@ -1788,7 +1993,7 @@ mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this, return 0; } -static int32_t +int32_t mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { @@ -1821,8 +2026,7 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, QUOTA_ALLOC_OR_GOTO (size, int64_t, ret, err); - *size = hton64 (-contribution->contribution); - + *size = hton64 (-local->size); ret = dict_set_bin (dict, QUOTA_SIZE_KEY, size, 8); if (ret < 0) @@ -1844,7 +2048,7 @@ err: } int32_t -reduce_parent_size (xlator_t *this, loc_t *loc) +reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri) { int32_t ret = -1; struct gf_flock lock = {0,}; @@ -1873,6 +2077,21 @@ reduce_parent_size (xlator_t *this, loc_t *loc) goto out; } + if (contri >= 0) { + local->size = contri; + } else { + LOCK (&contribution->lock); + { + local->size = contribution->contribution; + } + UNLOCK (&contribution->lock); + } + + if (local->size == 0) { + ret = 0; + goto out; + } + ret = loc_copy (&local->loc, loc); if (ret < 0) goto out; @@ -1904,13 +2123,15 @@ reduce_parent_size (xlator_t *this, loc_t *loc) FIRST_CHILD(this), FIRST_CHILD(this)->fops->inodelk, this->name, &local->parent_loc, F_SETLKW, &lock); + local = NULL; ret = 0; out: - if (ret < 0) { + if (local != NULL) { quota_local_unref (this, local); GF_FREE (local); } + return ret; } diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index ea54bb631..34e8dfd81 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -53,7 +53,7 @@ char volname [40]; do { \ ret = 0; \ var = GF_CALLOC (sizeof (type), 1, \ - gf_marker_mt_##type); \ + gf_marker_mt_##type); \ if (!var) { \ gf_log ("", GF_LOG_ERROR, \ "out of memory"); \ @@ -64,7 +64,7 @@ char volname [40]; #define QUOTA_ALLOC_OR_GOTO(var, type, ret, label) \ do { \ var = GF_CALLOC (sizeof (type), 1, \ - gf_marker_mt_##type); \ + gf_marker_mt_##type); \ if (!var) { \ gf_log ("", GF_LOG_ERROR, \ "out of memory"); \ @@ -74,42 +74,35 @@ char volname [40]; ret = 0; \ } while (0); -#define GET_CONTRI_KEY(var, _gfid, _ret) \ - do { \ - char _gfid_unparsed[40]; \ - uuid_unparse (_gfid, _gfid_unparsed); \ +#define GET_CONTRI_KEY(var, _gfid, _ret) \ + do { \ + char _gfid_unparsed[40]; \ + uuid_unparse (_gfid, _gfid_unparsed); \ _ret = snprintf (var, CONTRI_KEY_MAX, QUOTA_XATTR_PREFIX \ - ".%s.%s." CONTRIBUTION, VOL_NAME, \ - _gfid_unparsed); \ + ".%s.%s." CONTRIBUTION, VOL_NAME, \ + _gfid_unparsed); \ } while (0); -#define QUOTA_SAFE_INCREMENT(lock, var) \ - do { \ - LOCK (lock); \ - var ++; \ - UNLOCK (lock); \ - } while (0) - -#define QUOTA_SAFE_DECREMENT(lock, var) \ - do { \ - LOCK (lock); \ - var --; \ - UNLOCK (lock); \ +#define QUOTA_SAFE_INCREMENT(lock, var) \ + do { \ + LOCK (lock); \ + var ++; \ + UNLOCK (lock); \ } while (0) - struct quota_inode_ctx { - int64_t size; - int8_t dirty; - gf_lock_t lock; - struct list_head contribution_head; + int64_t size; + int8_t dirty; + gf_lock_t lock; + struct list_head contribution_head; }; typedef struct quota_inode_ctx quota_inode_ctx_t; struct inode_contribution { struct list_head contri_list; - int64_t contribution; - uuid_t gfid; + int64_t contribution; + uuid_t gfid; + gf_lock_t lock; }; typedef struct inode_contribution inode_contribution_t; @@ -119,15 +112,16 @@ struct quota_local { int32_t err; int32_t ref; int64_t sum; + int64_t size; int32_t hl_count; int32_t dentry_child_count; - fd_t *fd; + fd_t *fd; call_frame_t *frame; - gf_lock_t lock; + gf_lock_t lock; - loc_t loc; - loc_t parent_loc; + loc_t loc; + loc_t parent_loc; quota_inode_ctx_t *ctx; inode_contribution_t *contri; @@ -157,7 +151,7 @@ quota_dirty_inode_readdir (call_frame_t *, void *, xlator_t *, int32_t, int32_t, fd_t *); int32_t -reduce_parent_size (xlator_t *, loc_t *); +reduce_parent_size (xlator_t *, loc_t *, int64_t); int32_t quota_rename_update_newpath (xlator_t *, loc_t *); diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index d7b792bc4..84c13435f 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -27,7 +27,9 @@ #include "marker.h" #include "marker-mem-types.h" #include "marker-quota.h" +#include "marker-quota-helper.h" #include "marker-common.h" +#include "byte-order.h" void fini (xlator_t *this); @@ -176,10 +178,11 @@ marker_local_unref (marker_local_t *local) goto out; loc_wipe (&local->loc); + loc_wipe (&local->parent_loc); if (local->oplocal) { - loc_wipe (&local->oplocal->loc); - GF_FREE (local->oplocal); + marker_local_unref (local->oplocal); + local->oplocal = NULL; } GF_FREE (local); out: @@ -187,13 +190,14 @@ out: } int32_t -stat_stampfile (xlator_t *this, marker_conf_t *priv, struct volume_mark **status) +stat_stampfile (xlator_t *this, marker_conf_t *priv, + struct volume_mark **status) { - struct stat buf; - struct volume_mark *vol_mark; + struct stat buf = {0, }; + struct volume_mark *vol_mark = NULL; vol_mark = GF_CALLOC (sizeof (struct volume_mark), 1, - gf_marker_mt_volume_mark); + gf_marker_mt_volume_mark); vol_mark->major = 1; vol_mark->minor = 0; @@ -215,9 +219,9 @@ stat_stampfile (xlator_t *this, marker_conf_t *priv, struct volume_mark **status int32_t marker_getxattr_stampfile_cbk (call_frame_t *frame, xlator_t *this, - const char *name, struct volume_mark *vol_mark) + const char *name, struct volume_mark *vol_mark) { - int32_t ret; + int32_t ret = -1; dict_t *dict = NULL; if (vol_mark == NULL){ @@ -229,7 +233,7 @@ marker_getxattr_stampfile_cbk (call_frame_t *frame, xlator_t *this, dict = dict_new (); ret = dict_set_bin (dict, (char *)name, vol_mark, - sizeof (struct volume_mark)); + sizeof (struct volume_mark)); STACK_UNWIND_STRICT (getxattr, frame, 0, 0, dict); @@ -262,14 +266,14 @@ out: int32_t marker_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) + int32_t op_ret, int32_t op_errno, dict_t *dict) { - if (cookie) { - gf_log (this->name, GF_LOG_DEBUG, - "Filtering the quota extended attributes"); + if (cookie) { + gf_log (this->name, GF_LOG_DEBUG, + "Filtering the quota extended attributes"); - dict_foreach (dict, marker_filter_quota_xattr, NULL); - } + dict_foreach (dict, marker_filter_quota_xattr, NULL); + } STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict); return 0; } @@ -292,19 +296,19 @@ marker_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, ret = call_from_special_client (frame, this, name); wind: if (ret == _gf_false) { - if (name == NULL) { + if (name == NULL) { /* Signifies that marker translator * has to filter the quota's xattr's, * this is to prevent afr from performing * self healing on marker-quota xattrs' */ - cookie = 1; + cookie = 1; } STACK_WIND_COOKIE (frame, marker_getxattr_cbk, (void *)cookie, FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, loc, name); - } + } return 0; } @@ -398,7 +402,7 @@ marker_start_setxattr (call_frame_t *frame, xlator_t *this) void marker_gettimeofday (marker_local_t *local) { - struct timeval tv; + struct timeval tv = {0, }; gettimeofday (&tv, NULL); @@ -437,9 +441,9 @@ marker_xtime_update_marks (xlator_t *this, marker_local_t *local) int32_t marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -508,9 +512,9 @@ err: int32_t marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -546,7 +550,7 @@ out: int32_t marker_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, - mode_t mode, fd_t *fd, dict_t *params) + mode_t mode, fd_t *fd, dict_t *params) { int32_t ret = 0; marker_local_t *local = NULL; @@ -571,7 +575,8 @@ wind: params); return 0; err: - STACK_UNWIND_STRICT (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL); + STACK_UNWIND_STRICT (create, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, + NULL); return 0; } @@ -615,12 +620,12 @@ out: int32_t marker_writev (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - struct iovec *vector, - int32_t count, - off_t offset, - struct iobref *iobref) + xlator_t *this, + fd_t *fd, + struct iovec *vector, + int32_t count, + off_t offset, + struct iobref *iobref) { int32_t ret = 0; marker_local_t *local = NULL; @@ -653,8 +658,8 @@ err: int32_t marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -677,7 +682,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if (priv->feature_enabled & GF_QUOTA) - reduce_parent_size (this, &local->loc); + reduce_parent_size (this, &local->loc, -1); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -720,8 +725,8 @@ err: int32_t marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -744,7 +749,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, priv = this->private; if ((priv->feature_enabled & GF_QUOTA) && (local->ia_nlink == 1)) - reduce_parent_size (this, &local->loc); + reduce_parent_size (this, &local->loc, -1); if (priv->feature_enabled & GF_XTIME) marker_xtime_update_marks (this, local); @@ -820,9 +825,9 @@ err: int32_t marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -887,60 +892,63 @@ err: int32_t -marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *buf, - struct iatt *preoldparent, struct iatt *postoldparent, - struct iatt *prenewparent, struct iatt *postnewparent) +marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) { - marker_conf_t *priv = NULL; - marker_local_t *local = NULL; - marker_local_t *oplocal = NULL; - loc_t newloc = {0, }; + marker_local_t *local = NULL, *oplocal = NULL; + loc_t newloc = {0, }; + marker_conf_t *priv = NULL; - if (op_ret == -1) { - gf_log (this->name, GF_LOG_TRACE, "%s occured while " - "renaming a file ", strerror (op_errno)); - } + local = frame->local; + oplocal = local->oplocal; - local = (marker_local_t *) frame->local; + priv = this->private; frame->local = NULL; - STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent, - postoldparent, prenewparent, postnewparent); - - if (op_ret == -1 || local == NULL) - goto out; + if (op_ret < 0) { + if (local->err == 0) { + local->err = op_errno; + } - oplocal = local->oplocal; - local->oplocal = NULL; + gf_log (this->name, GF_LOG_WARNING, + "inodelk (UNLOCK) failed on path:%s, inode (ino:%"PRId64 + ", gfid:%s)(%s)", local->parent_loc.path, + local->parent_loc.inode->ino, + uuid_utoa (local->parent_loc.inode->gfid), + strerror (op_errno)); + } - priv = this->private; + if (local->stub != NULL) { + call_resume (local->stub); + local->stub = NULL; + } else if (local->err != 0) { + STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL, + NULL, NULL, NULL); + } - if (priv->feature_enabled & GF_QUOTA) { - reduce_parent_size (this, &oplocal->loc); + reduce_parent_size (this, &oplocal->loc, oplocal->contribution); - if (local->loc.inode != NULL) { - reduce_parent_size (this, &local->loc); - } + if (local->loc.inode != NULL) { + reduce_parent_size (this, &local->loc, local->contribution); + } - newloc.inode = inode_ref (oplocal->loc.inode); - newloc.path = gf_strdup (local->loc.path); - newloc.name = gf_strdup (local->loc.name); - newloc.parent = inode_ref (local->loc.parent); - newloc.ino = oplocal->loc.inode->ino; + newloc.inode = inode_ref (oplocal->loc.inode); + newloc.path = gf_strdup (local->loc.path); + newloc.name = gf_strdup (local->loc.name); + newloc.parent = inode_ref (local->loc.parent); + newloc.ino = oplocal->loc.inode->ino; - quota_rename_update_newpath (this, &newloc); + quota_rename_update_newpath (this, &newloc); - loc_wipe (&newloc); - } + loc_wipe (&newloc); if (priv->feature_enabled & GF_XTIME) { //update marks on oldpath marker_xtime_update_marks (this, oplocal); marker_xtime_update_marks (this, local); } -out: + marker_local_unref (local); marker_local_unref (oplocal); return 0; @@ -948,35 +956,372 @@ out: int32_t -marker_quota_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +marker_rename_release_newp_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno) { - marker_local_t *local = NULL, *oplocal = NULL; + marker_local_t *local = NULL, *oplocal = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + if (local->err == 0) { + local->err = op_errno; + } + + gf_log (this->name, GF_LOG_WARNING, + "inodelk (UNLOCK) failed on path:%s, inode (ino:%"PRId64 + ", gfid:%s)(%s)", oplocal->parent_loc.path, + oplocal->parent_loc.inode->ino, + uuid_utoa (oplocal->parent_loc.inode->gfid), + strerror (op_errno)); + } + + if (local->next_lock_on == NULL) { + marker_rename_done (frame, NULL, this, 0, 0); + goto out; + } + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + marker_rename_done, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &local->parent_loc, F_SETLKW, &lock); + +out: + return 0; +} + + +int32_t +marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno) +{ + marker_local_t *local = NULL, *oplocal = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; if ((op_ret < 0) && (op_errno != ENOATTR)) { - goto unwind; + local->err = op_errno; + } + + lock.l_type = F_UNLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + lock.l_pid = 0; + + STACK_WIND (frame, + marker_rename_release_newp_lock, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, &oplocal->parent_loc, F_SETLKW, &lock); + return 0; +} + + +int32_t +marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + struct iatt *preoldparent, struct iatt *postoldparent, + struct iatt *prenewparent, struct iatt *postnewparent) +{ + marker_conf_t *priv = NULL; + marker_local_t *local = NULL; + marker_local_t *oplocal = NULL; + call_stub_t *stub = NULL; + int32_t ret = 0; + char contri_key [512] = {0, }; + + local = (marker_local_t *) frame->local; + + if (local != NULL) { + oplocal = local->oplocal; + } + + priv = this->private; + + if (op_ret < 0) { + if (local != NULL) { + local->err = op_errno; + } + + gf_log (this->name, GF_LOG_TRACE, "%s occured while " + "renaming a file ", strerror (op_errno)); + } + + if (priv->feature_enabled & GF_QUOTA) { + if ((op_ret < 0) || (local == NULL)) { + goto quota_err; + } + + stub = fop_rename_cbk_stub (frame, default_rename_cbk, op_ret, + op_errno, buf, preoldparent, + postoldparent, prenewparent, + postnewparent); + if (stub == NULL) { + local->err = ENOMEM; + goto quota_err; + } + + local->stub = stub; + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = ENOMEM; + goto quota_err; + } + + STACK_WIND (frame, marker_rename_release_oldp_lock, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, &local->loc, + contri_key); + } else { + frame->local = NULL; + + STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, + preoldparent, postoldparent, prenewparent, + postnewparent); + + if ((op_ret < 0) || (local == NULL)) { + goto out; + } + + if (priv->feature_enabled & GF_XTIME) { + //update marks on oldpath + marker_xtime_update_marks (this, oplocal); + marker_xtime_update_marks (this, local); + } + } + +out: + if (!(priv->feature_enabled & GF_QUOTA)) { + marker_local_unref (local); + marker_local_unref (oplocal); } + return 0; + +quota_err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + return 0; +} + + +int32_t +marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict) + +{ + marker_local_t *local = NULL, *oplocal = NULL; + char contri_key[512] = {0, }; + int32_t ret = 0; + int64_t *contribution = 0; + local = frame->local; oplocal = local->oplocal; + if ((op_ret < 0) && (op_errno != ENOATTR)) { + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "fetching contribution values from %s (ino:%"PRId64", " + "gfid:%s) failed (%s)", local->loc.path, + local->loc.inode->ino, + uuid_utoa (local->loc.inode->gfid), + strerror (op_errno)); + goto err; + } + + if (local->loc.inode != NULL) { + GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto err; + } + + if (dict_get_bin (dict, contri_key, + (void **) &contribution) == 0) { + local->contribution = ntoh64 (*contribution); + } + } + STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, &oplocal->loc, &local->loc); + return 0; -unwind: - STACK_UNWIND_STRICT (rename, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL, NULL); - if (local) { - local->oplocal = NULL; - marker_local_unref (local); - GF_FREE (local); +err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + return 0; +} + + +int32_t +marker_get_newpath_contribution (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *dict) +{ + marker_local_t *local = NULL, *oplocal = NULL; + char contri_key[512] = {0, }; + int32_t ret = 0; + int64_t *contribution = 0; + + local = frame->local; + oplocal = local->oplocal; + + if ((op_ret < 0) && (op_errno != ENOATTR)) { + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "fetching contribution values from %s (ino:%"PRId64", " + "gfid:%s) failed (%s)", oplocal->loc.path, + oplocal->loc.inode->ino, + uuid_utoa (oplocal->loc.inode->gfid), + strerror (op_errno)); + goto err; } - if (oplocal) { - marker_local_unref (oplocal); - GF_FREE (oplocal); + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto err; + } + + if (dict_get_bin (dict, contri_key, (void **) &contribution) == 0) + oplocal->contribution = ntoh64 (*contribution); + + if (local->loc.inode != NULL) { + GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto err; + } + + STACK_WIND (frame, marker_do_rename, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->getxattr, &local->loc, + contri_key); + } else { + marker_do_rename (frame, NULL, this, 0, 0, NULL); + } + + return 0; +err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + return 0; +} + + +int32_t +marker_get_oldpath_contribution (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno) +{ + marker_local_t *local = NULL, *oplocal = NULL; + char contri_key[512] = {0, }; + int32_t ret = 0; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "cannot hold inodelk on %s (ino:%"PRId64", gfid:%s)" + "(%s)", + local->next_lock_on->path, + local->next_lock_on->inode->ino, + uuid_utoa (local->next_lock_on->inode->gfid), + strerror (op_errno)); + goto lock_err; + } + + GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret); + if (ret < 0) { + local->err = errno; + goto quota_err; + } + + STACK_WIND (frame, marker_get_newpath_contribution, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->getxattr, &oplocal->loc, + contri_key); + return 0; + +quota_err: + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + return 0; + +lock_err: + if ((local->next_lock_on == NULL) + || (local->next_lock_on == &local->parent_loc)) { + local->next_lock_on = NULL; + marker_rename_release_oldp_lock (frame, NULL, this, 0, 0); + } else { + marker_rename_release_newp_lock (frame, NULL, this, 0, 0); + } + + return 0; +} + + +int32_t +marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + marker_local_t *local = NULL, *oplocal = NULL; + loc_t *loc = NULL; + struct gf_flock lock = {0, }; + + local = frame->local; + oplocal = local->oplocal; + + if (op_ret < 0) { + if (local->next_lock_on != &oplocal->parent_loc) { + loc = &oplocal->parent_loc; + } else { + loc = &local->parent_loc; + } + + local->err = op_errno; + gf_log (this->name, GF_LOG_WARNING, + "cannot hold inodelk on %s (ino:%"PRId64", gfid:%s)" + "(%s)", loc->path, loc->inode->ino, + uuid_utoa (loc->inode->gfid), + strerror (op_errno)); + goto err; + } + + if (local->next_lock_on != NULL) { + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + + STACK_WIND (frame, + marker_get_oldpath_contribution, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, local->next_lock_on, + F_SETLKW, &lock); + } else { + marker_get_oldpath_contribution (frame, 0, this, 0, 0); } + + return 0; + +err: + marker_rename_done (frame, NULL, this, 0, 0); return 0; } @@ -989,17 +1334,14 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, marker_local_t *local = NULL; marker_local_t *oplocal = NULL; marker_conf_t *priv = NULL; - char contri_key[512] = {0,}; + struct gf_flock lock = {0, }; + loc_t *lock_on = NULL; priv = this->private; if (priv->feature_enabled == 0) goto rename_wind; - GET_CONTRI_KEY (contri_key, oldloc->parent->gfid, ret); - if (ret < 0) - goto err; - ALLOCATE_OR_GOTO (local, marker_local_t, err); MARKER_INIT_LOCAL (frame, local); @@ -1010,18 +1352,53 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, frame->local = local; - local->oplocal = oplocal; + local->oplocal = marker_local_ref (oplocal); ret = loc_copy (&local->loc, newloc); - if (ret == -1) + if (ret < 0) goto err; ret = loc_copy (&oplocal->loc, oldloc); - if (ret == -1) + if (ret < 0) + goto err; + + if (!(priv->feature_enabled & GF_QUOTA)) { + goto rename_wind; + } + + ret = quota_inode_loc_fill (NULL, newloc->parent, &local->parent_loc); + if (ret < 0) goto err; - STACK_WIND (frame, marker_quota_removexattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, oldloc, contri_key); + ret = quota_inode_loc_fill (NULL, oldloc->parent, &oplocal->parent_loc); + if (ret < 0) + goto err; + + if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent) + && (uuid_compare (newloc->parent->gfid, + oldloc->parent->gfid) < 0)) { + lock_on = &local->parent_loc; + local->next_lock_on = &oplocal->parent_loc; + } else { + lock_on = &oplocal->parent_loc; + if ((newloc->inode != NULL) && (newloc->parent + != oldloc->parent)) { + local->next_lock_on = &local->parent_loc; + } + } + + lock.l_len = 0; + lock.l_start = 0; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + + STACK_WIND (frame, + marker_rename_inodelk_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->inodelk, + this->name, lock_on, + F_SETLKW, &lock); + return 0; rename_wind: @@ -1031,7 +1408,7 @@ rename_wind: return 0; err: STACK_UNWIND_STRICT (rename, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL); return 0; } @@ -1039,8 +1416,8 @@ err: int32_t marker_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf) + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1107,8 +1484,8 @@ err: int32_t marker_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf) + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1174,9 +1551,9 @@ err: int32_t marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent) { marker_conf_t *priv = NULL; marker_local_t *local = NULL; @@ -1211,7 +1588,7 @@ out: int marker_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, - loc_t *loc, dict_t *params) + loc_t *loc, dict_t *params) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1236,16 +1613,16 @@ wind: return 0; err: STACK_UNWIND_STRICT (symlink, frame, -1, ENOMEM, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL); return 0; } int32_t marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent) + int32_t op_ret, int32_t op_errno, inode_t *inode, + struct iatt *buf, struct iatt *preparent, + struct iatt *postparent) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1281,7 +1658,7 @@ out: int marker_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, - dev_t rdev, dict_t *parms) + dev_t rdev, dict_t *parms) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1347,7 +1724,7 @@ call_from_sp_client_to_reset_tmfile (call_frame_t *frame, } if (data->len == 0 || (data->len == 5 && - memcmp (data->data, "RESET", 5) == 0)) { + memcmp (data->data, "RESET", 5) == 0)) { fd = open (priv->timestamp_file, O_WRONLY|O_TRUNC); if (fd != -1) { /* TODO check whether the O_TRUNC would update the @@ -1376,7 +1753,7 @@ out: int32_t marker_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1407,7 +1784,7 @@ out: int32_t marker_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, - int32_t flags) + int32_t flags) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1443,7 +1820,7 @@ err: int32_t marker_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1474,7 +1851,7 @@ out: int32_t marker_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, - int32_t flags) + int32_t flags) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1510,8 +1887,8 @@ err: int32_t marker_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *statpre, - struct iatt *statpost) + int32_t op_ret, int32_t op_errno, struct iatt *statpre, + struct iatt *statpost) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1544,7 +1921,7 @@ out: int32_t marker_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, - struct iatt *stbuf, int32_t valid) + struct iatt *stbuf, int32_t valid) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1576,8 +1953,8 @@ err: int32_t marker_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *statpre, - struct iatt *statpost) + int32_t op_ret, int32_t op_errno, struct iatt *statpre, + struct iatt *statpost) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1612,7 +1989,7 @@ out: int32_t marker_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - struct iatt *stbuf, int32_t valid) + struct iatt *stbuf, int32_t valid) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1644,7 +2021,7 @@ err: int32_t marker_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno) { marker_local_t *local = NULL; marker_conf_t *priv = NULL; @@ -1675,7 +2052,7 @@ out: int32_t marker_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) + const char *name) { int32_t ret = 0; marker_local_t *local = NULL; @@ -1785,7 +2162,7 @@ mem_acct_init (xlator_t *this) if (ret != 0) { gf_log(this->name, GF_LOG_ERROR, "Memory accounting init" - "failed"); + "failed"); return ret; } @@ -1811,12 +2188,14 @@ init_xtime_priv (xlator_t *this, dict_t *options) ret = uuid_parse (priv->volume_uuid, priv->volume_uuid_bin); if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, "invalid volume uuid %s", priv->volume_uuid); + gf_log (this->name, GF_LOG_ERROR, + "invalid volume uuid %s", priv->volume_uuid); goto out; } ret = gf_asprintf (& (priv->marker_xattr), "%s.%s.%s", - MARKER_XATTR_PREFIX, priv->volume_uuid, XTIME); + MARKER_XATTR_PREFIX, priv->volume_uuid, + XTIME); if (ret == -1){ priv->marker_xattr = NULL; @@ -2066,7 +2445,7 @@ struct xlator_fops fops = { }; struct xlator_cbks cbks = { - .forget = marker_forget + .forget = marker_forget }; struct volume_options options[] = { diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h index ea1f5cc0a..8f4e11a5b 100644 --- a/xlators/features/marker/src/marker.h +++ b/xlators/features/marker/src/marker.h @@ -28,6 +28,7 @@ #include "xlator.h" #include "defaults.h" #include "uuid.h" +#include "call-stub.h" #define MARKER_XATTR_PREFIX "trusted.glusterfs" #define XTIME "xtime" @@ -65,10 +66,15 @@ struct marker_local{ uint32_t timebuf[2]; pid_t pid; loc_t loc; + loc_t parent_loc; + loc_t *next_lock_on; int32_t ref; int32_t ia_nlink; gf_lock_t lock; mode_t mode; + int32_t err; + call_stub_t *stub; + int64_t contribution; struct marker_local *oplocal; }; typedef struct marker_local marker_local_t; |