diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 473 |
1 files changed, 293 insertions, 180 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 4263a462ad7..e7d8522ae8c 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -19,14 +19,13 @@ #include "iobuf.h" #include "changelog-rt.h" -#include "changelog-helpers.h" #include "changelog-encoders.h" #include "changelog-mem-types.h" #include <pthread.h> -#include "changelog-notifier.h" +#include "changelog-rpc.h" static struct changelog_bootstrap cb_bootstrap[] = { @@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { + int32_t ret = 0; changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; + changelog_event_t ev = {0,}; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + /* fill the event structure.. similar to open() */ + ev.ev_type = CHANGELOG_OP_TYPE_CREATE; + uuid_copy (ev.u.create.gfid, buf->ia_gfid); + ev.u.create.flags = fd->flags; + changelog_dispatch_event (this, priv, &ev); + + if (changelog_ev_selected + (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) { + ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "could not set fd context (for release cbk)"); + } + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: @@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame, /* }}} */ +/* open, release and other beasts */ + +/* {{{ */ + + + +int +changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, fd_t *fd, dict_t *xdata) +{ + int ret = 0; + void *opaque = NULL; + char *buf = NULL; + ssize_t buflen = 0; + changelog_priv_t *priv = NULL; + changelog_event_t ev = {0,}; + gf_boolean_t logopen = _gf_false; + + priv = this->private; + if (frame->local) { + frame->local = NULL; + logopen = _gf_true; + } + + CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind); + + /* fill the event structure */ + ev.ev_type = CHANGELOG_OP_TYPE_OPEN; + uuid_copy (ev.u.open.gfid, fd->inode->gfid); + ev.u.open.flags = fd->flags; + changelog_dispatch_event (this, priv, &ev); + + if (changelog_ev_selected + (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) { + ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "could not set fd context (for release cbk)"); + } + + unwind: + CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata); + return 0; +} + +int +changelog_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, int flags, fd_t *fd, dict_t *xdata) +{ + changelog_priv_t *priv = NULL; + + priv = this->private; + CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); + + frame->local = (void *)0x1; /* do not dereference in ->cbk */ + + wind: + STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata); + return 0; +} + +/* }}} */ + +/* {{{ */ + +int32_t +changelog_release (xlator_t *this, fd_t *fd) +{ + changelog_event_t ev = {0,}; + changelog_priv_t *priv = NULL; + + priv = this->private; + + ev.ev_type = CHANGELOG_OP_TYPE_RELEASE; + uuid_copy (ev.u.release.gfid, fd->inode->gfid); + changelog_dispatch_event (this, priv, &ev); + + (void) fd_ctx_del (fd, this, NULL); + + return 0; +} + + +/* }}} */ + /** * The * - @init () @@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) int ret = 0; if (priv->cr.rollover_th) { - changelog_thread_cleanup (this, priv->cr.rollover_th); + (void) changelog_thread_cleanup (this, priv->cr.rollover_th); priv->cr.rollover_th = 0; ret = close (priv->cr_wfd); if (ret) @@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) } if (priv->cf.fsync_th) { - changelog_thread_cleanup (this, priv->cf.fsync_th); + (void) changelog_thread_cleanup (this, priv->cf.fsync_th); priv->cf.fsync_th = 0; } } @@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) return ret; } -/* cleanup the notifier thread */ -static int -changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv) -{ - int ret = 0; - - if (priv->cn.notify_th) { - changelog_thread_cleanup (this, priv->cn.notify_th); - priv->cn.notify_th = 0; - - ret = close (priv->wfd); - if (ret) - gf_log (this->name, GF_LOG_ERROR, - "error closing writer end of notifier pipe" - " (reason: %s)", strerror (errno)); - } - - return ret; -} - -/* spawn the notifier thread - nop if already running */ -static int -changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) -{ - int ret = 0; - int flags = 0; - int pipe_fd[2] = {0, 0}; - - if (priv->cn.notify_th) - goto out; /* notifier thread already running */ - - ret = pipe (pipe_fd); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "Cannot create pipe (reason: %s)", strerror (errno)); - goto out; - } - - /* writer is non-blocking */ - flags = fcntl (pipe_fd[1], F_GETFL); - flags |= O_NONBLOCK; - - ret = fcntl (pipe_fd[1], F_SETFL, flags); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to set O_NONBLOCK flag"); - goto out; - } - - priv->wfd = pipe_fd[1]; - - priv->cn.this = this; - priv->cn.rfd = pipe_fd[0]; - - ret = gf_thread_create (&priv->cn.notify_th, - NULL, changelog_notifier, priv); - - out: - return ret; -} - int notify (xlator_t *this, int event, void *data, ...) { @@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) if (!priv->active) return ret; - /* spawn the notifier thread */ - ret = changelog_spawn_notifier (this, priv); - if (ret) - goto out; - /** * start with a fresh changelog file every time. this is done * in case there was an encoding change. so... things are kept @@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) return ret; } -/* Init all pthread condition variables and locks in changelog*/ +/** + * Init barrier related condition variables and locks + */ static int -changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) +changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv) { gf_boolean_t bn_mutex_init = _gf_false; gf_boolean_t bn_cond_init = _gf_false; @@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) return ret; } -/* Destroy all pthread condition variables and locks in changelog */ +/* Destroy barrier related condition variables and locks */ static inline void -changelog_pthread_destroy (changelog_priv_t *priv) +changelog_barrier_pthread_destroy (changelog_priv_t *priv) { pthread_mutex_destroy (&priv->bn.bnotify_mutex); pthread_cond_destroy (&priv->bn.bnotify_cond); @@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options) } htime_open(this, priv, tv.tv_sec); } - ret = changelog_spawn_notifier (this, priv); - if (!ret) - ret = changelog_spawn_helper_threads (this, - priv); - } else - ret = changelog_cleanup_notifier (this, priv); + ret = changelog_spawn_helper_threads (this, priv); + } } out: if (ret) { - ret = changelog_cleanup_notifier (this, priv); + /* TODO */ } else { gf_log (this->name, GF_LOG_DEBUG, "changelog reconfigured"); @@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options) return ret; } -int32_t -init (xlator_t *this) +static void +changelog_freeup_options (xlator_t *this, changelog_priv_t *priv) { - int ret = -1; - char *tmp = NULL; - changelog_priv_t *priv = NULL; - gf_boolean_t cond_lock_init = _gf_false; - char htime_dir[PATH_MAX] = {0,}; - char csnap_dir[PATH_MAX] = {0,}; - uint32_t timeout = 0; - - GF_VALIDATE_OR_GOTO ("changelog", this, out); - - if (!this->children || this->children->next) { - gf_log (this->name, GF_LOG_ERROR, - "translator needs a single subvolume"); - goto out; - } - - if (!this->parents) { - gf_log (this->name, GF_LOG_ERROR, - "dangling volume. please check volfile"); - goto out; - } - - priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); - if (!priv) - goto out; + int ret = 0; - this->local_pool = mem_pool_new (changelog_local_t, 64); - if (!this->local_pool) { + ret = priv->cb->dtor (this, &priv->cd); + if (ret) gf_log (this->name, GF_LOG_ERROR, - "failed to create local memory pool"); - goto out; - } - - LOCK_INIT (&priv->lock); - LOCK_INIT (&priv->c_snap_lock); + "could not cleanup bootstrapper"); + GF_FREE (priv->changelog_brick); + GF_FREE (priv->changelog_dir); +} - GF_OPTION_INIT ("changelog-brick", tmp, str, out); - if (!tmp) { - gf_log (this->name, GF_LOG_ERROR, - "\"changelog-brick\" option is not set"); - goto out; - } +static int +changelog_init_options (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + char *tmp = NULL; + uint32_t timeout = 0; + char htime_dir[PATH_MAX] = {0,}; + char csnap_dir[PATH_MAX] = {0,}; + GF_OPTION_INIT ("changelog-brick", tmp, str, error_return); priv->changelog_brick = gf_strdup (tmp); if (!priv->changelog_brick) - goto out; - tmp = NULL; + goto error_return; - GF_OPTION_INIT ("changelog-dir", tmp, str, out); - if (!tmp) { - gf_log (this->name, GF_LOG_ERROR, - "\"changelog-dir\" option is not set"); - goto out; - } + tmp = NULL; + GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1); priv->changelog_dir = gf_strdup (tmp); if (!priv->changelog_dir) - goto out; + goto dealloc_1; + tmp = NULL; /** @@ -2375,35 +2381,38 @@ init (xlator_t *this) ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); if (ret) - goto out; + goto dealloc_2; - CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir); + CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir); ret = mkdir_p (htime_dir, 0600, _gf_true); if (ret) - goto out; + goto dealloc_2; - CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir); + CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir); ret = mkdir_p (csnap_dir, 0600, _gf_true); if (ret) - goto out; + goto dealloc_2; - GF_OPTION_INIT ("changelog", priv->active, bool, out); + GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2); - GF_OPTION_INIT ("op-mode", tmp, str, out); + GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2); changelog_assign_opmode (priv, tmp); tmp = NULL; - GF_OPTION_INIT ("encoding", tmp, str, out); + GF_OPTION_INIT ("encoding", tmp, str, dealloc_2); changelog_assign_encoding (priv, tmp); + changelog_encode_change (priv); - GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out); + GF_OPTION_INIT ("rollover-time", + priv->rollover_time, int32, dealloc_2); - GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); - GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out); - priv->timeout.tv_sec = timeout; + GF_OPTION_INIT ("fsync-interval", + priv->fsync_interval, int32, dealloc_2); - changelog_encode_change(priv); + GF_OPTION_INIT ("changelog-barrier-timeout", + timeout, time, dealloc_2); + changelog_assign_barrier_timeout (priv, timeout); GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode); priv->cb = &cb_bootstrap[priv->op_mode]; @@ -2411,10 +2420,111 @@ init (xlator_t *this) /* ... now bootstrap the logger */ ret = priv->cb->ctor (this, &priv->cd); if (ret) - goto out; + goto dealloc_2; priv->changelog_fd = -1; + return 0; + + dealloc_2: + GF_FREE (priv->changelog_dir); + dealloc_1: + GF_FREE (priv->changelog_brick); + error_return: + return -1; +} + +static void +changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv) +{ + /* terminate rpc server */ + changelog_destroy_rpc_listner (this, priv); + + /* cleanup rot buffs */ + rbuf_dtor (priv->rbuf); + + /* cleanup poller thread */ + (void) changelog_thread_cleanup (this, priv->poller); +} + +static int +changelog_init_rpc (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + rpcsvc_t *rpc = NULL; + changelog_ev_selector_t *selection = NULL; + + selection = &priv->ev_selection; + + /* initialize event selection */ + changelog_init_event_selection (this, selection); + + ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to spawn poller thread"); + goto error_return; + } + + priv->rbuf = rbuf_init (NR_ROTT_BUFFS); + if (!priv->rbuf) + goto cleanup_thread; + + rpc = changelog_init_rpc_listner (this, priv, + priv->rbuf, NR_DISPATCHERS); + if (!rpc) + goto cleanup_rbuf; + priv->rpc = rpc; + + return 0; + + cleanup_rbuf: + rbuf_dtor (priv->rbuf); + cleanup_thread: + (void) changelog_thread_cleanup (this, priv->poller); + error_return: + return -1; +} + +int32_t +init (xlator_t *this) +{ + int ret = -1; + char *tmp = NULL; + changelog_priv_t *priv = NULL; + + GF_VALIDATE_OR_GOTO ("changelog", this, error_return); + + if (!this->children || this->children->next) { + gf_log (this->name, GF_LOG_ERROR, + "translator needs a single subvolume"); + goto error_return; + } + + if (!this->parents) { + gf_log (this->name, GF_LOG_ERROR, + "dangling volume. please check volfile"); + goto error_return; + } + + priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); + if (!priv) + goto error_return; + + this->local_pool = mem_pool_new (changelog_local_t, 64); + if (!this->local_pool) { + gf_log (this->name, GF_LOG_ERROR, + "failed to create local memory pool"); + goto cleanup_priv; + } + + LOCK_INIT (&priv->lock); + LOCK_INIT (&priv->c_snap_lock); + + ret = changelog_init_options (this, priv); + if (ret) + goto cleanup_mempool; + /* snap dependency changes */ priv->dm.black_fop_cnt = 0; priv->dm.white_fop_cnt = 0; @@ -2422,67 +2532,68 @@ init (xlator_t *this) priv->dm.drain_wait_white = _gf_false; priv->current_color = FOP_COLOR_BLACK; priv->explicit_rollover = _gf_false; + /* Mutex is not needed as threads are not spawned yet */ priv->bn.bnotify = _gf_false; - ret = changelog_pthread_init (this, priv); + ret = changelog_barrier_pthread_init (this, priv); if (ret) - goto out; - + goto cleanup_options; LOCK_INIT (&priv->bflags.lock); - cond_lock_init = _gf_true; priv->bflags.barrier_ext = _gf_false; /* Changelog barrier init */ INIT_LIST_HEAD (&priv->queue); priv->barrier_enabled = _gf_false; - ret = changelog_init (this, priv); + /* RPC ball rolling.. */ + ret = changelog_init_rpc (this, priv); if (ret) - goto out; + goto cleanup_barrier; + ret = changelog_init (this, priv); + if (ret) + goto cleanup_rpc; gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded"); - out: - if (ret) { - if (this && this->local_pool) - mem_pool_destroy (this->local_pool); - if (priv) { - if (priv->cb) { - ret = priv->cb->dtor (this, &priv->cd); - if (ret) - gf_log (this->name, GF_LOG_ERROR, - "error in cleanup during init()"); - } - GF_FREE (priv->changelog_brick); - GF_FREE (priv->changelog_dir); - if (cond_lock_init) - changelog_pthread_destroy (priv); - GF_FREE (priv); - } - this->private = NULL; - } else - this->private = priv; + this->private = priv; + return 0; - return ret; + cleanup_rpc: + changelog_cleanup_rpc (this, priv); + cleanup_barrier: + changelog_barrier_pthread_destroy (priv); + cleanup_options: + changelog_freeup_options (this, priv); + cleanup_mempool: + mem_pool_destroy (this->local_pool); + cleanup_priv: + GF_FREE (priv); + error_return: + this->private = NULL; + return -1; } void fini (xlator_t *this) { - int ret = -1; changelog_priv_t *priv = NULL; priv = this->private; if (priv) { - ret = priv->cb->dtor (this, &priv->cd); - if (ret) - gf_log (this->name, GF_LOG_ERROR, - "error in fini"); + /* terminate RPC server/threads */ + changelog_cleanup_rpc (this, priv); + + /* cleanup barrier related objects */ + changelog_barrier_pthread_destroy (priv); + + /* cleanup allocated options */ + changelog_freeup_options (this, priv); + + /* deallocate mempool */ mem_pool_destroy (this->local_pool); - GF_FREE (priv->changelog_brick); - GF_FREE (priv->changelog_dir); - changelog_pthread_destroy (priv); + + /* finally, dealloac private variable */ GF_FREE (priv); } @@ -2492,6 +2603,7 @@ fini (xlator_t *this) } struct xlator_fops fops = { + .open = changelog_open, .mknod = changelog_mknod, .mkdir = changelog_mkdir, .create = changelog_create, @@ -2513,6 +2625,7 @@ struct xlator_fops fops = { struct xlator_cbks cbks = { .forget = changelog_forget, + .release = changelog_release, }; struct volume_options options[] = { |