diff options
Diffstat (limited to 'xlators/features/bit-rot/src/stub/bit-rot-stub.c')
| -rw-r--r-- | xlators/features/bit-rot/src/stub/bit-rot-stub.c | 4223 |
1 files changed, 3193 insertions, 1030 deletions
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub.c b/xlators/features/bit-rot/src/stub/bit-rot-stub.c index 0db500659b5..447dd47ff41 100644 --- a/xlators/features/bit-rot/src/stub/bit-rot-stub.c +++ b/xlators/features/bit-rot/src/stub/bit-rot-stub.c @@ -10,174 +10,433 @@ #include <ctype.h> #include <sys/uio.h> +#include <signal.h> -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "glusterfs.h" -#include "xlator.h" -#include "logging.h" +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> #include "changelog.h" -#include "compat-errno.h" +#include <glusterfs/compat-errno.h> +#include <glusterfs/call-stub.h> #include "bit-rot-stub.h" #include "bit-rot-stub-mem-types.h" - +#include "bit-rot-stub-messages.h" #include "bit-rot-common.h" -#define BR_STUB_REQUEST_COOKIE 0x1 +#define BR_STUB_REQUEST_COOKIE 0x1 -int32_t -mem_acct_init (xlator_t *this) +void +br_stub_lock_cleaner(void *arg) { - int32_t ret = -1; + pthread_mutex_t *clean_mutex = arg; - if (!this) - return ret; + pthread_mutex_unlock(clean_mutex); + return; +} - ret = xlator_mem_acct_init (this, gf_br_stub_mt_end + 1); +void * +br_stub_signth(void *); - if (ret != 0) { - gf_log (this->name, GF_LOG_WARNING, "Memory accounting" - " init failed"); - return ret; - } +struct br_stub_signentry { + unsigned long v; + call_stub_t *stub; + + struct list_head list; +}; + +int32_t +mem_acct_init(xlator_t *this) +{ + int32_t ret = -1; + + if (!this) return ret; + + ret = xlator_mem_acct_init(this, gf_br_stub_mt_end + 1); + + if (ret != 0) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_MEM_ACNT_FAILED, NULL); + return ret; + } + + return ret; +} + +int +br_stub_bad_object_container_init(xlator_t *this, br_stub_private_t *priv) +{ + pthread_attr_t w_attr; + int ret = -1; + + ret = pthread_cond_init(&priv->container.bad_cond, NULL); + if (ret != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL, + "cond_init ret=%d", ret, NULL); + goto out; + } + + ret = pthread_mutex_init(&priv->container.bad_lock, NULL); + if (ret != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL, + "mutex_init ret=%d", ret, NULL); + goto cleanup_cond; + } + + ret = pthread_attr_init(&w_attr); + if (ret != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL, + "attr_init ret=%d", ret, NULL); + goto cleanup_lock; + } + + ret = pthread_attr_setstacksize(&w_attr, BAD_OBJECT_THREAD_STACK_SIZE); + if (ret == EINVAL) { + gf_smsg(this->name, GF_LOG_WARNING, 0, + BRS_MSG_USING_DEFAULT_THREAD_SIZE, NULL); + } + + INIT_LIST_HEAD(&priv->container.bad_queue); + ret = br_stub_dir_create(this, priv); + if (ret < 0) + goto cleanup_lock; + + ret = gf_thread_create(&priv->container.thread, &w_attr, br_stub_worker, + this, "brswrker"); + if (ret) + goto cleanup_attr; + + return 0; + +cleanup_attr: + pthread_attr_destroy(&w_attr); +cleanup_lock: + pthread_mutex_destroy(&priv->container.bad_lock); +cleanup_cond: + pthread_cond_destroy(&priv->container.bad_cond); +out: + return -1; } int32_t -init (xlator_t *this) +init(xlator_t *this) { - char *tmp = NULL; - struct timeval tv = {0,}; - br_stub_private_t *priv = NULL; + int ret = 0; + char *tmp = NULL; + struct timeval tv = { + 0, + }; + br_stub_private_t *priv = NULL; - if (!this->children) { - gf_log (this->name, GF_LOG_ERROR, "FATAL: no children"); - goto error_return; - } + if (!this->children) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_CHILD, NULL); + goto error_return; + } - priv = GF_CALLOC (1, sizeof (*priv), gf_br_stub_mt_private_t); - if (!priv) - goto error_return; + priv = GF_CALLOC(1, sizeof(*priv), gf_br_stub_mt_private_t); + if (!priv) + goto error_return; - priv->local_pool = mem_pool_new (br_stub_local_t, 512); - if (!priv->local_pool) - goto free_priv; + priv->local_pool = mem_pool_new(br_stub_local_t, 512); + if (!priv->local_pool) + goto free_priv; - GF_OPTION_INIT ("bitrot", priv->go, bool, free_mempool); + GF_OPTION_INIT("bitrot", priv->do_versioning, bool, free_mempool); - GF_OPTION_INIT ("export", tmp, str, free_mempool); - memcpy (priv->export, tmp, strlen (tmp) + 1); + GF_OPTION_INIT("export", tmp, str, free_mempool); - (void) gettimeofday (&tv, NULL); + if (snprintf(priv->export, PATH_MAX, "%s", tmp) >= PATH_MAX) + goto free_mempool; - /* boot time is in network endian format */ - priv->boot[0] = htonl (tv.tv_sec); - priv->boot[1] = htonl (tv.tv_usec); + if (snprintf(priv->stub_basepath, sizeof(priv->stub_basepath), "%s/%s", + priv->export, + BR_STUB_QUARANTINE_DIR) >= sizeof(priv->stub_basepath)) + goto free_mempool; - gf_log (this->name, GF_LOG_DEBUG, "bit-rot stub loaded"); - this->private = priv; - return 0; + (void)gettimeofday(&tv, NULL); - free_mempool: - mem_pool_destroy (priv->local_pool); - free_priv: - GF_FREE (priv); - error_return: - return -1; -} + /* boot time is in network endian format */ + priv->boot[0] = htonl(tv.tv_sec); + priv->boot[1] = htonl(tv.tv_usec); -void -fini (xlator_t *this) -{ - br_stub_private_t *priv = this->private; + pthread_mutex_init(&priv->lock, NULL); + pthread_cond_init(&priv->cond, NULL); + INIT_LIST_HEAD(&priv->squeue); - if (!priv) - return; - this->private = NULL; - GF_FREE (priv); + /* Thread creations need 'this' to be passed so that THIS can be + * assigned inside the thread. So setting this->private here. + */ + this->private = priv; + if (!priv->do_versioning) + return 0; - return; + ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this, + "brssign"); + if (ret != 0) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SPAWN_SIGN_THRD_FAILED, + NULL); + goto cleanup_lock; + } + + ret = br_stub_bad_object_container_init(this, priv); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL, NULL); + goto cleanup_lock; + } + + gf_msg_debug(this->name, 0, "bit-rot stub loaded"); + + return 0; + +cleanup_lock: + pthread_cond_destroy(&priv->cond); + pthread_mutex_destroy(&priv->lock); +free_mempool: + mem_pool_destroy(priv->local_pool); + priv->local_pool = NULL; +free_priv: + GF_FREE(priv); + this->private = NULL; +error_return: + return -1; } -static inline int -br_stub_alloc_versions (br_version_t **obuf, - br_signature_t **sbuf, size_t signaturelen) +/* TODO: + * As of now enabling bitrot option does 2 things. + * 1) Start the Bitrot Daemon which signs the objects (currently files only) + * upon getting notified by the stub. + * 2) Enable versioning of the objects. Object versions (again files only) are + * incremented upon modification. + * So object versioning is tied to bitrot daemon's signing. In future, object + * versioning might be necessary for other things as well apart from bit-rot + * detection (well that's the objective of bringing in object-versioning :)). + * In that case, better to make versioning a new option and letting it to be + * enabled despite bit-rot detection is not needed. + * Ex: ICAP. + */ +int32_t +reconfigure(xlator_t *this, dict_t *options) { - void *mem = NULL; - size_t size = 0; + int32_t ret = -1; + br_stub_private_t *priv = NULL; - if (obuf) - size += sizeof (br_version_t); - if (sbuf) - size += sizeof (br_signature_t) + signaturelen; + priv = this->private; - mem = GF_CALLOC (1, size, gf_br_stub_mt_version_t); - if (!mem) - goto error_return; + GF_OPTION_RECONF("bitrot", priv->do_versioning, options, bool, err); + if (priv->do_versioning && !priv->signth) { + ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this, + "brssign"); + if (ret != 0) { + gf_smsg(this->name, GF_LOG_WARNING, 0, + BRS_MSG_SPAWN_SIGN_THRD_FAILED, NULL); + goto err; + } - if (obuf) { - *obuf = (br_version_t *)mem; - mem = ((char *)mem + sizeof (br_version_t)); + ret = br_stub_bad_object_container_init(this, priv); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL, + NULL); + goto err; } - if (sbuf) { - *sbuf = (br_signature_t *)mem; + } else { + if (priv->signth) { + if (gf_thread_cleanup_xint(priv->signth)) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL); + } else { + gf_smsg(this->name, GF_LOG_INFO, 0, BRS_MSG_KILL_SIGN_THREAD, + NULL); + priv->signth = 0; + } } + if (priv->container.thread) { + if (gf_thread_cleanup_xint(priv->container.thread)) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL); + } + priv->container.thread = 0; + } + } + + ret = 0; + return ret; +err: + if (priv->signth) { + if (gf_thread_cleanup_xint(priv->signth)) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL); + } + priv->signth = 0; + } + + if (priv->container.thread) { + if (gf_thread_cleanup_xint(priv->container.thread)) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL); + } + priv->container.thread = 0; + } + ret = -1; + return ret; +} + +int +notify(xlator_t *this, int event, void *data, ...) +{ + br_stub_private_t *priv = NULL; + + if (!this) return 0; - error_return: - return -1; + priv = this->private; + if (!priv) + return 0; + + default_notify(this, event, data); + return 0; } -static inline void -br_stub_dealloc_versions (void *mem) +void +fini(xlator_t *this) { - GF_FREE (mem); + int32_t ret = 0; + br_stub_private_t *priv = this->private; + struct br_stub_signentry *sigstub = NULL; + call_stub_t *stub = NULL; + + if (!priv) + return; + + if (!priv->do_versioning) + goto cleanup; + + ret = gf_thread_cleanup_xint(priv->signth); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED, + NULL); + goto out; + } + priv->signth = 0; + + while (!list_empty(&priv->squeue)) { + sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry, + list); + list_del_init(&sigstub->list); + + call_stub_destroy(sigstub->stub); + GF_FREE(sigstub); + } + + ret = gf_thread_cleanup_xint(priv->container.thread); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED, + NULL); + goto out; + } + + priv->container.thread = 0; + + while (!list_empty(&priv->container.bad_queue)) { + stub = list_first_entry(&priv->container.bad_queue, call_stub_t, list); + list_del_init(&stub->list); + call_stub_destroy(stub); + } + + pthread_mutex_destroy(&priv->container.bad_lock); + pthread_cond_destroy(&priv->container.bad_cond); + +cleanup: + pthread_mutex_destroy(&priv->lock); + pthread_cond_destroy(&priv->cond); + + if (priv->local_pool) { + mem_pool_destroy(priv->local_pool); + priv->local_pool = NULL; + } + + this->private = NULL; + GF_FREE(priv); + +out: + return; } -static inline br_stub_local_t * -br_stub_alloc_local (xlator_t *this) +static int +br_stub_alloc_versions(br_version_t **obuf, br_signature_t **sbuf, + size_t signaturelen) { - br_stub_private_t *priv = this->private; + void *mem = NULL; + size_t size = 0; + + if (obuf) + size += sizeof(br_version_t); + if (sbuf) + size += sizeof(br_signature_t) + signaturelen; + + mem = GF_CALLOC(1, size, gf_br_stub_mt_version_t); + if (!mem) + goto error_return; + + if (obuf) { + *obuf = (br_version_t *)mem; + mem = ((char *)mem + sizeof(br_version_t)); + } + if (sbuf) { + *sbuf = (br_signature_t *)mem; + } + + return 0; + +error_return: + return -1; +} - return mem_get0 (priv->local_pool); +static void +br_stub_dealloc_versions(void *mem) +{ + GF_FREE(mem); } -static inline void -br_stub_dealloc_local (br_stub_local_t *ptr) +static br_stub_local_t * +br_stub_alloc_local(xlator_t *this) { - mem_put (ptr); + br_stub_private_t *priv = this->private; + + return mem_get0(priv->local_pool); } -static inline int -br_stub_prepare_version_request (xlator_t *this, dict_t *dict, +static void +br_stub_dealloc_local(br_stub_local_t *ptr) +{ + if (!ptr) + return; + + mem_put(ptr); +} + +static int +br_stub_prepare_version_request(xlator_t *this, dict_t *dict, br_version_t *obuf, unsigned long oversion) { - br_stub_private_t *priv = NULL; + br_stub_private_t *priv = NULL; - priv = this->private; - br_set_ongoingversion (obuf, oversion, priv->boot); + priv = this->private; + br_set_ongoingversion(obuf, oversion, priv->boot); - return dict_set_static_bin (dict, BITROT_CURRENT_VERSION_KEY, - (void *)obuf, sizeof (br_version_t)); + return dict_set_bin(dict, BITROT_CURRENT_VERSION_KEY, (void *)obuf, + sizeof(br_version_t)); } -static inline int -br_stub_prepare_signing_request (dict_t *dict, - br_signature_t *sbuf, - br_isignature_t *sign, size_t signaturelen) +static int +br_stub_prepare_signing_request(dict_t *dict, br_signature_t *sbuf, + br_isignature_t *sign, size_t signaturelen) { - size_t size = 0; + size_t size = 0; - br_set_signature (sbuf, sign, signaturelen, &size); + br_set_signature(sbuf, sign, signaturelen, &size); - return dict_set_static_bin (dict, BITROT_SIGNING_VERSION_KEY, - (void *)sbuf, size); + return dict_set_bin(dict, BITROT_SIGNING_VERSION_KEY, (void *)sbuf, size); } /** @@ -186,1038 +445,2831 @@ br_stub_prepare_signing_request (dict_t *dict, * context, hence the inode is marked dirty. this routine also * initializes the transient inode version. */ -static inline int -br_stub_init_inode_versions (xlator_t *this, fd_t *fd, inode_t *inode, - unsigned long version, gf_boolean_t markdirty) -{ - int32_t ret = 0; - br_stub_inode_ctx_t *ctx = NULL; - - ctx = GF_CALLOC (1, sizeof (br_stub_inode_ctx_t), - gf_br_stub_mt_inode_ctx_t); - if (!ctx) - goto error_return; - - (markdirty) ? __br_stub_mark_inode_dirty (ctx) - : __br_stub_mark_inode_synced (ctx); - __br_stub_set_ongoing_version (ctx, version); - __br_stub_reset_release_counters (ctx); - - if (fd) { - br_stub_require_release_call (this, fd); - __br_stub_track_openfd (fd, ctx); - } - ret = br_stub_set_inode_ctx (this, inode, ctx); +static int +br_stub_init_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode, + unsigned long version, gf_boolean_t markdirty, + gf_boolean_t bad_object, uint64_t *ctx_addr) +{ + int32_t ret = 0; + br_stub_inode_ctx_t *ctx = NULL; + + ctx = GF_CALLOC(1, sizeof(br_stub_inode_ctx_t), gf_br_stub_mt_inode_ctx_t); + if (!ctx) + goto error_return; + + INIT_LIST_HEAD(&ctx->fd_list); + (markdirty) ? __br_stub_mark_inode_dirty(ctx) + : __br_stub_mark_inode_synced(ctx); + __br_stub_set_ongoing_version(ctx, version); + + if (bad_object) + __br_stub_mark_object_bad(ctx); + + if (fd) { + ret = br_stub_add_fd_to_inode(this, fd, ctx); if (ret) - goto free_ctx; - return 0; + goto free_ctx; + } - free_ctx: - GF_FREE (ctx); - error_return: - return -1; + ret = br_stub_set_inode_ctx(this, inode, ctx); + if (ret) + goto free_ctx; + + if (ctx_addr) + *ctx_addr = (uint64_t)(uintptr_t)ctx; + return 0; + +free_ctx: + GF_FREE(ctx); +error_return: + return -1; } /** * modify the ongoing version of an inode. */ -static inline int -br_stub_mod_inode_versions (xlator_t *this, - fd_t *fd, inode_t *inode, unsigned long version) +static int +br_stub_mod_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode, + unsigned long version) { - int32_t ret = -1; - br_stub_inode_ctx_t *ctx = 0; - - LOCK (&inode->lock); - { - ctx = __br_stub_get_ongoing_version_ctx (this, inode, NULL); - if (ctx == NULL) - goto unblock; - if (__br_stub_is_inode_dirty (ctx)) { - __br_stub_set_ongoing_version (ctx, version); - __br_stub_mark_inode_synced (ctx); - } - - __br_stub_track_openfd (fd, ctx); - ret = 0; + int32_t ret = -1; + br_stub_inode_ctx_t *ctx = 0; + + LOCK(&inode->lock); + { + ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL); + if (ctx == NULL) + goto unblock; + if (__br_stub_is_inode_dirty(ctx)) { + __br_stub_set_ongoing_version(ctx, version); + __br_stub_mark_inode_synced(ctx); } - unblock: - UNLOCK (&inode->lock); - return ret; + ret = 0; + } +unblock: + UNLOCK(&inode->lock); + + return ret; } -static inline void -br_stub_fill_local (br_stub_local_t *local, - call_stub_t *stub, fd_t *fd, inode_t *inode, uuid_t gfid, - int versioningtype, unsigned long memversion, int dirty) +static void +br_stub_fill_local(br_stub_local_t *local, call_stub_t *stub, fd_t *fd, + inode_t *inode, uuid_t gfid, int versioningtype, + unsigned long memversion) { - local->fopstub = stub; - local->versioningtype = versioningtype; - local->u.context.version = memversion; - if (fd) - local->u.context.fd = fd_ref (fd); - if (inode) - local->u.context.inode = inode_ref (inode); - gf_uuid_copy (local->u.context.gfid, gfid); + local->fopstub = stub; + local->versioningtype = versioningtype; + local->u.context.version = memversion; + if (fd) + local->u.context.fd = fd_ref(fd); + if (inode) + local->u.context.inode = inode_ref(inode); + gf_uuid_copy(local->u.context.gfid, gfid); +} - /* mark inode dirty/fresh according to durability */ - local->u.context.markdirty = (dirty) ? _gf_true : _gf_false; +static void +br_stub_cleanup_local(br_stub_local_t *local) +{ + if (!local) + return; + + local->fopstub = NULL; + local->versioningtype = 0; + local->u.context.version = 0; + if (local->u.context.fd) { + fd_unref(local->u.context.fd); + local->u.context.fd = NULL; + } + if (local->u.context.inode) { + inode_unref(local->u.context.inode); + local->u.context.inode = NULL; + } + memset(local->u.context.gfid, '\0', sizeof(uuid_t)); } -static inline void -br_stub_cleanup_local (br_stub_local_t *local) +static int +br_stub_need_versioning(xlator_t *this, fd_t *fd, gf_boolean_t *versioning, + gf_boolean_t *modified, br_stub_inode_ctx_t **ctx) { - local->fopstub = NULL; - local->versioningtype = 0; - local->u.context.version = 0; - if (local->u.context.fd) { - fd_unref (local->u.context.fd); - local->u.context.fd = NULL; - } - if (local->u.context.inode) { - inode_unref (local->u.context.inode); - local->u.context.inode = NULL; + int32_t ret = -1; + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *c = NULL; + unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; + + *versioning = _gf_false; + *modified = _gf_false; + + /* Bitrot stub inode context was initialized only in lookup, create + * and mknod cbk path. Object versioning was enabled by default + * irrespective of bitrot enabled or not. But it's made optional now. + * As a consequence there could be cases where getting inode ctx would + * fail because it's not set yet. + * e.g., If versioning (with bitrot enable) is enabled while I/O is + * happening, it could directly get other fops like writev without + * lookup, where getting inode ctx would fail. Hence initialize the + * inode ctx on failure to get ctx. This is done in all places where + * applicable. + */ + ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr); + if (ret < 0) { + ret = br_stub_init_inode_versions(this, fd, fd->inode, version, + _gf_true, _gf_false, &ctx_addr); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_GET_INODE_CONTEXT_FAILED, "gfid=%s", + uuid_utoa(fd->inode->gfid), NULL); + goto error_return; } - local->u.context.markdirty = _gf_true; - memset (local->u.context.gfid, '\0', sizeof (uuid_t)); + } + + c = (br_stub_inode_ctx_t *)(long)ctx_addr; + + LOCK(&fd->inode->lock); + { + if (__br_stub_is_inode_dirty(c)) + *versioning = _gf_true; + if (__br_stub_is_inode_modified(c)) + *modified = _gf_true; + } + UNLOCK(&fd->inode->lock); + + if (ctx) + *ctx = c; + return 0; + +error_return: + return -1; } -/** - * callback for inode/fd full versioning - */ -int -br_stub_inode_fullversioning_cbk (call_frame_t *frame, - void *cookie, xlator_t *this, - int op_ret, int op_errno, dict_t *xdata) -{ - fd_t *fd = NULL; - inode_t *inode = NULL; - unsigned long version = 0; - gf_boolean_t dirty = _gf_true; - br_stub_local_t *local = NULL; - - local = (br_stub_local_t *)frame->local; - - /* be graceful to EEXIST */ - if ((op_ret < 0) && (op_errno == EEXIST)) { - op_ret = 0; - goto done; +static int32_t +br_stub_anon_fd_ctx(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx) +{ + int32_t ret = -1; + br_stub_fd_t *br_stub_fd = NULL; + + br_stub_fd = br_stub_fd_ctx_get(this, fd); + if (!br_stub_fd) { + ret = br_stub_add_fd_to_inode(this, fd, ctx); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_INODE, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto out; } + } - if (op_ret < 0) - goto done; + ret = 0; - fd = local->u.context.fd; - inode = local->u.context.inode; - version = local->u.context.version; - dirty = local->u.context.markdirty; +out: + return ret; +} - op_ret = br_stub_init_inode_versions (this, fd, inode, version, dirty); - if (op_ret < 0) - op_errno = EINVAL; +static int +br_stub_versioning_prep(call_frame_t *frame, xlator_t *this, fd_t *fd, + br_stub_inode_ctx_t *ctx) +{ + int32_t ret = -1; + br_stub_local_t *local = NULL; + + local = br_stub_alloc_local(this); + if (!local) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_NO_MEMORY, "gfid=%s", + uuid_utoa(fd->inode->gfid), NULL); + goto error_return; + } + + if (fd_is_anonymous(fd)) { + ret = br_stub_anon_fd_ctx(this, fd, ctx); + if (ret) + goto free_local; + } - done: - frame->local = NULL; - if (op_ret < 0) - call_unwind_error (local->fopstub, op_ret, op_errno); - else - call_resume (local->fopstub); - br_stub_cleanup_local (local); - br_stub_dealloc_local (local); + frame->local = local; - return 0; + return 0; + +free_local: + br_stub_dealloc_local(local); +error_return: + return -1; +} + +static int +br_stub_mark_inode_modified(xlator_t *this, br_stub_local_t *local) +{ + fd_t *fd = NULL; + int32_t ret = 0; + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *ctx = NULL; + unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; + + fd = local->u.context.fd; + + ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr); + if (ret < 0) { + ret = br_stub_init_inode_versions(this, fd, fd->inode, version, + _gf_true, _gf_false, &ctx_addr); + if (ret) + goto error_return; + } + + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + + LOCK(&fd->inode->lock); + { + __br_stub_set_inode_modified(ctx); + } + UNLOCK(&fd->inode->lock); + + return 0; + +error_return: + return -1; } +/** + * The possible return values from br_stub_is_bad_object () are: + * 1) 0 => as per the inode context object is not bad + * 2) -1 => Failed to get the inode context itself + * 3) -2 => As per the inode context object is bad + * Both -ve values means the fop which called this function is failed + * and error is returned upwards. + */ +static int +br_stub_check_bad_object(xlator_t *this, inode_t *inode, int32_t *op_ret, + int32_t *op_errno) +{ + int ret = -1; + unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; + + ret = br_stub_is_bad_object(this, inode); + if (ret == -2) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJECT_ACCESS, + "gfid=%s", uuid_utoa(inode->gfid), NULL); + *op_ret = -1; + *op_errno = EIO; + } + + if (ret == -1) { + ret = br_stub_init_inode_versions(this, NULL, inode, version, _gf_true, + _gf_false, NULL); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_GET_INODE_CONTEXT_FAILED, "gfid=%s", + uuid_utoa(inode->gfid), NULL); + *op_ret = -1; + *op_errno = EINVAL; + } + } + + return ret; +} + +/** + * callback for inode/fd versioning + */ int -br_stub_fd_incversioning_cbk (call_frame_t *frame, - void *cookie, xlator_t *this, - int op_ret, int op_errno, dict_t *xdata) -{ - fd_t *fd = NULL; - inode_t *inode = NULL; - unsigned long version = 0; - br_stub_local_t *local = NULL; - - local = (br_stub_local_t *)frame->local; - if (op_ret < 0) - goto done; - fd = local->u.context.fd; - inode = local->u.context.inode; - version = local->u.context.version; - - op_ret = br_stub_mod_inode_versions (this, fd, inode, version); - if (op_ret < 0) - op_errno = EINVAL; - - done: - frame->local = NULL; - if (op_ret < 0) - call_unwind_error (local->fopstub, -1, op_errno); - else - call_resume (local->fopstub); - br_stub_cleanup_local (local); - br_stub_dealloc_local (local); +br_stub_fd_incversioning_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, dict_t *xdata) +{ + fd_t *fd = NULL; + inode_t *inode = NULL; + unsigned long version = 0; + br_stub_local_t *local = NULL; - return 0; + local = (br_stub_local_t *)frame->local; + if (op_ret < 0) + goto done; + fd = local->u.context.fd; + inode = local->u.context.inode; + version = local->u.context.version; + + op_ret = br_stub_mod_inode_versions(this, fd, inode, version); + if (op_ret < 0) + op_errno = EINVAL; + +done: + if (op_ret < 0) { + frame->local = NULL; + call_unwind_error(local->fopstub, -1, op_errno); + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + } else { + call_resume(local->fopstub); + } + return 0; } /** * Initial object versioning * * Version persists two (2) extended attributes as explained below: - * 1. Current (ongoing) version: This is incremented on an open() - * or creat() and is the running version for an object. + * 1. Current (ongoing) version: This is incremented on an writev () + * or truncate () and is the running version for an object. * 2. Signing version: This is the version against which an object * was signed (checksummed). * * During initial versioning, both ongoing and signing versions are - * set of one and zero respectively. An open() call increments the + * set of one and zero respectively. A write() call increments the * ongoing version as an indication of modification to the object. * Additionally this needs to be persisted on disk and needs to be * durable: fsync().. :-/ - * As an optimization only the first open() synchronizes the ongoing - * version to disk, subsequent open()s before the *last* release() + * As an optimization only the first write() synchronizes the ongoing + * version to disk, subsequent write()s before the *last* release() * are no-op's. * * create(), just like lookup() initializes the object versions to - * the default, but persists the version to disk. As an optimization - * this is not a durable operation: in case of a crash, hard reboot - * etc.. absence of versioning xattrs is ignored in scrubber along - * with the one time crawler explicitly triggering signing for such - * objects. + * the default. As an optimization this is not a durable operation: + * in case of a crash, hard reboot etc.. absence of versioning xattrs + * is ignored in scrubber along with the one time crawler explicitly + * triggering signing for such objects. * - * c.f. br_stub_open_cbk() / br_stub_create_cbk() + * c.f. br_stub_writev() / br_stub_truncate() */ /** * perform full or incremental versioning on an inode pointd by an * fd. incremental versioning is done when an inode is dirty and a - * writeback is trigerred. + * writeback is triggered. */ int -br_stub_fd_versioning (xlator_t *this, call_frame_t *frame, - call_stub_t *stub, dict_t *dict, fd_t *fd, - br_stub_version_cbk *callback, unsigned long memversion, - int versioningtype, int durable, int dirty) -{ - int32_t ret = -1; - int flags = 0; - dict_t *xdata = NULL; - br_stub_local_t *local = NULL; - - if (durable) { - xdata = dict_new (); - if (!xdata) - goto done; - ret = dict_set_int32 (xdata, GLUSTERFS_DURABLE_OP, 0); - if (ret) - goto dealloc_xdata; +br_stub_fd_versioning(xlator_t *this, call_frame_t *frame, call_stub_t *stub, + dict_t *dict, fd_t *fd, br_stub_version_cbk *callback, + unsigned long memversion, int versioningtype, int durable) +{ + int32_t ret = -1; + int flags = 0; + dict_t *xdata = NULL; + br_stub_local_t *local = NULL; + + xdata = dict_new(); + if (!xdata) + goto done; + + ret = dict_set_int32(xdata, GLUSTERFS_INTERNAL_FOP_KEY, 1); + if (ret) + goto dealloc_xdata; + + if (durable) { + ret = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0); + if (ret) + goto dealloc_xdata; + } + + local = frame->local; + + br_stub_fill_local(local, stub, fd, fd->inode, fd->inode->gfid, + versioningtype, memversion); + + STACK_WIND(frame, callback, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); + + ret = 0; + +dealloc_xdata: + dict_unref(xdata); +done: + return ret; +} + +static int +br_stub_perform_incversioning(xlator_t *this, call_frame_t *frame, + call_stub_t *stub, fd_t *fd, + br_stub_inode_ctx_t *ctx) +{ + int32_t ret = -1; + dict_t *dict = NULL; + br_version_t *obuf = NULL; + unsigned long writeback_version = 0; + int op_errno = 0; + br_stub_local_t *local = NULL; + + op_errno = EINVAL; + local = frame->local; + + writeback_version = __br_stub_writeback_version(ctx); + + op_errno = ENOMEM; + dict = dict_new(); + if (!dict) + goto out; + ret = br_stub_alloc_versions(&obuf, NULL, 0); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_MEM_FAILED, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto out; + } + ret = br_stub_prepare_version_request(this, dict, obuf, writeback_version); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_VERSION_PREPARE_FAIL, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + br_stub_dealloc_versions(obuf); + goto out; + } + + ret = br_stub_fd_versioning( + this, frame, stub, dict, fd, br_stub_fd_incversioning_cbk, + writeback_version, BR_STUB_INCREMENTAL_VERSIONING, !WRITEBACK_DURABLE); +out: + if (dict) + dict_unref(dict); + if (ret) { + if (local) + frame->local = NULL; + call_unwind_error(stub, -1, op_errno); + if (local) { + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); } + } - local = br_stub_alloc_local (this); - if (!local) { - ret = -1; - goto dealloc_xdata; + return ret; +} + +/** {{{ */ + +/* fsetxattr() */ + +int32_t +br_stub_perform_objsign(call_frame_t *frame, xlator_t *this, fd_t *fd, + dict_t *dict, int flags, dict_t *xdata) +{ + STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); + + dict_unref(xdata); + return 0; +} + +void * +br_stub_signth(void *arg) +{ + xlator_t *this = arg; + br_stub_private_t *priv = this->private; + struct br_stub_signentry *sigstub = NULL; + + THIS = this; + while (1) { + /* + * Disabling bit-rot feature leads to this particular thread + * getting cleaned up by reconfigure via a call to the function + * gf_thread_cleanup_xint (which in turn calls pthread_cancel + * and pthread_join). But, if this thread had held the mutex + * &priv->lock at the time of cancellation, then it leads to + * deadlock in future when bit-rot feature is enabled (which + * again spawns this thread which cant hold the lock as the + * mutex is still held by the previous instance of the thread + * which got killed). Also, the br_stub_handle_object_signature + * function which is called whenever file has to be signed + * also gets blocked as it too attempts to acquire &priv->lock. + * + * So, arrange for the lock to be unlocked as part of the + * cleanup of this thread using pthread_cleanup_push and + * pthread_cleanup_pop. + */ + pthread_cleanup_push(br_stub_lock_cleaner, &priv->lock); + pthread_mutex_lock(&priv->lock); + { + while (list_empty(&priv->squeue)) + pthread_cond_wait(&priv->cond, &priv->lock); + + sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry, + list); + list_del_init(&sigstub->list); } + pthread_mutex_unlock(&priv->lock); + pthread_cleanup_pop(0); - if (versioningtype == BR_STUB_FULL_VERSIONING) - flags |= XATTR_CREATE; + call_resume(sigstub->stub); - br_stub_fill_local (local, stub, fd, - fd->inode, fd->inode->gfid, - versioningtype, memversion, dirty); + GF_FREE(sigstub); + } - frame->local = local; - STACK_WIND (frame, callback, - FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, - fd, dict, flags, xdata); + return NULL; +} - ret = 0; +static gf_boolean_t +br_stub_internal_xattr(dict_t *dict) +{ + if (dict_get(dict, GLUSTERFS_SET_OBJECT_SIGNATURE) || + dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE) || + dict_get(dict, BR_REOPEN_SIGN_HINT_KEY) || + dict_get(dict, BITROT_OBJECT_BAD_KEY) || + dict_get(dict, BITROT_SIGNING_VERSION_KEY) || + dict_get(dict, BITROT_CURRENT_VERSION_KEY)) + return _gf_true; + + return _gf_false; +} - dealloc_xdata: - if (durable) - dict_unref (xdata); - done: - return ret; +int +orderq(struct list_head *elem1, struct list_head *elem2) +{ + struct br_stub_signentry *s1 = NULL; + struct br_stub_signentry *s2 = NULL; + + s1 = list_entry(elem1, struct br_stub_signentry, list); + s2 = list_entry(elem2, struct br_stub_signentry, list); + + return (s1->v > s2->v); } -static inline int -br_stub_perform_fullversioning (xlator_t *this, call_frame_t *frame, - call_stub_t *stub, fd_t *fd) +static int +br_stub_compare_sign_version(xlator_t *this, inode_t *inode, + br_signature_t *sbuf, dict_t *dict, + int *fakesuccess) { - int32_t ret = -1; - dict_t *dict = NULL; - br_version_t *obuf = NULL; - int op_errno = 0; + int32_t ret = -1; + uint64_t tmp_ctx = 0; + gf_boolean_t invalid = _gf_false; + br_stub_inode_ctx_t *ctx = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out); + GF_VALIDATE_OR_GOTO(this->name, inode, out); + GF_VALIDATE_OR_GOTO(this->name, sbuf, out); + GF_VALIDATE_OR_GOTO(this->name, dict, out); + + ret = br_stub_get_inode_ctx(this, inode, &tmp_ctx); + if (ret) { + dict_del(dict, BITROT_SIGNING_VERSION_KEY); + goto out; + } + + ctx = (br_stub_inode_ctx_t *)(long)tmp_ctx; + + LOCK(&inode->lock); + { + if (ctx->currentversion < sbuf->signedversion) { + invalid = _gf_true; + } else if (ctx->currentversion > sbuf->signedversion) { + gf_msg_debug(this->name, 0, + "\"Signing version\" " + "(%lu) lower than \"Current version \" " + "(%lu)", + ctx->currentversion, sbuf->signedversion); + *fakesuccess = 1; + } + } + UNLOCK(&inode->lock); + + if (invalid) { + ret = -1; + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_VERSION_ERROR, + "Signing-ver=%lu", sbuf->signedversion, "current-ver=%lu", + ctx->currentversion, NULL); + } + +out: + return ret; +} - op_errno = ENOMEM; - dict = dict_new (); - if (!dict) - goto done; - ret = br_stub_alloc_versions (&obuf, NULL, 0); - if (ret) - goto dealloc_dict; +static int +br_stub_prepare_signature(xlator_t *this, dict_t *dict, inode_t *inode, + br_isignature_t *sign, int *fakesuccess) +{ + int32_t ret = -1; + size_t signaturelen = 0; + br_signature_t *sbuf = NULL; + + if (!br_is_signature_type_valid(sign->signaturetype)) + goto out; + + signaturelen = sign->signaturelen; + ret = br_stub_alloc_versions(NULL, &sbuf, signaturelen); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_MEM_FAILED, + "gfid=%s", uuid_utoa(inode->gfid), NULL); + ret = -1; + goto out; + } + ret = br_stub_prepare_signing_request(dict, sbuf, sign, signaturelen); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SIGN_PREPARE_FAIL, + "gfid=%s", uuid_utoa(inode->gfid), NULL); + ret = -1; + br_stub_dealloc_versions(sbuf); + goto out; + } + + /* At this point sbuf has been added to dict, so the memory will be freed + * when the data from the dict is destroyed + */ + ret = br_stub_compare_sign_version(this, inode, sbuf, dict, fakesuccess); +out: + return ret; +} - op_errno = EINVAL; - ret = br_stub_prepare_version_request (this, dict, obuf, - BITROT_DEFAULT_CURRENT_VERSION); - if (ret) - goto dealloc_versions; +static void +br_stub_handle_object_signature(call_frame_t *frame, xlator_t *this, fd_t *fd, + dict_t *dict, br_isignature_t *sign, + dict_t *xdata) +{ + int32_t ret = -1; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + int fakesuccess = 0; + br_stub_private_t *priv = NULL; + struct br_stub_signentry *sigstub = NULL; + + priv = this->private; + + if (frame->root->pid != GF_CLIENT_PID_BITD) { + gf_smsg(this->name, GF_LOG_WARNING, op_errno, BRS_MSG_NON_BITD_PID, + "PID=%d", frame->root->pid, NULL); + goto dofop; + } + + ret = br_stub_prepare_signature(this, dict, fd->inode, sign, &fakesuccess); + if (ret) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_PREPARE_FAIL, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto dofop; + } + if (fakesuccess) { + op_ret = op_errno = 0; + goto dofop; + } + + dict_del(dict, GLUSTERFS_SET_OBJECT_SIGNATURE); + + ret = -1; + if (!xdata) { + xdata = dict_new(); + if (!xdata) + goto dofop; + } else { + dict_ref(xdata); + } + + ret = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0); + if (ret) + goto unref_dict; - /** - * Version extended attributes need not be durable at this point of - * time. If the objects (inode) data gets persisted on disk but the - * version extended attributes are lost due to a crash/power failure, - * a subsequent lookup marks the objects signature as stale. This way, - * dentry operation times do not shoot up. - */ - ret = br_stub_fd_versioning (this, frame, stub, dict, fd, - br_stub_inode_fullversioning_cbk, - BITROT_DEFAULT_CURRENT_VERSION, - BR_STUB_FULL_VERSIONING, !WRITEBACK_DURABLE, 0); - - dealloc_versions: - br_stub_dealloc_versions (obuf); - dealloc_dict: - dict_unref (dict); - done: - if (ret) - call_unwind_error (stub, -1, op_errno); - return ret; + /* prepare dispatch stub to order object signing */ + sigstub = GF_CALLOC(1, sizeof(*sigstub), gf_br_stub_mt_sigstub_t); + if (!sigstub) + goto unref_dict; + + INIT_LIST_HEAD(&sigstub->list); + sigstub->v = ntohl(sign->signedversion); + sigstub->stub = fop_fsetxattr_stub(frame, br_stub_perform_objsign, fd, dict, + 0, xdata); + if (!sigstub->stub) + goto cleanup_stub; + + pthread_mutex_lock(&priv->lock); + { + list_add_order(&sigstub->list, &priv->squeue, orderq); + pthread_cond_signal(&priv->cond); + } + pthread_mutex_unlock(&priv->lock); + + return; + +cleanup_stub: + GF_FREE(sigstub); +unref_dict: + dict_unref(xdata); +dofop: + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL); } -static inline int -br_stub_perform_incversioning (xlator_t *this, - call_frame_t *frame, call_stub_t *stub, - fd_t *fd, br_stub_inode_ctx_t *ctx) +int32_t +br_stub_fsetxattr_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - int32_t ret = -1; - dict_t *dict = NULL; - inode_t *inode = NULL; - br_version_t *obuf = NULL; - unsigned long writeback_version = 0; - int op_errno = 0; + int32_t ret = -1; + br_stub_local_t *local = NULL; - inode = fd->inode; + local = frame->local; + frame->local = NULL; + ret = br_stub_mark_inode_modified(this, local); + if (ret) { + op_ret = -1; op_errno = EINVAL; - ret = br_stub_require_release_call (this, fd); - if (ret) - goto done; + } - LOCK (&inode->lock); - { - if (__br_stub_is_inode_dirty (ctx)) - writeback_version = __br_stub_writeback_version (ctx); - else - __br_stub_track_openfd (fd, ctx); - } - UNLOCK (&inode->lock); + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata); - if (!writeback_version) { - ret = 0; - goto done; - } + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); - /* inode requires writeback to disk */ - op_errno = ENOMEM; - dict = dict_new (); - if (!dict) - goto done; - ret = br_stub_alloc_versions (&obuf, NULL, 0); - if (ret) - goto dealloc_dict; - ret = br_stub_prepare_version_request (this, dict, - obuf, writeback_version); - if (ret) - goto dealloc_versions; - - ret = br_stub_fd_versioning - (this, frame, stub, dict, - fd, br_stub_fd_incversioning_cbk, writeback_version, - BR_STUB_INCREMENTAL_VERSIONING, WRITEBACK_DURABLE, 0); - - dealloc_versions: - br_stub_dealloc_versions (obuf); - dealloc_dict: - dict_unref (dict); - done: - if (!ret && !writeback_version) - call_resume (stub); - if (ret) - call_unwind_error (stub, -1, op_errno); - return ret; + return 0; } -/** {{{ */ +/** + * Handles object reopens. Object reopens can be of 3 types. 2 are from + * oneshot crawler and 1 from the regular signer. + * ONESHOT CRAWLER: + * For those objects which were created before bitrot was enabled. oneshow + * crawler crawls the namespace and signs all the objects. It has to do + * the versioning before making bit-rot-stub send a sign notification. + * So it sends fsetxattr with BR_OBJECT_REOPEN as the value. And bit-rot-stub + * upon getting BR_OBJECT_REOPEN value checks if the version has to be + * increased or not. By default the version will be increased. But if the + * object is modified before BR_OBJECT_REOPEN from oneshot crawler, then + * versioning need not be done. In that case simply a success is returned. + * SIGNER: + * Signer wait for 2 minutes upon getting the notification from bit-rot-stub + * and then it sends a dummy write (in reality a fsetxattr) call, to change + * the state of the inode from REOPEN_WAIT to SIGN_QUICK. The funny part here + * is though the inode's state is REOPEN_WAIT, the call sent by signer is + * BR_OBJECT_RESIGN. Once the state is changed to SIGN_QUICK, then yet another + * notification is sent upon release (RESIGN would have happened via fsetxattr, + * so a fd is needed) and the object is signed truly this time. + * There is a challenge in the above RESIGN method by signer. After sending + * the 1st notification, the inode could be forgotten before RESIGN request + * is received. In that case, the inode's context (the newly looked up inode) + * would not indicate the inode as being modified (it would be in the default + * state) and because of this, a SIGN_QUICK notification to truly sign the + * object would not be sent. So, this is how its handled. + * if (request == RESIGN) { + * if (inode->sign_info == NORMAL) { + * mark_inode_non_dirty; + * mark_inode_modified; + * } + * GOBACK (means unwind without doing versioning) + * } + */ +static void +br_stub_handle_object_reopen(call_frame_t *frame, xlator_t *this, fd_t *fd, + uint32_t val) +{ + int32_t ret = -1; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + call_stub_t *stub = NULL; + gf_boolean_t inc_version = _gf_false; + gf_boolean_t modified = _gf_false; + br_stub_inode_ctx_t *ctx = NULL; + br_stub_local_t *local = NULL; + gf_boolean_t goback = _gf_true; + + ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx); + if (ret) + goto unwind; + + LOCK(&fd->inode->lock); + { + if ((val == BR_OBJECT_REOPEN) && inc_version) + goback = _gf_false; + if (val == BR_OBJECT_RESIGN && ctx->info_sign == BR_SIGN_NORMAL) { + __br_stub_mark_inode_synced(ctx); + __br_stub_set_inode_modified(ctx); + } + (void)__br_stub_inode_sign_state(ctx, GF_FOP_FSETXATTR, fd); + } + UNLOCK(&fd->inode->lock); + + if (goback) { + op_ret = op_errno = 0; + goto unwind; + } + + ret = br_stub_versioning_prep(frame, this, fd, ctx); + if (ret) + goto unwind; + local = frame->local; + + stub = fop_fsetxattr_cbk_stub(frame, br_stub_fsetxattr_resume, 0, 0, NULL); + if (!stub) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED, + "fsetxattr gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto cleanup_local; + } + + (void)br_stub_perform_incversioning(this, frame, stub, fd, ctx); + return; + +cleanup_local: + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + +unwind: + frame->local = NULL; + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL); +} -/* fsetxattr() */ +/** + * This function only handles bad file identification. Instead of checking in + * fops like open, readv, writev whether the object is bad or not by doing + * getxattr calls, better to catch them when scrubber marks it as bad. + * So this callback is called only when the fsetxattr is sent by the scrubber + * to mark the object as bad. + */ +int +br_stub_fsetxattr_bad_object_cbk(call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + br_stub_local_t *local = NULL; + int32_t ret = -1; + + local = frame->local; + frame->local = NULL; + + if (op_ret < 0) + goto unwind; + + /* + * What to do if marking the object as bad fails? (i.e. in memory + * marking within the inode context. If we are here means fsetxattr + * fop has succeeded on disk and the bad object xattr has been set). + * We can return failure to scruber, but there is nothing the scrubber + * can do with it (it might assume that the on disk setxattr itself has + * failed). The main purpose of this operation is to help identify the + * bad object by checking the inode context itself (thus avoiding the + * necessity of doing a getxattr fop on the disk). + * + * So as of now, success itself is being returned even though inode + * context set operation fails. + * In future if there is any change in the policy which can handle this, + * then appropriate response should be sent (i.e. success or error). + */ + ret = br_stub_mark_object_bad(this, local->u.context.inode); + if (ret) + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_MARK_FAIL, + "gfid=%s", uuid_utoa(local->u.context.inode->gfid), NULL); + + ret = br_stub_add(this, local->u.context.inode->gfid); + +unwind: + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata); + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + return 0; +} + +static int32_t +br_stub_handle_bad_object_key(call_frame_t *frame, xlator_t *this, fd_t *fd, + dict_t *dict, int flags, dict_t *xdata) +{ + br_stub_local_t *local = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + + if (frame->root->pid != GF_CLIENT_PID_SCRUB) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NON_SCRUB_BAD_OBJ_MARK, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto unwind; + } + + local = br_stub_alloc_local(this); + if (!local) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_MEM_FAILED, + "fsetxattr gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } + + br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid, + BR_STUB_NO_VERSIONING, 0); + frame->local = local; + + STACK_WIND(frame, br_stub_fsetxattr_bad_object_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL); + return 0; +} -static inline int -br_stub_prepare_signature (xlator_t *this, dict_t *dict, - inode_t *inode, br_isignature_t *sign) +/** + * As of now, versioning is done by the stub (though as a setxattr + * operation) as part of inode modification operations such as writev, + * truncate, ftruncate. And signing is done by BitD by a fsetxattr call. + * So any kind of setxattr coming on the versioning and the signing xattr is + * not allowed (i.e. BITROT_CURRENT_VERSION_KEY and BITROT_SIGNING_VERSION_KEY). + * In future if BitD/scrubber are allowed to change the versioning + * xattrs (though I cannot see a reason for it as of now), then the below + * function can be modified to block setxattr on version for only applications. + * + * NOTE: BitD sends sign request on GLUSTERFS_SET_OBJECT_SIGNATURE key. + * BITROT_SIGNING_VERSION_KEY is the xattr used to save the signature. + * + */ +static int32_t +br_stub_handle_internal_xattr(call_frame_t *frame, xlator_t *this, fd_t *fd, + char *key) { - int32_t ret = 0; - size_t signaturelen = 0; - br_signature_t *sbuf = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; - if (!br_is_signature_type_valid (sign->signaturetype)) - goto error_return; + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR, + "setxattr key=%s", key, "inode-gfid=%s", uuid_utoa(fd->inode->gfid), + NULL); - signaturelen = sign->signaturelen; - ret = br_stub_alloc_versions (NULL, &sbuf, signaturelen); - if (ret) - goto error_return; - ret = br_stub_prepare_signing_request (dict, sbuf, sign, signaturelen); - if (ret) - goto dealloc_versions; - return 0; + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL); + return 0; +} - dealloc_versions: - br_stub_dealloc_versions (sbuf); - error_return: - return -1; +static void +br_stub_dump_xattr(xlator_t *this, dict_t *dict, int *op_errno) +{ + char *format = "(%s:%s)"; + char *dump = NULL; + + dump = GF_CALLOC(1, BR_STUB_DUMP_STR_SIZE, gf_br_stub_mt_misc); + if (!dump) { + *op_errno = ENOMEM; + goto out; + } + dict_dump_to_str(dict, dump, BR_STUB_DUMP_STR_SIZE, format); + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR, + "fsetxattr dump=%s", dump, NULL); +out: + if (dump) { + GF_FREE(dump); + } + return; } int -br_stub_fsetxattr (call_frame_t *frame, xlator_t *this, - fd_t *fd, dict_t *dict, int flags, dict_t *xdata) +br_stub_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, + int flags, dict_t *xdata) { - int32_t ret = 0; - br_isignature_t *sign = NULL; - gf_boolean_t xref = _gf_false; + int32_t ret = 0; + uint32_t val = 0; + br_isignature_t *sign = NULL; + br_stub_private_t *priv = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + + priv = this->private; + + if ((frame->root->pid != GF_CLIENT_PID_BITD && + frame->root->pid != GF_CLIENT_PID_SCRUB) && + br_stub_internal_xattr(dict)) { + br_stub_dump_xattr(this, dict, &op_errno); + goto unwind; + } + + if (!priv->do_versioning) + goto wind; + + if (!IA_ISREG(fd->inode->ia_type)) + goto wind; + + /* object signature request */ + ret = dict_get_bin(dict, GLUSTERFS_SET_OBJECT_SIGNATURE, (void **)&sign); + if (!ret) { + gf_msg_debug(this->name, 0, "got SIGNATURE request on %s", + uuid_utoa(fd->inode->gfid)); + br_stub_handle_object_signature(frame, this, fd, dict, sign, xdata); + goto done; + } - if (!IA_ISREG (fd->inode->ia_type)) - goto wind; - ret = dict_get_bin (dict, GLUSTERFS_SET_OBJECT_SIGNATURE, - (void **) &sign); - if (ret < 0) - goto wind; - if (frame->root->pid != GF_CLIENT_PID_BITD) - goto unwind; + /* signing xattr */ + if (dict_get(dict, BITROT_SIGNING_VERSION_KEY)) { + br_stub_handle_internal_xattr(frame, this, fd, + BITROT_SIGNING_VERSION_KEY); + goto done; + } - ret = br_stub_prepare_signature (this, dict, fd->inode, sign); - if (ret) - goto unwind; - dict_del (dict, GLUSTERFS_SET_OBJECT_SIGNATURE); - - if (!xdata) { - xdata = dict_new (); - if (!xdata) - goto unwind; - } else { - dict_ref (xdata); - } + /* version xattr */ + if (dict_get(dict, BITROT_CURRENT_VERSION_KEY)) { + br_stub_handle_internal_xattr(frame, this, fd, + BITROT_CURRENT_VERSION_KEY); + goto done; + } - xref = _gf_true; - ret = dict_set_int32 (xdata, GLUSTERFS_DURABLE_OP, 0); - if (ret) - goto unwind; + if (dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE)) { + br_stub_handle_internal_xattr(frame, this, fd, + GLUSTERFS_GET_OBJECT_SIGNATURE); + goto done; + } - wind: - STACK_WIND (frame, default_setxattr_cbk, - FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, fd, - dict, flags, xdata); + /* object reopen request */ + ret = dict_get_uint32(dict, BR_REOPEN_SIGN_HINT_KEY, &val); + if (!ret) { + br_stub_handle_object_reopen(frame, this, fd, val); goto done; + } - unwind: - STACK_UNWIND_STRICT (setxattr, frame, -1, EINVAL, NULL); - done: - if (xref) - dict_unref (xdata); - return 0; + /* handle bad object */ + if (dict_get(dict, BITROT_OBJECT_BAD_KEY)) { + br_stub_handle_bad_object_key(frame, this, fd, dict, flags, xdata); + goto done; + } + +wind: + STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata); + return 0; + +unwind: + STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL); + +done: + return 0; +} + +/** + * Currently BitD and scrubber are doing fsetxattr to either sign the object + * or to mark it as bad. Hence setxattr on any of those keys is denied directly + * without checking from where the fop is coming. + * Later, if BitD or Scrubber does setxattr of those keys, then appropriate + * check has to be added below. + */ +int +br_stub_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, + int flags, dict_t *xdata) +{ + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + + if (br_stub_internal_xattr(dict)) { + br_stub_dump_xattr(this, dict, &op_errno); + goto unwind; + } + + STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, + loc, dict, flags, xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(setxattr, frame, op_ret, op_errno, NULL); + return 0; } /** }}} */ +/** {{{ */ + +/* {f}removexattr() */ + +int32_t +br_stub_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc, + const char *name, dict_t *xdata) +{ + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + + if (!strcmp(BITROT_OBJECT_BAD_KEY, name) || + !strcmp(BITROT_SIGNING_VERSION_KEY, name) || + !strcmp(BITROT_CURRENT_VERSION_KEY, name)) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR, + "name=%s", name, "file-path=%s", loc->path, NULL); + goto unwind; + } + + STACK_WIND_TAIL(frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(removexattr, frame, op_ret, op_errno, NULL); + return 0; +} + +int32_t +br_stub_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd, + const char *name, dict_t *xdata) +{ + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + + if (!strcmp(BITROT_OBJECT_BAD_KEY, name) || + !strcmp(BITROT_SIGNING_VERSION_KEY, name) || + !strcmp(BITROT_CURRENT_VERSION_KEY, name)) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR, + "name=%s", name, "inode-gfid=%s", uuid_utoa(fd->inode->gfid), + NULL); + goto unwind; + } + + STACK_WIND_TAIL(frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(fremovexattr, frame, op_ret, op_errno, NULL); + return 0; +} + +/** }}} */ /** {{{ */ /* {f}getxattr() */ int -br_stub_listxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int op_ret, int op_errno, dict_t *xattr, dict_t *xdata) +br_stub_listxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, dict_t *xattr, dict_t *xdata) { - if (op_ret < 0) - goto unwind; + if (op_ret < 0) + goto unwind; - br_stub_remove_vxattrs (xattr); + br_stub_remove_vxattrs(xattr, _gf_true); - unwind: - STACK_UNWIND (frame, op_ret, op_errno, xattr, xdata); - return 0; +unwind: + STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata); + return 0; } +/** + * ONE SHOT CRAWLER from BitD signs the objects that it encounters while + * crawling, if the object is identified as stale by the stub. Stub follows + * the below logic to mark an object as stale or not. + * If the ongoing version and the signed_version match, then the object is not + * stale. Just return. Otherwise if they does not match, then it means one + * of the below things. + * 1) If the inode does not need write back of the version and the sign state is + * is NORMAL, then some active i/o is going on the object. So skip it. + * A notification will be sent to trigger the sign once the release is + * received on the object. + * 2) If inode does not need writeback of the version and the sign state is + * either reopen wait or quick sign, then it means: + * A) BitD restarted and it is not sure whether the object it encountered + * while crawling is in its timer wheel or not. Since there is no way to + * scan the timer wheel as of now, ONE SHOT CRAWLER just goes ahead and + * signs the object. Since the inode does not need writeback, version will + * not be incremented and directly the object will be signed. + * 3) If the inode needs writeback, then it means the inode was forgotten after + * the versioning and it has to be signed now. + * + * This is the algorithm followed: + * if (ongoing_version == signed_version); then + * object_is_not_stale; + * return; + * else; then + * if (!inode_needs_writeback && inode_sign_state != NORMAL); then + * object_is_stale; + * if (inode_needs_writeback); then + * object_is_stale; + * + * For SCRUBBER, no need to check for the sign state and inode writeback. + * If the ondisk ongoingversion and the ondisk signed version does not match, + * then treat the object as stale. + */ +char +br_stub_is_object_stale(xlator_t *this, call_frame_t *frame, inode_t *inode, + br_version_t *obuf, br_signature_t *sbuf) +{ + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + char stale = 0; + + if (obuf->ongoingversion == sbuf->signedversion) + goto out; + + if (frame->root->pid == GF_CLIENT_PID_SCRUB) { + stale = 1; + goto out; + } + + ret = br_stub_get_inode_ctx(this, inode, &ctx_addr); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED, + "gfid=%s", uuid_utoa(inode->gfid), NULL); + goto out; + } + + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + + LOCK(&inode->lock); + { + if ((!__br_stub_is_inode_dirty(ctx) && + ctx->info_sign != BR_SIGN_NORMAL) || + __br_stub_is_inode_dirty(ctx)) + stale = 1; + } + UNLOCK(&inode->lock); + +out: + return stale; +} int -br_stub_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int op_ret, int op_errno, dict_t *xattr, dict_t *xdata) +br_stub_getxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, dict_t *xattr, dict_t *xdata) { - int32_t ret = 0; - size_t totallen = 0; - size_t signaturelen = 0; - br_version_t *obuf = NULL; - br_signature_t *sbuf = NULL; - br_isignature_out_t *sign = NULL; - br_vxattr_status_t status; + int32_t ret = 0; + size_t totallen = 0; + size_t signaturelen = 0; + br_stub_private_t *priv = NULL; + br_version_t *obuf = NULL; + br_signature_t *sbuf = NULL; + br_isignature_out_t *sign = NULL; + br_vxattr_status_t status; + br_stub_local_t *local = NULL; + inode_t *inode = NULL; + gf_boolean_t bad_object = _gf_false; + gf_boolean_t ver_enabled = _gf_false; + + BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled); + priv = this->private; + + if (op_ret < 0) + goto unwind; + BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkeys); + + if (cookie != (void *)BR_STUB_REQUEST_COOKIE) + goto unwind; + + local = frame->local; + frame->local = NULL; + if (!local) { + op_ret = -1; + op_errno = EINVAL; + goto unwind; + } + inode = local->u.context.inode; + + op_ret = -1; + status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object); + + op_errno = EIO; + if (bad_object) + goto delkeys; + + op_errno = EINVAL; + if (status == BR_VXATTR_STATUS_INVALID) + goto delkeys; + + op_errno = ENODATA; + if ((status == BR_VXATTR_STATUS_MISSING) || + (status == BR_VXATTR_STATUS_UNSIGNED)) + goto delkeys; + + /** + * okay.. we have enough information to satisfy the request, + * namely: version and signing extended attribute. what's + * pending is the signature length -- that's figured out + * indirectly via the size of the _whole_ xattr and the + * on-disk signing xattr header size. + */ + op_errno = EINVAL; + ret = dict_get_uint32(xattr, BITROT_SIGNING_XATTR_SIZE_KEY, + (uint32_t *)&signaturelen); + if (ret) + goto delkeys; + + signaturelen -= sizeof(br_signature_t); + totallen = sizeof(br_isignature_out_t) + signaturelen; + + op_errno = ENOMEM; + sign = GF_CALLOC(1, totallen, gf_br_stub_mt_signature_t); + if (!sign) + goto delkeys; + + sign->time[0] = obuf->timebuf[0]; + sign->time[1] = obuf->timebuf[1]; + + /* Object's dirty state & current signed version */ + sign->version = sbuf->signedversion; + sign->stale = br_stub_is_object_stale(this, frame, inode, obuf, sbuf); + + /* Object's signature */ + sign->signaturelen = signaturelen; + sign->signaturetype = sbuf->signaturetype; + (void)memcpy(sign->signature, sbuf->signature, signaturelen); + + op_errno = EINVAL; + ret = dict_set_bin(xattr, GLUSTERFS_GET_OBJECT_SIGNATURE, (void *)sign, + totallen); + if (ret < 0) { + GF_FREE(sign); + goto delkeys; + } + op_errno = 0; + op_ret = totallen; + +delkeys: + br_stub_remove_vxattrs(xattr, _gf_true); + +unwind: + STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata); + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + return 0; +} - if (op_ret < 0) - goto unwind; - if (cookie != (void *) BR_STUB_REQUEST_COOKIE) - goto unwind; +static void +br_stub_send_stub_init_time(call_frame_t *frame, xlator_t *this) +{ + int op_ret = 0; + int op_errno = 0; + dict_t *xattr = NULL; + br_stub_init_t stub = { + { + 0, + }, + }; + br_stub_private_t *priv = NULL; - op_ret = -1; - status = br_version_xattr_state (xattr, &obuf, &sbuf); + priv = this->private; - op_errno = EINVAL; - if (status == BR_VXATTR_STATUS_INVALID) - goto delkeys; + xattr = dict_new(); + if (!xattr) { + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } - op_errno = ENODATA; - if ((status == BR_VXATTR_STATUS_MISSING) - || (status == BR_VXATTR_STATUS_UNSIGNED)) - goto delkeys; + stub.timebuf[0] = priv->boot[0]; + stub.timebuf[1] = priv->boot[1]; + memcpy(stub.export, priv->export, strlen(priv->export) + 1); - /** - * okay.. we have enough information to satisfy the request, - * namely: version and signing extended attribute. what's - * pending is the signature length -- that's figured out - * indirectly via the size of the _whole_ xattr and the - * on-disk signing xattr header size. - */ + op_ret = dict_set_static_bin(xattr, GLUSTERFS_GET_BR_STUB_INIT_TIME, + (void *)&stub, sizeof(br_stub_init_t)); + if (op_ret < 0) { op_errno = EINVAL; - ret = dict_get_uint32 (xattr, BITROT_SIGNING_XATTR_SIZE_KEY, - (uint32_t *)&signaturelen); - if (ret) - goto delkeys; + goto unwind; + } - signaturelen -= sizeof (br_signature_t); - totallen = sizeof (br_isignature_out_t) + signaturelen; + op_ret = sizeof(br_stub_init_t); - op_errno = ENOMEM; - sign = GF_CALLOC (1, totallen, gf_br_stub_mt_signature_t); - if (!sign) - goto delkeys; +unwind: + STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, NULL); - sign->time[0] = obuf->timebuf[0]; - sign->time[1] = obuf->timebuf[1]; + if (xattr) + dict_unref(xattr); +} - /* Object's dirty state & current signed version */ - sign->version = sbuf->signedversion; - sign->stale = (obuf->ongoingversion != sbuf->signedversion) ? 1 : 0; +int +br_stub_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, + const char *name, dict_t *xdata) +{ + void *cookie = NULL; + static uuid_t rootgfid = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}; + fop_getxattr_cbk_t cbk = br_stub_getxattr_cbk; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + br_stub_local_t *local = NULL; + br_stub_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc, unwind); + GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); + + if (!name) { + cbk = br_stub_listxattr_cbk; + goto wind; + } + + if (br_stub_is_internal_xattr(name)) + goto unwind; + + priv = this->private; + BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); + + /** + * If xattr is node-uuid and the inode is marked bad, return EIO. + * Returning EIO would result in AFR to choose correct node-uuid + * corresponding to the subvolume * where the good copy of the + * file resides. + */ + if (IA_ISREG(loc->inode->ia_type) && XATTR_IS_NODE_UUID(name) && + br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno)) { + goto unwind; + } + + /** + * this special extended attribute is allowed only on root + */ + if (name && + (strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME, + sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) && + ((gf_uuid_compare(loc->gfid, rootgfid) == 0) || + (gf_uuid_compare(loc->inode->gfid, rootgfid) == 0))) { + BR_STUB_RESET_LOCAL_NULL(frame); + br_stub_send_stub_init_time(frame, this); + return 0; + } - /* Object's signature */ - sign->signaturelen = signaturelen; - sign->signaturetype = sbuf->signaturetype; - (void) memcpy (sign->signature, sbuf->signature, signaturelen); + if (!IA_ISREG(loc->inode->ia_type)) + goto wind; - op_errno = EINVAL; - ret = dict_set_bin (xattr, GLUSTERFS_GET_OBJECT_SIGNATURE, - (void *)sign, totallen); - if (ret < 0) - goto delkeys; - op_errno = 0; - op_ret = totallen; + if (name && (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE, + sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) { + cookie = (void *)BR_STUB_REQUEST_COOKIE; - delkeys: - br_stub_remove_vxattrs (xattr); + local = br_stub_alloc_local(this); + if (!local) { + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } - unwind: - STACK_UNWIND (frame, op_ret, op_errno, xattr, xdata); - return 0; + br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid, + BR_STUB_NO_VERSIONING, 0); + frame->local = local; + } + +wind: + STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->getxattr, loc, name, xdata); + return 0; +unwind: + BR_STUB_RESET_LOCAL_NULL(frame); + STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, NULL, NULL); + return 0; } -static inline void -br_stub_send_stub_init_time (call_frame_t *frame, xlator_t *this) +int +br_stub_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, + const char *name, dict_t *xdata) { - int op_ret = 0; - int op_errno = 0; - dict_t *xattr = NULL; - br_stub_init_t stub = {{0,},}; - br_stub_private_t *priv = NULL; + void *cookie = NULL; + static uuid_t rootgfid = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}; + fop_fgetxattr_cbk_t cbk = br_stub_getxattr_cbk; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + br_stub_local_t *local = NULL; + br_stub_private_t *priv = NULL; + + priv = this->private; + + if (!name) { + cbk = br_stub_listxattr_cbk; + goto wind; + } + + if (br_stub_is_internal_xattr(name)) + goto unwind; + + BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); + + /** + * If xattr is node-uuid and the inode is marked bad, return EIO. + * Returning EIO would result in AFR to choose correct node-uuid + * corresponding to the subvolume * where the good copy of the + * file resides. + */ + if (IA_ISREG(fd->inode->ia_type) && XATTR_IS_NODE_UUID(name) && + br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno)) { + goto unwind; + } + + /** + * this special extended attribute is allowed only on root + */ + if (name && + (strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME, + sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) && + (gf_uuid_compare(fd->inode->gfid, rootgfid) == 0)) { + BR_STUB_RESET_LOCAL_NULL(frame); + br_stub_send_stub_init_time(frame, this); + return 0; + } + + if (!IA_ISREG(fd->inode->ia_type)) + goto wind; - priv = this->private; + if (name && (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE, + sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) { + cookie = (void *)BR_STUB_REQUEST_COOKIE; - xattr = dict_new (); - if (!xattr) { - op_ret = -1; - op_errno = ENOMEM; - goto unwind; + local = br_stub_alloc_local(this); + if (!local) { + op_ret = -1; + op_errno = ENOMEM; + goto unwind; } - stub.timebuf[0] = priv->boot[0]; - stub.timebuf[1] = priv->boot[1]; - memcpy (stub.export, priv->export, strlen (priv->export) + 1); + br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid, + BR_STUB_NO_VERSIONING, 0); + frame->local = local; + } + +wind: + STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata); + return 0; +unwind: + BR_STUB_RESET_LOCAL_NULL(frame); + STACK_UNWIND_STRICT(fgetxattr, frame, op_ret, op_errno, NULL, NULL); + return 0; +} - op_ret = dict_set_static_bin (xattr, GLUSTERFS_GET_BR_STUB_INIT_TIME, - (void *) &stub, sizeof (br_stub_init_t)); - if (op_ret < 0) { - op_errno = EINVAL; - goto unwind; - } +int32_t +br_stub_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, uint32_t flags, dict_t *xdata) +{ + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + int32_t ret = -1; + br_stub_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, frame, unwind); + GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind); + + priv = this->private; + if (!priv->do_versioning) + goto wind; + + ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); + if (ret) + goto unwind; + +wind: + STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, + fd, size, offset, flags, xdata); + return 0; + +unwind: + STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, NULL, 0, NULL, NULL, + NULL); + return 0; +} + +/** + * The first write response on the first fd in the list of fds will set + * the flag to indicate that the inode is modified. The subsequent write + * respnses coming on either the first fd or some other fd will not change + * the fd. The inode-modified flag is unset only upon release of all the + * fds. + */ +int32_t +br_stub_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + int32_t ret = 0; + br_stub_local_t *local = NULL; - op_ret = sizeof (br_stub_init_t); + local = frame->local; + frame->local = NULL; - unwind: - STACK_UNWIND (frame, op_ret, op_errno, xattr, NULL); + if (op_ret < 0) + goto unwind; - if (xattr) - dict_unref (xattr); + ret = br_stub_mark_inode_modified(this, local); + if (ret) { + op_ret = -1; + op_errno = EINVAL; + } + +unwind: + STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + + return 0; } -int -br_stub_getxattr (call_frame_t *frame, xlator_t *this, - loc_t *loc, const char *name, dict_t *xdata) +int32_t +br_stub_writev_resume(call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t offset, + uint32_t flags, struct iobref *iobref, dict_t *xdata) { - void *cookie = NULL; - uuid_t rootgfid = {0, }; - fop_getxattr_cbk_t cbk = br_stub_getxattr_cbk; + STACK_WIND(frame, br_stub_writev_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->writev, fd, vector, count, offset, + flags, iobref, xdata); + return 0; +} - rootgfid[15] = 1; +/** + * This is probably the most crucial part about the whole versioning thing. + * There's absolutely no differentiation as such between an anonymous fd + * and a regular fd except the fd context initialization. Object versioning + * is performed when the inode is dirty. Parallel write operations are no + * special with each write performing object versioning followed by marking + * the inode as non-dirty (synced). This is followed by the actual operation + * (writev() in this case) which on a success marks the inode as modified. + * This prevents signing of objects that have not been modified. + */ +int32_t +br_stub_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t offset, + uint32_t flags, struct iobref *iobref, dict_t *xdata) +{ + call_stub_t *stub = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + gf_boolean_t inc_version = _gf_false; + gf_boolean_t modified = _gf_false; + br_stub_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + fop_writev_cbk_t cbk = default_writev_cbk; + br_stub_local_t *local = NULL; + br_stub_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); + GF_VALIDATE_OR_GOTO(this->name, frame, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd, unwind); + + priv = this->private; + if (!priv->do_versioning) + goto wind; + + ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx); + if (ret) + goto unwind; + + ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); + if (ret) + goto unwind; + + /** + * The inode is not dirty and also witnessed at least one successful + * modification operation. Therefore, subsequent operations need not + * perform any special tracking. + */ + if (!inc_version && modified) + goto wind; + + /** + * okay.. so, either the inode needs versioning or the modification + * needs to be tracked. ->cbk is set to the appropriate callback + * routine for this. + * NOTE: ->local needs to be deallocated on failures from here on. + */ + ret = br_stub_versioning_prep(frame, this, fd, ctx); + if (ret) + goto unwind; + + local = frame->local; + if (!inc_version) { + br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid, + BR_STUB_NO_VERSIONING, 0); + cbk = br_stub_writev_cbk; + goto wind; + } + + stub = fop_writev_stub(frame, br_stub_writev_resume, fd, vector, count, + offset, flags, iobref, xdata); + + if (!stub) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED, + "write gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto cleanup_local; + } + + /* Perform Versioning */ + return br_stub_perform_incversioning(this, frame, stub, fd, ctx); + +wind: + STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, + fd, vector, count, offset, flags, iobref, xdata); + return 0; + +cleanup_local: + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + +unwind: + frame->local = NULL; + STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, NULL, NULL, NULL); + + return 0; +} - if (!name) { - cbk = br_stub_listxattr_cbk; - goto wind; - } +int32_t +br_stub_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + int32_t ret = -1; + br_stub_local_t *local = NULL; - if (br_stub_is_internal_xattr (name)) { - STACK_UNWIND (frame, -1, EINVAL, NULL, NULL); - return 0; - } + local = frame->local; + frame->local = NULL; - /** - * this special extended attribute is allowed only on root - */ - if (name - && (strncmp (name, GLUSTERFS_GET_BR_STUB_INIT_TIME, - strlen (GLUSTERFS_GET_BR_STUB_INIT_TIME)) == 0) - && ((gf_uuid_compare (loc->gfid, rootgfid) == 0) - || (gf_uuid_compare (loc->inode->gfid, rootgfid) == 0))) { - br_stub_send_stub_init_time (frame, this); - return 0; - } + if (op_ret < 0) + goto unwind; - if (!IA_ISREG (loc->inode->ia_type)) - goto wind; + ret = br_stub_mark_inode_modified(this, local); + if (ret) { + op_ret = -1; + op_errno = EINVAL; + } - if (name && (strncmp (name, GLUSTERFS_GET_OBJECT_SIGNATURE, - strlen (GLUSTERFS_GET_OBJECT_SIGNATURE)) == 0)) { - cookie = (void *) BR_STUB_REQUEST_COOKIE; - } +unwind: + STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf, + xdata); - wind: - STACK_WIND_COOKIE - (frame, cbk, cookie, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->getxattr, loc, name, xdata); - return 0; + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + + return 0; } -int -br_stub_fgetxattr (call_frame_t *frame, xlator_t *this, - fd_t *fd, const char *name, dict_t *xdata) +int32_t +br_stub_ftruncate_resume(call_frame_t *frame, xlator_t *this, fd_t *fd, + off_t offset, dict_t *xdata) { - void *cookie = NULL; - uuid_t rootgfid = {0, }; - fop_fgetxattr_cbk_t cbk = br_stub_getxattr_cbk; + STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); + return 0; +} - rootgfid[15] = 1; +/* c.f. br_stub_writev() for explanation */ +int32_t +br_stub_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + dict_t *xdata) +{ + br_stub_local_t *local = NULL; + call_stub_t *stub = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + gf_boolean_t inc_version = _gf_false; + gf_boolean_t modified = _gf_false; + br_stub_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + fop_ftruncate_cbk_t cbk = default_ftruncate_cbk; + br_stub_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); + GF_VALIDATE_OR_GOTO(this->name, frame, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd, unwind); + + priv = this->private; + if (!priv->do_versioning) + goto wind; + + ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx); + if (ret) + goto unwind; + + ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); + if (ret) + goto unwind; + + if (!inc_version && modified) + goto wind; + + ret = br_stub_versioning_prep(frame, this, fd, ctx); + if (ret) + goto unwind; + + local = frame->local; + if (!inc_version) { + br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid, + BR_STUB_NO_VERSIONING, 0); + cbk = br_stub_ftruncate_cbk; + goto wind; + } + + stub = fop_ftruncate_stub(frame, br_stub_ftruncate_resume, fd, offset, + xdata); + if (!stub) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED, + "ftruncate gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto cleanup_local; + } + + return br_stub_perform_incversioning(this, frame, stub, fd, ctx); + +wind: + STACK_WIND(frame, cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); + return 0; + +cleanup_local: + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + +unwind: + frame->local = NULL; + STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, NULL, NULL, NULL); + + return 0; +} - if (!name) { - cbk = br_stub_listxattr_cbk; - goto wind; - } +int32_t +br_stub_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + int32_t ret = 0; + br_stub_local_t *local = NULL; - if (br_stub_is_internal_xattr (name)) { - STACK_UNWIND (frame, -1, EINVAL, NULL, NULL); - return 0; - } + local = frame->local; + frame->local = NULL; - /** - * this special extended attribute is allowed only on root - */ - if (name - && (strncmp (name, GLUSTERFS_GET_BR_STUB_INIT_TIME, - strlen (GLUSTERFS_GET_BR_STUB_INIT_TIME)) == 0) - && (gf_uuid_compare (fd->inode->gfid, rootgfid) == 0)) { - br_stub_send_stub_init_time (frame, this); - return 0; - } + if (op_ret < 0) + goto unwind; - if (!IA_ISREG (fd->inode->ia_type)) - goto wind; + ret = br_stub_mark_inode_modified(this, local); + if (ret) { + op_ret = -1; + op_errno = EINVAL; + } + +unwind: + STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + return 0; +} - if (name && (strncmp (name, GLUSTERFS_GET_OBJECT_SIGNATURE, - strlen (GLUSTERFS_GET_OBJECT_SIGNATURE)) == 0)) { - cookie = (void *) BR_STUB_REQUEST_COOKIE; - } +int32_t +br_stub_truncate_resume(call_frame_t *frame, xlator_t *this, loc_t *loc, + off_t offset, dict_t *xdata) +{ + br_stub_local_t *local = frame->local; - wind: - STACK_WIND_COOKIE - (frame, cbk, cookie, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fgetxattr, fd, name, xdata); - return 0; + fd_unref(local->u.context.fd); + STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); + return 0; } -/** }}} */ +/** + * Bit-rot-stub depends heavily on the fd based operations to for doing + * versioning and sending notification. It starts tracking the operation + * upon getting first fd based modify operation by doing versioning and + * sends notification when last fd using which the inode was modified is + * released. + * But for truncate there is no fd and hence it becomes difficult to do + * the versioning and send notification. It is handled by doing versioning + * on an anonymous fd. The fd will be valid till the completion of the + * truncate call. It guarantees that release on this anonymous fd will happen + * after the truncate call and notification is sent after the truncate call. + * + * c.f. br_writev_cbk() for explanation + */ +int32_t +br_stub_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) +{ + br_stub_local_t *local = NULL; + call_stub_t *stub = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + gf_boolean_t inc_version = _gf_false; + gf_boolean_t modified = _gf_false; + br_stub_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + fd_t *fd = NULL; + fop_truncate_cbk_t cbk = default_truncate_cbk; + br_stub_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); + GF_VALIDATE_OR_GOTO(this->name, frame, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); + + priv = this->private; + if (!priv->do_versioning) + goto wind; + + fd = fd_anonymous(loc->inode); + if (!fd) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CREATE_ANONYMOUS_FD_FAILED, + "inode-gfid=%s", uuid_utoa(loc->inode->gfid), NULL); + goto unwind; + } + + ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx); + if (ret) + goto cleanup_fd; + + ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); + if (ret) + goto unwind; + + if (!inc_version && modified) + goto wind; + + ret = br_stub_versioning_prep(frame, this, fd, ctx); + if (ret) + goto cleanup_fd; + + local = frame->local; + if (!inc_version) { + br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid, + BR_STUB_NO_VERSIONING, 0); + cbk = br_stub_truncate_cbk; + goto wind; + } + + stub = fop_truncate_stub(frame, br_stub_truncate_resume, loc, offset, + xdata); + if (!stub) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED, + "truncate gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto cleanup_local; + } + + return br_stub_perform_incversioning(this, frame, stub, fd, ctx); + +wind: + STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, + loc, offset, xdata); + if (fd) + fd_unref(fd); + return 0; + +cleanup_local: + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); +cleanup_fd: + fd_unref(fd); +unwind: + frame->local = NULL; + STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, NULL, NULL, NULL); + + return 0; +} +/** }}} */ /** {{{ */ /* open() */ +/** + * It's probably worth mentioning a bit about why some of the housekeeping + * work is done in open() call path, rather than the callback path. + * Two (or more) open()'s in parallel can race and lead to a situation + * where a release() gets triggered (possibly after a series of write() + * calls) when *other* open()'s have still not reached callback path + * thereby having an active fd on an inode that is in process of getting + * signed with the current version. + * + * Maintaining fd list in the call path ensures that a release() would + * not be triggered if an open() call races ahead (followed by a close()) + * threby finding non-empty fd list. + */ + int -br_stub_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int op_ret, int op_errno, fd_t *fd, dict_t *xdata) +br_stub_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, + fd_t *fd, dict_t *xdata) { - int32_t ret = 0; - uint64_t ctx_addr = 0; - br_stub_inode_ctx_t *ctx = NULL; - call_stub_t *stub = NULL; + int32_t ret = -1; + br_stub_inode_ctx_t *ctx = NULL; + uint64_t ctx_addr = 0; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + br_stub_private_t *priv = NULL; + unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind); + + priv = this->private; + + if (!priv->do_versioning) + goto wind; + + ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr); + if (ret) { + ret = br_stub_init_inode_versions(this, fd, fd->inode, version, + _gf_true, _gf_false, &ctx_addr); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + BRS_MSG_GET_INODE_CONTEXT_FAILED, "path=%s", loc->path, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto unwind; + } + } + + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + + ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); + if (ret) + goto unwind; + + if (frame->root->pid == GF_CLIENT_PID_SCRUB) + goto wind; + + if (flags == O_RDONLY) + goto wind; + + ret = br_stub_add_fd_to_inode(this, fd, ctx); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_LIST_FAILED, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto unwind; + } + +wind: + STACK_WIND(frame, default_open_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(open, frame, op_ret, op_errno, NULL, NULL); + return 0; +} - if (op_ret < 0) - goto unwind; - if (cookie != (void *) BR_STUB_REQUEST_COOKIE) - goto unwind; +/** }}} */ - ret = br_stub_get_inode_ctx (this, fd->inode, &ctx_addr); - if (ret < 0) - goto unwind; +/** {{{ */ - stub = fop_open_cbk_stub (frame, NULL, op_ret, op_errno, fd, xdata); - if (!stub) { - op_ret = -1; - op_errno = EINVAL; - goto unwind; - } +/* creat() */ - /** - * Ongoing version needs to be incremented. If the inode is not dirty, - * things are simple: increment the ongoing version safely and be done. - * If inode is dirty, a writeback to disk is required. This is tricky in - * case of multiple open()'s as ongoing version needs to be incremented - * on a successful writeback. It's probably safe to remember the ongoing - * version before writeback and *assigning* it in the callback, but that - * may lead to a trustable checksum to be treated as stale by scrubber - * (the case where the in-memory ongoing version is lesser than the - * on-disk version). Therefore, *all* open() calls (which might have - * come in parallel) try to synchronize the next ongoing version to - * disk. In the callback path, the winner marks the inode as synced - * therby loosing open() calls become no-op's. - */ - ctx = (br_stub_inode_ctx_t *) (long) ctx_addr; - return br_stub_perform_incversioning (this, frame, stub, fd, ctx); +/** + * This routine registers a release callback for the given fd and adds the + * fd to the inode context fd tracking list. + */ +int32_t +br_stub_add_fd_to_inode(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx) +{ + int32_t ret = -1; + br_stub_fd_t *br_stub_fd = NULL; + + ret = br_stub_require_release_call(this, fd, &br_stub_fd); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_FD_CONTEXT_FAILED, + "gfid=%s", uuid_utoa(fd->inode->gfid), NULL); + goto out; + } + + LOCK(&fd->inode->lock); + { + list_add_tail(&ctx->fd_list, &br_stub_fd->list); + } + UNLOCK(&fd->inode->lock); + + ret = 0; + +out: + return ret; +} - unwind: - STACK_UNWIND_STRICT (open, frame, - op_ret, op_errno, fd, xdata); - return 0; +int +br_stub_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, fd_t *fd, inode_t *inode, + struct iatt *stbuf, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) +{ + int32_t ret = 0; + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *ctx = NULL; + unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; + br_stub_private_t *priv = NULL; + + priv = this->private; + + if (op_ret < 0) + goto unwind; + + if (!priv->do_versioning) + goto unwind; + + ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr); + if (ret < 0) { + ret = br_stub_init_inode_versions(this, fd, inode, version, _gf_true, + _gf_false, &ctx_addr); + if (ret) { + op_ret = -1; + op_errno = EINVAL; + } + } else { + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + ret = br_stub_add_fd_to_inode(this, fd, ctx); + } + +unwind: + STACK_UNWIND_STRICT(create, frame, op_ret, op_errno, fd, inode, stbuf, + preparent, postparent, xdata); + return 0; } int -br_stub_open (call_frame_t *frame, xlator_t *this, - loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) +br_stub_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, + mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { - void *cookie = NULL; + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd, unwind); + GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind); + + STACK_WIND(frame, br_stub_create_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, + xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(create, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL, + NULL); + return 0; +} - if (!flags) - goto wind; - if (frame->root->pid == GF_CLIENT_PID_SCRUB) - goto wind; - cookie = (void *) BR_STUB_REQUEST_COOKIE; +int +br_stub_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret, + int op_errno, inode_t *inode, struct iatt *stbuf, + struct iatt *preparent, struct iatt *postparent, + dict_t *xdata) +{ + int32_t ret = -1; + unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; + br_stub_private_t *priv = NULL; - wind: - STACK_WIND_COOKIE (frame, br_stub_open_cbk, cookie, - FIRST_CHILD (this), FIRST_CHILD (this)->fops->open, - loc, flags, fd, xdata); - return 0; + priv = this->private; + + if (op_ret < 0) + goto unwind; + + if (!priv->do_versioning) + goto unwind; + + ret = br_stub_init_inode_versions(this, NULL, inode, version, _gf_true, + _gf_false, NULL); + /** + * Like lookup, if init_inode_versions fail, return EINVAL + */ + if (ret) { + op_ret = -1; + op_errno = EINVAL; + } + +unwind: + STACK_UNWIND_STRICT(mknod, frame, op_ret, op_errno, inode, stbuf, preparent, + postparent, xdata); + return 0; +} + +int +br_stub_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, + dev_t dev, mode_t umask, dict_t *xdata) +{ + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); + + STACK_WIND(frame, br_stub_mknod_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mknod, loc, mode, dev, umask, xdata); + return 0; +unwind: + STACK_UNWIND_STRICT(mknod, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL); + return 0; } /** }}} */ +/** + * As of now, only lookup searches for bad object xattr and marks the + * object as bad in its inode context if the xattr is present. But there + * is a possibility that, at the time of the lookup the object was not + * marked bad (i.e. bad object xattr was not set), and later its marked + * as bad. In this case, object is not bad, so when a fop such as open or + * readv or writev comes on the object, the fop will be sent downward instead + * of sending as error upwards. + * The solution for this is to do a getxattr for the below list of fops. + * lookup, readdirp, open, readv, writev. + * But doing getxattr for each of the above fops might be costly. + * So another method followed is to catch the bad file marking by the scrubber + * and set that info within the object's inode context. In this way getxattr + * calls can be avoided and bad objects can be caught instantly. Fetching the + * xattr is needed only in lookups when there is a brick restart or inode + * forget. + * + * If the dict (@xattr) is NULL, then how should that be handled? Fail the + * lookup operation? Or let it continue with version being initialized to + * BITROT_DEFAULT_CURRENT_VERSION. But what if the version was different + * on disk (and also a right signature was there), but posix failed to + * successfully allocate the dict? Posix does not treat call back xdata + * creattion failure as the lookup failure. + */ +static int32_t +br_stub_lookup_version(xlator_t *this, uuid_t gfid, inode_t *inode, + dict_t *xattr) +{ + unsigned long version = 0; + br_version_t *obuf = NULL; + br_signature_t *sbuf = NULL; + br_vxattr_status_t status; + gf_boolean_t bad_object = _gf_false; + + /** + * versioning xattrs were requested from POSIX. if available, figure + * out the correct version to use in the inode context (start with + * the default version if unavailable). As of now versions are not + * persisted on-disk. The inode is marked dirty, so that the first + * operation (such as write(), etc..) triggers synchronization to + * disk. + */ + status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object); + version = ((status == BR_VXATTR_STATUS_FULL) || + (status == BR_VXATTR_STATUS_UNSIGNED)) + ? obuf->ongoingversion + : BITROT_DEFAULT_CURRENT_VERSION; + + /** + * If signature is there, but version is not there then that status is + * is treated as INVALID. So in that case, we should not initialize the + * inode context with wrong version names etc. + */ + if (status == BR_VXATTR_STATUS_INVALID) + return -1; + + return br_stub_init_inode_versions(this, NULL, inode, version, _gf_true, + bad_object, NULL); +} /** {{{ */ -/* creat() */ +int32_t +br_stub_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd, + dict_t *xdata) +{ + br_stub_private_t *priv = NULL; + br_stub_fd_t *fd_ctx = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; -int -br_stub_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int op_ret, int op_errno, fd_t *fd, inode_t *inode, - struct iatt *stbuf, struct iatt *preparent, - struct iatt *postparent, dict_t *xdata) + priv = this->private; + if (gf_uuid_compare(fd->inode->gfid, priv->bad_object_dir_gfid)) + goto normal; + + fd_ctx = br_stub_fd_new(); + if (!fd_ctx) { + op_errno = ENOMEM; + goto unwind; + } + + fd_ctx->bad_object.dir_eof = -1; + fd_ctx->bad_object.dir = sys_opendir(priv->stub_basepath); + if (!fd_ctx->bad_object.dir) { + op_errno = errno; + goto err_freectx; + } + + op_ret = br_stub_fd_ctx_set(this, fd, fd_ctx); + if (!op_ret) + goto unwind; + + sys_closedir(fd_ctx->bad_object.dir); + +err_freectx: + GF_FREE(fd_ctx); +unwind: + STACK_UNWIND_STRICT(opendir, frame, op_ret, op_errno, fd, NULL); + return 0; + +normal: + STACK_WIND(frame, default_opendir_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->opendir, loc, fd, xdata); + return 0; +} + +int32_t +br_stub_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t off, dict_t *xdata) { - int32_t ret = 0; - uint64_t ctx_addr = 0; - call_stub_t *stub = NULL; - br_stub_inode_ctx_t *ctx = NULL; + call_stub_t *stub = NULL; + br_stub_private_t *priv = NULL; + + priv = this->private; + if (!priv->do_versioning) + goto out; + + if (gf_uuid_compare(fd->inode->gfid, priv->bad_object_dir_gfid)) + goto out; + stub = fop_readdir_stub(frame, br_stub_readdir_wrapper, fd, size, off, + xdata); + if (!stub) { + STACK_UNWIND_STRICT(readdir, frame, -1, ENOMEM, NULL, NULL); + return 0; + } + br_stub_worker_enqueue(this, stub); + return 0; +out: + STACK_WIND(frame, default_readdir_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdir, fd, size, off, xdata); + return 0; +} - if (op_ret < 0) - goto unwind; +int +br_stub_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, gf_dirent_t *entries, + dict_t *dict) +{ + int32_t ret = 0; + uint64_t ctxaddr = 0; + gf_dirent_t *entry = NULL; + br_stub_private_t *priv = NULL; + gf_boolean_t ver_enabled = _gf_false; + + BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled); + priv = this->private; + BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), unwind); + + if (op_ret < 0) + goto unwind; + + list_for_each_entry(entry, &entries->list, list) + { + if ((strcmp(entry->d_name, ".") == 0) || + (strcmp(entry->d_name, "..") == 0)) + continue; + + if (!IA_ISREG(entry->d_stat.ia_type)) + continue; + + /* + * Readdirp for most part is a bulk lookup for all the entries + * present in the directory being read. Ideally, for each + * entry, the handling should be similar to that of a lookup + * callback. But for now, just keeping this as it has been + * until now (which means, this comment has been added much + * later as part of a change that wanted to send the flag + * of true/false to br_stub_remove_vxattrs to indicate whether + * the bad-object xattr should be removed from the entry->dict + * or not). Until this change, the function br_stub_remove_vxattrs + * was just removing all the xattrs associated with bit-rot-stub + * (like version, bad-object, signature etc). But, there are + * scenarios where we only want to send bad-object xattr and not + * others. So this comment is part of that change which also + * mentions about another possible change that might be needed + * in future. + * But for now, adding _gf_true means functionally its same as + * what this function was doing before. Just remove all the stub + * related xattrs. + */ + ret = br_stub_get_inode_ctx(this, entry->inode, &ctxaddr); + if (ret < 0) + ctxaddr = 0; + if (ctxaddr) { /* already has the context */ + br_stub_remove_vxattrs(entry->dict, _gf_true); + continue; + } - stub = fop_create_cbk_stub (frame, NULL, op_ret, op_errno, fd, inode, - stbuf, preparent, postparent, xdata); - if (!stub) { - op_ret = -1; - op_errno = EINVAL; - goto unwind; + ret = br_stub_lookup_version(this, entry->inode->gfid, entry->inode, + entry->dict); + br_stub_remove_vxattrs(entry->dict, _gf_true); + if (ret) { + /** + * there's no per-file granularity support in case of + * failure. let's fail the entire request for now.. + */ + break; } + } - ret = br_stub_get_inode_ctx (this, fd->inode, &ctx_addr); - if (ret < 0) - ctx_addr = 0; - ctx = (br_stub_inode_ctx_t *) (long) ctx_addr; + if (ret) { + op_ret = -1; + op_errno = EINVAL; + } - /* see comment in br_stub_open_cbk().. */ - return (ctx) - ? br_stub_perform_incversioning (this, frame, stub, fd, ctx) - : br_stub_perform_fullversioning (this, frame, stub, fd); +unwind: + STACK_UNWIND_STRICT(readdirp, frame, op_ret, op_errno, entries, dict); - unwind: - STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, - fd, inode, stbuf, preparent, postparent, xdata); - return 0; + return 0; } int -br_stub_create (call_frame_t *frame, - xlator_t *this, loc_t *loc, int32_t flags, - mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) +br_stub_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, dict_t *dict) { - STACK_WIND (frame, br_stub_create_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->create, - loc, flags, mode, umask, fd, xdata); - return 0; + int32_t ret = -1; + int op_errno = 0; + gf_boolean_t xref = _gf_false; + br_stub_private_t *priv = NULL; + + priv = this->private; + BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); + + op_errno = ENOMEM; + if (!dict) { + dict = dict_new(); + if (!dict) + goto unwind; + } else { + dict = dict_ref(dict); + } + + xref = _gf_true; + + op_errno = EINVAL; + ret = dict_set_uint32(dict, BITROT_CURRENT_VERSION_KEY, 0); + if (ret) + goto unwind; + ret = dict_set_uint32(dict, BITROT_SIGNING_VERSION_KEY, 0); + if (ret) + goto unwind; + ret = dict_set_uint32(dict, BITROT_OBJECT_BAD_KEY, 0); + if (ret) + goto unwind; + +wind: + STACK_WIND(frame, br_stub_readdirp_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict); + goto unref_dict; + +unwind: + if (frame->local == (void *)0x1) + frame->local = NULL; + STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL); + return 0; + +unref_dict: + if (xref) + dict_unref(dict); + return 0; } /** }}} */ -static inline int32_t -br_stub_lookup_version (xlator_t *this, - uuid_t gfid, inode_t *inode, dict_t *xattr) +/** {{{ */ + +/* lookup() */ + +/** + * This function mainly handles the ENOENT error for the bad objects. Though + * br_stub_forget () handles removal of the link for the bad object from the + * quarantine directory, its better to handle it in lookup as well, where + * a failed lookup on a bad object with ENOENT, will trigger deletion of the + * link for the bad object from quarantine directory. So whoever comes first + * either forget () or lookup () will take care of removing the link. + */ +void +br_stub_handle_lookup_error(xlator_t *this, inode_t *inode, int32_t op_errno) { - unsigned long version = 0; - br_version_t *obuf = NULL; - br_signature_t *sbuf = NULL; - br_vxattr_status_t status; + int32_t ret = -1; + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *ctx = NULL; - /** - * versioning xattrs were requested from POSIX. if available, figure - * out the correct version to use in the inode context (start with - * the default version if unavailable). As of now versions are not - * persisted on-disk. The inode is marked dirty, so that the first - * operation (such as open(), etc..) would trigger synchronization - * to disk. - */ - status = br_version_xattr_state (xattr, &obuf, &sbuf); + if (op_errno != ENOENT) + goto out; - /** - * stub does not know how to handle presence of signature but not - * the object version, therefore, in such cases, bail out.. + if (!inode_is_linked(inode)) + goto out; + + ret = br_stub_get_inode_ctx(this, inode, &ctx_addr); + if (ret) + goto out; + + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + + LOCK(&inode->lock); + { + if (__br_stub_is_bad_object(ctx)) + (void)br_stub_del(this, inode->gfid); + } + UNLOCK(&inode->lock); + + if (__br_stub_is_bad_object(ctx)) { + /* File is not present, might be deleted for recovery, + * del the bitrot inode context */ - if (status == BR_VXATTR_STATUS_INVALID) { - gf_log (this->name, GF_LOG_ERROR, "Invalid versioning xattrs. " - "Bailing out [GFID: %s]", uuid_utoa (gfid)); - return -1; + ctx_addr = 0; + inode_ctx_del(inode, this, &ctx_addr); + if (ctx_addr) { + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + GF_FREE(ctx); } + } - version = ((status == BR_VXATTR_STATUS_FULL) - || (status == BR_VXATTR_STATUS_UNSIGNED)) - ? obuf->ongoingversion : BITROT_DEFAULT_CURRENT_VERSION; - return br_stub_init_inode_versions (this, NULL, - inode, version, _gf_true); +out: + return; } +int +br_stub_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, inode_t *inode, struct iatt *stbuf, + dict_t *xattr, struct iatt *postparent) +{ + int32_t ret = 0; + br_stub_private_t *priv = NULL; + gf_boolean_t ver_enabled = _gf_false; + gf_boolean_t remove_bad_file_marker = _gf_true; -/** {{{ */ + BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled); + priv = this->private; -int -br_stub_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int op_ret, int op_errno, gf_dirent_t *entries, - dict_t *dict) -{ - int32_t ret = 0; - uint64_t ctxaddr = 0; - gf_dirent_t *entry = NULL; - - if (op_ret < 0) - goto unwind; - - list_for_each_entry (entry, &entries->list, list) { - if ((strcmp (entry->d_name, ".") == 0) - || (strcmp (entry->d_name, "..") == 0)) - continue; - - if (!IA_ISREG (entry->d_stat.ia_type)) - continue; - - ret = br_stub_get_inode_ctx (this, entry->inode, &ctxaddr); - if (ret < 0) - ctxaddr = 0; - if (ctxaddr) { /* already has the context */ - br_stub_remove_vxattrs (entry->dict); - continue; - } - - ret = br_stub_lookup_version - (this, entry->inode->gfid, entry->inode, entry->dict); - br_stub_remove_vxattrs (entry->dict); - if (ret) { - /** - * there's no per-file granularity support in case of - * failure. let's fail the entire request for now.. - */ - break; - } - } + if (op_ret < 0) { + (void)br_stub_handle_lookup_error(this, inode, op_errno); + /* + * If the lookup error is not ENOENT, then it is better + * to send the bad file marker to the higher layer (if + * it has been set) + */ + if (op_errno != ENOENT) + remove_bad_file_marker = _gf_false; + goto delkey; + } + + BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkey); + + if (!IA_ISREG(stbuf->ia_type)) + goto unwind; + + /** + * If the object is bad, then "bad inode" marker has to be sent back + * in resoinse, for revalidated lookups as well. Some xlators such as + * quick-read might cache the data in revalidated lookup as fresh + * lookup would anyway have sent "bad inode" marker. + * In general send bad inode marker for every lookup operation on the + * bad object. + */ + if (cookie != (void *)BR_STUB_REQUEST_COOKIE) { + ret = br_stub_mark_xdata_bad_object(this, inode, xattr); if (ret) { - op_ret = -1; - op_errno = EINVAL; + op_ret = -1; + op_errno = EIO; + /* + * This flag ensures that in the label @delkey below, + * bad file marker is not removed from the dictinary, + * but other virtual xattrs (such as version, signature) + * are removed. + */ + remove_bad_file_marker = _gf_false; } + goto delkey; + } - unwind: - STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries, dict); + ret = br_stub_lookup_version(this, stbuf->ia_gfid, inode, xattr); + if (ret < 0) { + op_ret = -1; + op_errno = EINVAL; + goto delkey; + } + + /** + * If the object is bad, send "bad inode" marker back in response + * for xlator(s) to act accordingly (such as quick-read, etc..) + */ + ret = br_stub_mark_xdata_bad_object(this, inode, xattr); + if (ret) { + /** + * aaha! bad object, but sorry we would not + * satisfy the request on allocation failures. + */ + op_ret = -1; + op_errno = EIO; + goto delkey; + } + +delkey: + br_stub_remove_vxattrs(xattr, remove_bad_file_marker); +unwind: + STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, stbuf, xattr, + postparent); + + return 0; +} + +int +br_stub_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) +{ + int32_t ret = 0; + int op_errno = 0; + void *cookie = NULL; + uint64_t ctx_addr = 0; + gf_boolean_t xref = _gf_false; + br_stub_private_t *priv = NULL; + call_stub_t *stub = NULL; + + GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc, unwind); + GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); + + priv = this->private; + + BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); + if (!gf_uuid_compare(loc->gfid, priv->bad_object_dir_gfid) || + !gf_uuid_compare(loc->pargfid, priv->bad_object_dir_gfid)) { + stub = fop_lookup_stub(frame, br_stub_lookup_wrapper, loc, xdata); + if (!stub) { + op_errno = ENOMEM; + goto unwind; + } + br_stub_worker_enqueue(this, stub); return 0; + } + + ret = br_stub_get_inode_ctx(this, loc->inode, &ctx_addr); + if (ret < 0) + ctx_addr = 0; + if (ctx_addr != 0) + goto wind; + + /** + * fresh lookup: request version keys from POSIX + */ + op_errno = ENOMEM; + if (!xdata) { + xdata = dict_new(); + if (!xdata) + goto unwind; + } else { + xdata = dict_ref(xdata); + } + + xref = _gf_true; + + /** + * Requesting both xattrs provides a way of sanity checking the + * object. Anomaly checking is done in cbk by examining absence + * of either or both xattrs. + */ + op_errno = EINVAL; + ret = dict_set_uint32(xdata, BITROT_CURRENT_VERSION_KEY, 0); + if (ret) + goto unwind; + ret = dict_set_uint32(xdata, BITROT_SIGNING_VERSION_KEY, 0); + if (ret) + goto unwind; + ret = dict_set_uint32(xdata, BITROT_OBJECT_BAD_KEY, 0); + if (ret) + goto unwind; + cookie = (void *)BR_STUB_REQUEST_COOKIE; + +wind: + STACK_WIND_COOKIE(frame, br_stub_lookup_cbk, cookie, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, loc, xdata); + goto dealloc_dict; + +unwind: + if (frame->local == (void *)0x1) + frame->local = NULL; + STACK_UNWIND_STRICT(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL); +dealloc_dict: + if (xref) + dict_unref(xdata); + return 0; } +/** }}} */ + +/** {{{ */ + +/* stat */ int -br_stub_readdirp (call_frame_t *frame, xlator_t *this, - fd_t *fd, size_t size, off_t offset, dict_t *dict) +br_stub_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - int32_t ret = -1; - int op_errno = 0; - gf_boolean_t xref = _gf_false; + int32_t ret = 0; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + br_stub_private_t *priv = NULL; - op_errno = ENOMEM; - if (!dict) { - dict = dict_new (); - if (!dict) - goto unwind; - } else { - dict = dict_ref (dict); - } + priv = this->private; - xref = _gf_true; + if (!priv->do_versioning) + goto wind; - op_errno = EINVAL; - ret = dict_set_uint32 (dict, BITROT_CURRENT_VERSION_KEY, 0); - if (ret) - goto unwind; - ret = dict_set_uint32 (dict, BITROT_SIGNING_VERSION_KEY, 0); - if (ret) - goto unwind; + if (!IA_ISREG(loc->inode->ia_type)) + goto wind; - STACK_WIND (frame, br_stub_readdirp_cbk, FIRST_CHILD (this), - FIRST_CHILD(this)->fops->readdirp, fd, size, - offset, dict); - goto unref_dict; + ret = br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno); + if (ret) + goto unwind; - unwind: - STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, NULL, NULL); - return 0; +wind: + STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, + loc, xdata); + return 0; - unref_dict: - if (xref) - dict_unref (dict); - return 0; +unwind: + STACK_UNWIND_STRICT(stat, frame, op_ret, op_errno, NULL, NULL); + return 0; } -/** }}} */ +/* fstat */ +int +br_stub_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) +{ + int32_t ret = 0; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + br_stub_private_t *priv = NULL; + priv = this->private; -/** {{{ */ + if (!priv->do_versioning) + goto wind; -/* lookup() */ + if (!IA_ISREG(fd->inode->ia_type)) + goto wind; -int -br_stub_lookup_cbk (call_frame_t *frame, void *cookie, - xlator_t *this, int op_ret, int op_errno, inode_t *inode, - struct iatt *stbuf, dict_t *xattr, struct iatt *postparent) -{ - int32_t ret = 0; - - if (op_ret < 0) - goto unwind; - if (!IA_ISREG (stbuf->ia_type)) - goto unwind; - if (cookie != (void *) BR_STUB_REQUEST_COOKIE) - goto delkey; - - ret = br_stub_lookup_version (this, stbuf->ia_gfid, inode, xattr); - if (ret < 0) { - op_ret = -1; - op_errno = EINVAL; - } + ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); + if (ret) + goto unwind; - delkey: - br_stub_remove_vxattrs (xattr); - unwind: - STACK_UNWIND_STRICT (lookup, frame, - op_ret, op_errno, inode, stbuf, xattr, postparent); +wind: + STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, + fd, xdata); + return 0; - return 0; +unwind: + STACK_UNWIND_STRICT(fstat, frame, op_ret, op_errno, NULL, NULL); + return 0; } +/** }}} */ + +/** {{{ */ + +/* unlink() */ + int -br_stub_lookup (call_frame_t *frame, - xlator_t *this, loc_t *loc, dict_t *xdata) +br_stub_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) { - int32_t ret = 0; - int op_errno = 0; - void *cookie = NULL; - uint64_t ctx_addr = 0; - gf_boolean_t xref = _gf_false; + br_stub_local_t *local = NULL; + inode_t *inode = NULL; + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *ctx = NULL; + int32_t ret = -1; + br_stub_private_t *priv = NULL; + gf_boolean_t ver_enabled = _gf_false; + + BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled); + priv = this->private; + BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), unwind); + + local = frame->local; + frame->local = NULL; + + if (op_ret < 0) + goto unwind; + + if (!local) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_NULL_LOCAL, NULL); + goto unwind; + } + inode = local->u.context.inode; + if (!IA_ISREG(inode->ia_type)) + goto unwind; + + ret = br_stub_get_inode_ctx(this, inode, &ctx_addr); + if (ret) { + /** + * If the inode is bad AND context is not there, then there + * is a possibility of the gfid of the object being listed + * in the quarantine directory and will be shown in the + * bad objects list. So continuing with the fop with a + * warning log. The entry from the quarantine directory + * has to be removed manually. Its not a good idea to fail + * the fop, as the object has already been deleted. + */ + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED, + "inode-gfid=%s", uuid_utoa(inode->gfid), NULL); + goto unwind; + } - ret = br_stub_get_inode_ctx (this, loc->inode, &ctx_addr); - if (ret < 0) - ctx_addr = 0; - if (ctx_addr != 0) - goto wind; + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; + LOCK(&inode->lock); + { /** - * fresh lookup: request version keys from POSIX + * Ignoring the return value of br_stub_del (). + * There is not much that can be done if unlinking + * of the entry in the quarantine directory fails. + * The failure is logged. */ + if (__br_stub_is_bad_object(ctx)) + (void)br_stub_del(this, inode->gfid); + } + UNLOCK(&inode->lock); + +unwind: + STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent, + xdata); + br_stub_cleanup_local(local); + br_stub_dealloc_local(local); + return 0; +} + +int +br_stub_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int flag, + dict_t *xdata) +{ + br_stub_local_t *local = NULL; + int32_t op_ret = -1; + int32_t op_errno = 0; + br_stub_private_t *priv = NULL; + + priv = this->private; + BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); + + local = br_stub_alloc_local(this); + if (!local) { + op_ret = -1; op_errno = ENOMEM; - if (!xdata) { - xdata = dict_new (); - if (!xdata) - goto unwind; - } else { - xdata = dict_ref (xdata); - } + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_ALLOC_MEM_FAILED, + "local path=%s", loc->path, "gfid=%s", + uuid_utoa(loc->inode->gfid), NULL); + goto unwind; + } - xref = _gf_true; + br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid, + BR_STUB_NO_VERSIONING, 0); - /** - * Requesting both xattrs provides a way of sanity checking the - * object. Anomaly checking is done in cbk by examining absence - * of either or both xattrs. - */ - op_errno = EINVAL; - ret = dict_set_uint32 (xdata, BITROT_CURRENT_VERSION_KEY, 0); - if (ret) - goto unwind; - ret = dict_set_uint32 (xdata, BITROT_SIGNING_VERSION_KEY, 0); - if (ret) - goto unwind; - cookie = (void *) BR_STUB_REQUEST_COOKIE; + frame->local = local; - wind: - STACK_WIND_COOKIE (frame, br_stub_lookup_cbk, cookie, - FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, - loc, xdata); - goto dealloc_dict; +wind: + STACK_WIND(frame, br_stub_unlink_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->unlink, loc, flag, xdata); + return 0; - unwind: - STACK_UNWIND_STRICT (lookup, frame, - -1, op_errno, NULL, NULL, NULL, NULL); - dealloc_dict: - if (xref) - dict_unref (xdata); - return 0; +unwind: + if (frame->local == (void *)0x1) + frame->local = NULL; + STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, NULL, NULL, NULL); + return 0; } /** }}} */ @@ -1227,19 +3279,20 @@ br_stub_lookup (call_frame_t *frame, /* forget() */ int -br_stub_forget (xlator_t *this, inode_t *inode) +br_stub_forget(xlator_t *this, inode_t *inode) { - uint64_t ctx_addr = 0; - br_stub_inode_ctx_t *ctx = NULL; + uint64_t ctx_addr = 0; + br_stub_inode_ctx_t *ctx = NULL; + + inode_ctx_del(inode, this, &ctx_addr); + if (!ctx_addr) + return 0; - inode_ctx_del (inode, this, &ctx_addr); - if (!ctx_addr) - return 0; + ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; - ctx = (br_stub_inode_ctx_t *) (long) ctx_addr; - GF_FREE (ctx); + GF_FREE(ctx); - return 0; + return 0; } /** }}} */ @@ -1247,94 +3300,188 @@ br_stub_forget (xlator_t *this, inode_t *inode) /** {{{ */ int32_t -br_stub_noop (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *xdata) +br_stub_noop(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) { - STACK_DESTROY (frame->root); - return 0; + STACK_DESTROY(frame->root); + return 0; } -static inline void -br_stub_send_ipc_fop (xlator_t *this, - fd_t *fd, unsigned long releaseversion, int32_t flags) +static void +br_stub_send_ipc_fop(xlator_t *this, fd_t *fd, unsigned long releaseversion, + int sign_info) { - int32_t op = 0; - int32_t ret = 0; - dict_t *xdata = NULL; - call_frame_t *frame = NULL; - changelog_event_t ev = {0,}; + int32_t op = 0; + int32_t ret = 0; + dict_t *xdata = NULL; + call_frame_t *frame = NULL; + changelog_event_t ev = { + 0, + }; + + ev.ev_type = CHANGELOG_OP_TYPE_BR_RELEASE; + ev.u.releasebr.version = releaseversion; + ev.u.releasebr.sign_info = sign_info; + gf_uuid_copy(ev.u.releasebr.gfid, fd->inode->gfid); + + xdata = dict_new(); + if (!xdata) { + gf_smsg(this->name, GF_LOG_WARNING, ENOMEM, BRS_MSG_DICT_ALLOC_FAILED, + NULL); + goto out; + } + + ret = dict_set_static_bin(xdata, "RELEASE-EVENT", &ev, CHANGELOG_EV_SIZE); + if (ret) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SET_EVENT_FAILED, NULL); + goto dealloc_dict; + } - ev.ev_type = CHANGELOG_OP_TYPE_BR_RELEASE; - ev.u.releasebr.flags = flags; - ev.u.releasebr.version = releaseversion; - gf_uuid_copy (ev.u.releasebr.gfid, fd->inode->gfid); + frame = create_frame(this, this->ctx->pool); + if (!frame) { + gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_CREATE_FRAME_FAILED, + NULL); + goto dealloc_dict; + } - xdata = dict_new (); - if (!xdata) { - gf_log (this->name, GF_LOG_WARNING, - "dict allocation failed: cannot send IPC FOP " - "to changelog"); - goto out; - } + op = GF_IPC_TARGET_CHANGELOG; + STACK_WIND(frame, br_stub_noop, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ipc, op, xdata); - ret = dict_set_static_bin (xdata, - "RELEASE-EVENT", &ev, CHANGELOG_EV_SIZE); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "cannot set release event in dict"); - goto dealloc_dict; - } +dealloc_dict: + dict_unref(xdata); +out: + return; +} - frame = create_frame (this, this->ctx->pool); - if (!frame) { - gf_log (this->name, GF_LOG_WARNING, "create_frame() failure"); - goto dealloc_dict; - } +/** + * This is how the state machine of sign info works: + * 3 states: + * 1) BR_SIGN_NORMAL => The default State of the inode + * 2) BR_SIGN_REOPEN_WAIT => A release has been sent and is waiting for reopen + * 3) BR_SIGN_QUICK => reopen has happened and this release should trigger sign + * 2 events: + * 1) GF_FOP_RELEASE + * 2) GF_FOP_WRITE (actually a dummy write for BitD) + * + * This is how states are changed based on events: + * EVENT: GF_FOP_RELEASE: + * if (state == BR_SIGN_NORMAL) ; then + * set state = BR_SIGN_REOPEN_WAIT; + * if (state == BR_SIGN_QUICK); then + * set state = BR_SIGN_NORMAL; + * EVENT: GF_FOP_WRITE: + * if (state == BR_SIGN_REOPEN_WAIT); then + * set state = BR_SIGN_QUICK; + */ +br_sign_state_t +__br_stub_inode_sign_state(br_stub_inode_ctx_t *ctx, glusterfs_fop_t fop, + fd_t *fd) +{ + br_sign_state_t sign_info = BR_SIGN_INVALID; - op = GF_IPC_TARGET_CHANGELOG; - STACK_WIND (frame, br_stub_noop, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->ipc, op, xdata); + switch (fop) { + case GF_FOP_FSETXATTR: + sign_info = ctx->info_sign = BR_SIGN_QUICK; + break; - dealloc_dict: - dict_unref (xdata); - out: - return; + case GF_FOP_RELEASE: + GF_ASSERT(ctx->info_sign != BR_SIGN_REOPEN_WAIT); + + if (ctx->info_sign == BR_SIGN_NORMAL) { + sign_info = ctx->info_sign = BR_SIGN_REOPEN_WAIT; + } else { + sign_info = ctx->info_sign; + ctx->info_sign = BR_SIGN_NORMAL; + } + + break; + default: + break; + } + + return sign_info; } int32_t -br_stub_release (xlator_t *this, fd_t *fd) +br_stub_release(xlator_t *this, fd_t *fd) { - int32_t ret = 0; - int32_t flags = 0; - inode_t *inode = NULL; - unsigned long releaseversion = 0; - br_stub_inode_ctx_t *ctx = NULL; + int32_t ret = 0; + int32_t flags = 0; + inode_t *inode = NULL; + unsigned long releaseversion = 0; + br_stub_inode_ctx_t *ctx = NULL; + uint64_t tmp = 0; + br_stub_fd_t *br_stub_fd = NULL; + int32_t signinfo = 0; + + inode = fd->inode; + + LOCK(&inode->lock); + { + ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL); + if (ctx == NULL) + goto unblock; + br_stub_fd = br_stub_fd_ctx_get(this, fd); + if (br_stub_fd) { + list_del_init(&br_stub_fd->list); + } - inode = fd->inode; + ret = __br_stub_can_trigger_release(inode, ctx, &releaseversion); + if (!ret) + goto unblock; - LOCK (&inode->lock); - { - ctx = __br_stub_get_ongoing_version_ctx (this, inode, NULL); - if (ctx == NULL) - goto unblock; - __br_stub_track_release (ctx); - ret = __br_stub_can_trigger_release - (inode, ctx, &releaseversion, &flags); - if (ret) { - GF_ASSERT (__br_stub_is_inode_dirty (ctx) == 0); - __br_stub_mark_inode_dirty (ctx); - } - } - unblock: - UNLOCK (&inode->lock); + signinfo = __br_stub_inode_sign_state(ctx, GF_FOP_RELEASE, fd); + signinfo = htonl(signinfo); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, - "releaseversion: %lu|flags: %d", releaseversion, flags); - br_stub_send_ipc_fop (this, fd, releaseversion, flags); + /* inode back to initital state: mark dirty */ + if (ctx->info_sign == BR_SIGN_NORMAL) { + __br_stub_mark_inode_dirty(ctx); + __br_stub_unset_inode_modified(ctx); } + } +unblock: + UNLOCK(&inode->lock); - return 0; + if (ret) { + gf_msg_debug(this->name, 0, + "releaseversion: %lu | flags: %d " + "| signinfo: %d", + (unsigned long)ntohl(releaseversion), flags, + ntohl(signinfo)); + br_stub_send_ipc_fop(this, fd, releaseversion, signinfo); + } + + ret = fd_ctx_del(fd, this, &tmp); + br_stub_fd = (br_stub_fd_t *)(long)tmp; + + GF_FREE(br_stub_fd); + + return 0; +} + +int32_t +br_stub_releasedir(xlator_t *this, fd_t *fd) +{ + br_stub_fd_t *fctx = NULL; + uint64_t ctx = 0; + int ret = 0; + + ret = fd_ctx_del(fd, this, &ctx); + if (ret < 0) + goto out; + + fctx = (br_stub_fd_t *)(long)ctx; + if (fctx->bad_object.dir) { + ret = sys_closedir(fctx->bad_object.dir); + if (ret) + gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_DIR_CLOSE_FAIL, + "error=%s", strerror(errno), NULL); + } + + GF_FREE(fctx); +out: + return 0; } /** }}} */ @@ -1344,84 +3491,100 @@ br_stub_release (xlator_t *this, fd_t *fd) /* ictxmerge */ void -br_stub_ictxmerge (xlator_t *this, fd_t *fd, - inode_t *inode, inode_t *linked_inode) +br_stub_ictxmerge(xlator_t *this, fd_t *fd, inode_t *inode, + inode_t *linked_inode) { - int32_t ret = 0; - uint64_t ctxaddr = 0; - uint64_t lctxaddr = 0; - br_stub_inode_ctx_t *ctx = NULL; - br_stub_inode_ctx_t *lctx = NULL; + int32_t ret = 0; + uint64_t ctxaddr = 0; + uint64_t lctxaddr = 0; + br_stub_inode_ctx_t *ctx = NULL; + br_stub_inode_ctx_t *lctx = NULL; + br_stub_fd_t *br_stub_fd = NULL; + + ret = br_stub_get_inode_ctx(this, inode, &ctxaddr); + if (ret < 0) + goto done; + ctx = (br_stub_inode_ctx_t *)(uintptr_t)ctxaddr; - ret = br_stub_get_inode_ctx (this, inode, &ctxaddr); + LOCK(&linked_inode->lock); + { + ret = __br_stub_get_inode_ctx(this, linked_inode, &lctxaddr); if (ret < 0) - goto done; - ctx = (br_stub_inode_ctx_t *) ctxaddr; - - LOCK (&linked_inode->lock); - { - ret = __br_stub_get_inode_ctx (this, linked_inode, &lctxaddr); - if (ret < 0) - goto unblock; - lctx = (br_stub_inode_ctx_t *) lctxaddr; - - if (__br_stub_is_inode_dirty (lctx)) { - /** - * RACY code: An inode can end up in this situation - * after a lookup() or after a create() followed by - * a release(). Even if we distinguish b/w the two, - * there needs to be more infrastructure built up - * in stub to handle these races. Note, that it's - * probably OK to ignore the race iff the version - * was initialized on the very first lookup(), i.e., - * [ongoingversion: default]. - * - * FIXME: fixup races [create(1..n)/lookup(1..n)]. - */ - GF_ASSERT (lctx->currentversion - == BITROT_DEFAULT_CURRENT_VERSION); - __br_stub_track_openfd (fd, lctx); - __br_stub_mark_inode_synced (lctx); - } else { - GF_ASSERT (ctx->currentversion <= lctx->currentversion); - __br_stub_track_openfd (fd, lctx); - } + goto unblock; + lctx = (br_stub_inode_ctx_t *)(uintptr_t)lctxaddr; + + GF_ASSERT(list_is_singular(&ctx->fd_list)); + br_stub_fd = list_first_entry(&ctx->fd_list, br_stub_fd_t, list); + if (br_stub_fd) { + GF_ASSERT(br_stub_fd->fd == fd); + list_move_tail(&br_stub_fd->list, &lctx->fd_list); } - unblock: - UNLOCK (&linked_inode->lock); + } +unblock: + UNLOCK(&linked_inode->lock); - done: - return; +done: + return; } /** }}} */ - struct xlator_fops fops = { - .lookup = br_stub_lookup, - .open = br_stub_open, - .create = br_stub_create, - .readdirp = br_stub_readdirp, - .getxattr = br_stub_getxattr, - .fgetxattr = br_stub_fgetxattr, - .fsetxattr = br_stub_fsetxattr, + .lookup = br_stub_lookup, + .stat = br_stub_stat, + .fstat = br_stub_fstat, + .open = br_stub_open, + .create = br_stub_create, + .readdirp = br_stub_readdirp, + .getxattr = br_stub_getxattr, + .fgetxattr = br_stub_fgetxattr, + .fsetxattr = br_stub_fsetxattr, + .writev = br_stub_writev, + .truncate = br_stub_truncate, + .ftruncate = br_stub_ftruncate, + .mknod = br_stub_mknod, + .readv = br_stub_readv, + .removexattr = br_stub_removexattr, + .fremovexattr = br_stub_fremovexattr, + .setxattr = br_stub_setxattr, + .opendir = br_stub_opendir, + .readdir = br_stub_readdir, + .unlink = br_stub_unlink, }; struct xlator_cbks cbks = { - .forget = br_stub_forget, - .release = br_stub_release, - .ictxmerge = br_stub_ictxmerge, + .forget = br_stub_forget, + .release = br_stub_release, + .ictxmerge = br_stub_ictxmerge, }; struct volume_options options[] = { - { .key = {"bitrot"}, - .type = GF_OPTION_TYPE_BOOL, - .default_value = "on", - .description = "enable/disable bitrot stub" - }, - { .key = {"export"}, - .type = GF_OPTION_TYPE_PATH, - .description = "brick path for versioning" - }, - { .key = {NULL} }, + {.key = {"bitrot"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .op_version = {GD_OP_VERSION_3_7_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_FORCE, + .tags = {"bitrot"}, + .description = "enable/disable bitrot stub"}, + {.key = {"export"}, + .type = GF_OPTION_TYPE_PATH, + .op_version = {GD_OP_VERSION_3_7_0}, + .tags = {"bitrot"}, + .description = "brick path for versioning", + .default_value = "{{ brick.path }}"}, + {.key = {NULL}}, +}; + +xlator_api_t xlator_api = { + .init = init, + .fini = fini, + .notify = notify, + .reconfigure = reconfigure, + .mem_acct_init = mem_acct_init, + .op_version = {1}, /* Present from the initial version */ + .fops = &fops, + .cbks = &cbks, + .options = options, + .identifier = "bitrot-stub", + .category = GF_MAINTAINED, }; |
