diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
| -rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 652 |
1 files changed, 652 insertions, 0 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c new file mode 100644 index 00000000000..57c3d39ef76 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -0,0 +1,652 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <errno.h> +#include <dirent.h> +#include <stddef.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/time.h> +#include <sys/resource.h> + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <string.h> + +#include <glusterfs/globals.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/defaults.h> +#include <glusterfs/syncop.h> + +#include "gf-changelog-rpc.h" +#include "gf-changelog-helpers.h" + +/* from the changelog translator */ +#include "changelog-misc.h" +#include "changelog-mem-types.h" +#include "changelog-lib-messages.h" + +/** + * Global singleton xlator pointer for the library, initialized + * during library load. This should probably be hidden inside + * an initialized object which is an handle for the consumer. + * + * TODO: do away with the global.. + */ +xlator_t *master = NULL; + +static inline gf_private_t * +gf_changelog_alloc_priv() +{ + int ret = 0; + gf_private_t *priv = NULL; + + priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); + if (!priv) + goto error_return; + INIT_LIST_HEAD(&priv->connections); + INIT_LIST_HEAD(&priv->cleanups); + + ret = pthread_mutex_init(&priv->lock, NULL); + if (ret != 0) + goto free_priv; + ret = pthread_cond_init(&priv->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + priv->api = NULL; + return priv; + +cleanup_mutex: + (void)pthread_mutex_destroy(&priv->lock); +free_priv: + GF_FREE(priv); +error_return: + return NULL; +} + +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 +#define GF_CHANGELOG_EVENT_THREAD_COUNT 4 + +static int +gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx) +{ + cmd_args_t *cmd_args = NULL; + struct rlimit lim = { + 0, + }; + call_pool_t *pool = NULL; + int ret = -1; + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + return -1; + + ctx->process_uuid = generate_glusterfs_ctx_id(); + if (!ctx->process_uuid) + return -1; + + ctx->page_size = 128 * GF_UNIT_KB; + + ctx->iobuf_pool = iobuf_pool_new(); + if (!ctx->iobuf_pool) + goto free_pool; + + ctx->event_pool = gf_event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, + GF_CHANGELOG_EVENT_THREAD_COUNT); + if (!ctx->event_pool) + goto free_pool; + + pool = GF_CALLOC(1, sizeof(call_pool_t), + gf_changelog_mt_libgfchangelog_call_pool_t); + if (!pool) + goto free_pool; + + /* frame_mem_pool size 112 * 64 */ + pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); + if (!pool->frame_mem_pool) + goto free_pool; + + /* stack_mem_pool size 256 * 128 */ + pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); + + if (!pool->stack_mem_pool) + goto free_pool; + + ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); + if (!ctx->stub_mem_pool) + goto free_pool; + + ctx->dict_pool = mem_pool_new(dict_t, 32); + if (!ctx->dict_pool) + goto free_pool; + + ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512); + if (!ctx->dict_pair_pool) + goto free_pool; + + ctx->dict_data_pool = mem_pool_new(data_t, 512); + if (!ctx->dict_data_pool) + goto free_pool; + + ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); + if (!ctx->logbuf_pool) + goto free_pool; + + INIT_LIST_HEAD(&pool->all_frames); + LOCK_INIT(&pool->lock); + ctx->pool = pool; + + LOCK_INIT(&ctx->lock); + + cmd_args = &ctx->cmd_args; + + INIT_LIST_HEAD(&cmd_args->xlator_options); + + lim.rlim_cur = RLIM_INFINITY; + lim.rlim_max = RLIM_INFINITY; + setrlimit(RLIMIT_CORE, &lim); + + return 0; + +free_pool: + if (pool) { + GF_FREE(pool->frame_mem_pool); + + GF_FREE(pool->stack_mem_pool); + + GF_FREE(pool); + } + + GF_FREE(ctx->stub_mem_pool); + + GF_FREE(ctx->dict_pool); + + GF_FREE(ctx->dict_pair_pool); + + GF_FREE(ctx->dict_data_pool); + + GF_FREE(ctx->logbuf_pool); + + GF_FREE(ctx->iobuf_pool); + + GF_FREE(ctx->event_pool); + + return -1; +} + +/* TODO: cleanup ctx defaults */ +void +gf_changelog_cleanup_this(xlator_t *this) +{ + glusterfs_ctx_t *ctx = NULL; + + if (!this) + return; + + ctx = this->ctx; + syncenv_destroy(ctx->env); + free(ctx); + + this->private = NULL; + this->ctx = NULL; + + mem_pools_fini(); +} + +static int +gf_changelog_init_context() +{ + glusterfs_ctx_t *ctx = NULL; + + ctx = glusterfs_ctx_new(); + if (!ctx) + goto error_return; + + if (glusterfs_globals_init(ctx)) + goto free_ctx; + + THIS->ctx = ctx; + if (gf_changelog_ctx_defaults_init(ctx)) + goto free_ctx; + + ctx->env = syncenv_new(0, 0, 0); + if (!ctx->env) + goto free_ctx; + return 0; + +free_ctx: + free(ctx); + THIS->ctx = NULL; +error_return: + return -1; +} + +static int +gf_changelog_init_master() +{ + int ret = 0; + + ret = gf_changelog_init_context(); + mem_pools_init(); + + return ret; +} + +/* TODO: cleanup clnt/svc on failure */ +int +gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc) +{ + int ret = 0; + rpcsvc_t *svc = NULL; + struct rpc_clnt *rpc = NULL; + + /** + * Initialize a connect back socket. A probe() RPC call to the server + * triggers a reverse connect. + */ + svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, + RPC_SOCK(entry), entry); + if (!svc) + goto error_return; + RPC_REBORP(entry) = svc; + + /* Initialize an RPC client */ + rpc = gf_changelog_rpc_init(this, entry); + if (!rpc) + goto error_return; + RPC_PROBER(entry) = rpc; + + /** + * @FIXME + * till we have connection state machine, let's delay the RPC call + * for now.. + */ + sleep(2); + + /** + * Probe changelog translator for reverse connection. After a successful + * call, there's less use of the client and can be disconnected, but + * let's leave the connection active for any future RPC calls. + */ + ret = gf_changelog_invoke_rpc(this, entry, proc); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, + "Could not initiate probe RPC, bailing out!!!"); + goto error_return; + } + + return 0; + +error_return: + return -1; +} + +int +gf_cleanup_event(xlator_t *this, struct gf_event_list *ev) +{ + int ret = 0; + + ret = gf_thread_cleanup(this, ev->invoker); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + CHANGELOG_LIB_MSG_CLEANUP_ERROR, + "cannot cleanup callback invoker thread." + " Not freeing resources"); + return -1; + } + + ev->entry = NULL; + + return 0; +} + +static int +gf_init_event(gf_changelog_t *entry) +{ + int ret = 0; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + ev->entry = entry; + + ret = pthread_mutex_init(&ev->lock, NULL); + if (ret != 0) + goto error_return; + ret = pthread_cond_init(&ev->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + INIT_LIST_HEAD(&ev->events); + + ev->next_seq = 0; /* bootstrap sequencing */ + + if (GF_NEED_ORDERED_EVENTS(entry)) { + entry->pickevent = pick_event_ordered; + entry->queueevent = queue_ordered_event; + } else { + entry->pickevent = pick_event_unordered; + entry->queueevent = queue_unordered_event; + } + + ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, + ev, "clogcbki"); + if (ret != 0) { + entry->pickevent = NULL; + entry->queueevent = NULL; + goto cleanup_cond; + } + + return 0; + +cleanup_cond: + (void)pthread_cond_destroy(&ev->cond); +cleanup_mutex: + (void)pthread_mutex_destroy(&ev->lock); +error_return: + return -1; +} + +/** + * TODO: + * - cleanup invoker thread + * - cleanup event list + * - destroy rpc{-clnt, svc} + */ +int +gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry) +{ + return 0; +} + +int +gf_cleanup_connections(xlator_t *this) +{ + return 0; +} + +static int +gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + + priv = this->private; + + if (!brick->callback || !brick->init || !brick->fini) + goto error_return; + + entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); + if (!entry) + goto error_return; + INIT_LIST_HEAD(&entry->list); + + LOCK_INIT(&entry->statelock); + entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + + entry->notify = brick->filter; + if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) + goto free_entry; + + entry->this = this; + entry->invokerxl = xl; + + entry->ordered = ordered; + ret = gf_init_event(entry); + if (ret) + goto free_entry; + + entry->fini = brick->fini; + entry->callback = brick->callback; + entry->connected = brick->connected; + entry->disconnected = brick->disconnected; + + entry->ptr = brick->init(this, brick); + if (!entry->ptr) + goto cleanup_event; + priv->api = entry->ptr; /* pointer to API, if required */ + + pthread_mutex_lock(&priv->lock); + { + list_add_tail(&entry->list, &priv->connections); + } + pthread_mutex_unlock(&priv->lock); + + ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); + if (ret) + goto cleanup_event; + return 0; + +cleanup_event: + (void)gf_cleanup_event(this, &entry->event); +free_entry: + gf_msg_debug(this->name, 0, "freeing entry %p", entry); + list_del(&entry->list); /* FIXME: kludge for now */ + GF_FREE(entry); +error_return: + return -1; +} + +int +gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + return gf_setup_brick_connection(this, brick, ordered, xl); +} + +static int +gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel) +{ + /* passing ident as NULL means to use default ident for syslog */ + if (gf_log_init(this->ctx, logfile, NULL)) + return -1; + + gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel); + return 0; +} + +static int +gf_changelog_set_master(xlator_t *master, void *xl) +{ + int32_t ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + gf_private_t *priv = NULL; + + this = xl; + if (!this || !this->ctx) { + ret = gf_changelog_init_master(); + if (ret) + return -1; + this = THIS; + } + + master->ctx = this->ctx; + + INIT_LIST_HEAD(&master->volume_options); + SAVE_THIS(THIS); + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + goto restore_this; + + priv = gf_changelog_alloc_priv(); + if (!priv) { + ret = -1; + goto restore_this; + } + + if (!xl) { + /* poller thread */ + ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, + "clogpoll"); + if (ret != 0) { + GF_FREE(priv); + gf_msg(master->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "failed to spawn poller thread"); + goto restore_this; + } + } + + master->private = priv; + +restore_this: + RESTORE_THIS(); + + return ret; +} + +int +gf_changelog_init(void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + + if (master) + return 0; + + master = calloc(1, sizeof(*master)); + if (!master) + goto error_return; + + master->name = strdup("gfchangelog"); + if (!master->name) + goto dealloc_master; + + ret = gf_changelog_set_master(master, xl); + if (ret) + goto dealloc_name; + + priv = master->private; + ret = gf_thread_create(&priv->connectionjanitor, NULL, + gf_changelog_connection_janitor, master, "clogjan"); + if (ret != 0) { + /* TODO: cleanup priv, mutex (poller thread for !xl) */ + goto dealloc_name; + } + + return 0; + +dealloc_name: + free(master->name); +dealloc_master: + free(master); + master = NULL; +error_return: + return -1; +} + +int +gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl) +{ + int ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + struct gf_brick_spec *brick = NULL; + gf_boolean_t need_order = _gf_false; + + SAVE_THIS(xl); + + this = THIS; + if (!this) + goto error_return; + + ret = gf_changelog_setup_logging(this, logfile, lvl); + if (ret) + goto error_return; + + need_order = (ordered) ? _gf_true : _gf_false; + + brick = bricks; + while (count--) { + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "brick=%s", + brick->brick_path, "notify_filter=%d", brick->filter, NULL); + + ret = gf_changelog_register_brick(this, brick, need_order, xl); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, + "Error registering with changelog xlator"); + break; + } + + brick++; + } + + if (ret != 0) + goto cleanup_inited_bricks; + + RESTORE_THIS(); + return 0; + +cleanup_inited_bricks: + gf_cleanup_connections(this); +error_return: + RESTORE_THIS(); + return -1; +} + +/** + * @API + * gf_changelog_register() + * + * This is _NOT_ a generic register API. It's a special API to handle + * updates at a journal granulality. This is used by consumers wanting + * to process persistent journal such as geo-replication via a set of + * APIs. All of this is required to maintain backward compatibility. + * Owner specific private data is stored in ->api (in gf_private_t), + * which is used by APIs to access it's private data. This limits + * the library access to a single brick, but that's how it used to + * be anyway. Furthermore, this API solely _owns_ "this", therefore + * callers already having a notion of "this" are expected to use the + * newer API. + * + * Newer applications wanting to use this library need not face this + * limitation and reply of the much more feature rich generic register + * API, which is purely callback based. + * + * NOTE: @max_reconnects is not used but required for backward compat. + * + * For generic API, refer gf_changelog_register_generic(). + */ +int +gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, + int log_level, int max_reconnects) +{ + struct gf_brick_spec brick = { + 0, + }; + + if (master) + THIS = master; + else + return -1; + + brick.brick_path = brick_path; + brick.filter = CHANGELOG_OP_TYPE_JOURNAL; + + brick.init = gf_changelog_journal_init; + brick.fini = gf_changelog_journal_fini; + brick.callback = gf_changelog_handle_journal; + brick.connected = gf_changelog_journal_connect; + brick.disconnected = gf_changelog_journal_disconnect; + + brick.ptr = scratch_dir; + + return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, + NULL); +} |
