diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
| -rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 998 |
1 files changed, 533 insertions, 465 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index f3f6ffbe976..57c3d39ef76 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -1,5 +1,5 @@ /* - Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser @@ -14,571 +14,639 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> +#include <sys/time.h> +#include <sys/resource.h> #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <string.h> -#include "globals.h" -#include "glusterfs.h" -#include "logging.h" -#include "defaults.h" +#include <glusterfs/globals.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/defaults.h> +#include <glusterfs/syncop.h> +#include "gf-changelog-rpc.h" #include "gf-changelog-helpers.h" /* from the changelog translator */ #include "changelog-misc.h" #include "changelog-mem-types.h" +#include "changelog-lib-messages.h" -int byebye = 0; +/** + * Global singleton xlator pointer for the library, initialized + * during library load. This should probably be hidden inside + * an initialized object which is an handle for the consumer. + * + * TODO: do away with the global.. + */ +xlator_t *master = NULL; -static void -gf_changelog_cleanup (gf_changelog_t *gfc) +static inline gf_private_t * +gf_changelog_alloc_priv() { - /* socket */ - if (gfc->gfc_sockfd != -1) - close (gfc->gfc_sockfd); - /* tracker fd */ - if (gfc->gfc_fd != -1) - close (gfc->gfc_fd); - /* processing dir */ - if (gfc->gfc_dir) - closedir (gfc->gfc_dir); - - if (gfc->gfc_working_dir) - free (gfc->gfc_working_dir); /* allocated by realpath */ + int ret = 0; + gf_private_t *priv = NULL; + + priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); + if (!priv) + goto error_return; + INIT_LIST_HEAD(&priv->connections); + INIT_LIST_HEAD(&priv->cleanups); + + ret = pthread_mutex_init(&priv->lock, NULL); + if (ret != 0) + goto free_priv; + ret = pthread_cond_init(&priv->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + priv->api = NULL; + return priv; + +cleanup_mutex: + (void)pthread_mutex_destroy(&priv->lock); +free_priv: + GF_FREE(priv); +error_return: + return NULL; } -void -__attribute__ ((constructor)) gf_changelog_ctor (void) +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 +#define GF_CHANGELOG_EVENT_THREAD_COUNT 4 + +static int +gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx) { - glusterfs_ctx_t *ctx = NULL; + cmd_args_t *cmd_args = NULL; + struct rlimit lim = { + 0, + }; + call_pool_t *pool = NULL; + int ret = -1; + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + return -1; - ctx = glusterfs_ctx_new (); - if (!ctx) - return; + ctx->process_uuid = generate_glusterfs_ctx_id(); + if (!ctx->process_uuid) + return -1; - if (glusterfs_globals_init (ctx)) { - free (ctx); - ctx = NULL; - return; - } + ctx->page_size = 128 * GF_UNIT_KB; - THIS->ctx = ctx; - if (xlator_mem_acct_init (THIS, gf_changelog_mt_end)) - return; -} + ctx->iobuf_pool = iobuf_pool_new(); + if (!ctx->iobuf_pool) + goto free_pool; -void -__attribute__ ((destructor)) gf_changelog_dtor (void) -{ - xlator_t *this = NULL; - glusterfs_ctx_t *ctx = NULL; - gf_changelog_t *gfc = NULL; + ctx->event_pool = gf_event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, + GF_CHANGELOG_EVENT_THREAD_COUNT); + if (!ctx->event_pool) + goto free_pool; - this = THIS; - if (!this) - return; - - ctx = this->ctx; - gfc = this->private; - - if (gfc) { - if (gfc->hist_gfc) { - gf_changelog_cleanup(gfc->hist_gfc); - GF_FREE (gfc->hist_gfc); - } - gf_changelog_cleanup (gfc); - GF_FREE (gfc); - } + pool = GF_CALLOC(1, sizeof(call_pool_t), + gf_changelog_mt_libgfchangelog_call_pool_t); + if (!pool) + goto free_pool; - if (ctx) { - pthread_mutex_destroy (&ctx->lock); - free (ctx); - ctx = NULL; - } -} + /* frame_mem_pool size 112 * 64 */ + pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); + if (!pool->frame_mem_pool) + goto free_pool; + /* stack_mem_pool size 256 * 128 */ + pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); -static int -gf_changelog_open_dirs (gf_changelog_t *gfc) -{ - int ret = -1; - DIR *dir = NULL; - int tracker_fd = 0; - char tracker_path[PATH_MAX] = {0,}; - - (void) snprintf (gfc->gfc_current_dir, PATH_MAX, - "%s/"GF_CHANGELOG_CURRENT_DIR"/", - gfc->gfc_working_dir); - ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false); - if (ret) - goto out; + if (!pool->stack_mem_pool) + goto free_pool; - (void) snprintf (gfc->gfc_processed_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSED_DIR"/", - gfc->gfc_working_dir); - ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false); - if (ret) - goto out; + ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); + if (!ctx->stub_mem_pool) + goto free_pool; - (void) snprintf (gfc->gfc_processing_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSING_DIR"/", - gfc->gfc_working_dir); - ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false); - if (ret) - goto out; + ctx->dict_pool = mem_pool_new(dict_t, 32); + if (!ctx->dict_pool) + goto free_pool; - dir = opendir (gfc->gfc_processing_dir); - if (!dir) { - gf_log ("", GF_LOG_ERROR, - "opendir() error [reason: %s]", strerror (errno)); - goto out; - } + ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512); + if (!ctx->dict_pair_pool) + goto free_pool; - gfc->gfc_dir = dir; + ctx->dict_data_pool = mem_pool_new(data_t, 512); + if (!ctx->dict_data_pool) + goto free_pool; - (void) snprintf (tracker_path, PATH_MAX, - "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir); + ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); + if (!ctx->logbuf_pool) + goto free_pool; - tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (tracker_fd < 0) { - closedir (gfc->gfc_dir); - ret = -1; - goto out; - } + INIT_LIST_HEAD(&pool->all_frames); + LOCK_INIT(&pool->lock); + ctx->pool = pool; - gfc->gfc_fd = tracker_fd; - ret = 0; - out: - return ret; -} + LOCK_INIT(&ctx->lock); -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc) -{ - int ret = 0; - int len = 0; - int tries = 0; - int sockfd = 0; - struct sockaddr_un remote; - - this = gfc->this; - - if (gfc->gfc_sockfd != -1) { - gf_log (this->name, GF_LOG_INFO, - "Reconnecting..."); - close (gfc->gfc_sockfd); - } + cmd_args = &ctx->cmd_args; - sockfd = socket (AF_UNIX, SOCK_STREAM, 0); - if (sockfd < 0) { - ret = -1; - goto out; - } + INIT_LIST_HEAD(&cmd_args->xlator_options); - CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath, - gfc->gfc_sockpath, UNIX_PATH_MAX); - gf_log (this->name, GF_LOG_INFO, - "connecting to changelog socket: %s (brick: %s)", - gfc->gfc_sockpath, gfc->gfc_brickpath); + lim.rlim_cur = RLIM_INFINITY; + lim.rlim_max = RLIM_INFINITY; + setrlimit(RLIMIT_CORE, &lim); - remote.sun_family = AF_UNIX; - strcpy (remote.sun_path, gfc->gfc_sockpath); + return 0; - len = strlen (remote.sun_path) + sizeof (remote.sun_family); +free_pool: + if (pool) { + GF_FREE(pool->frame_mem_pool); - while (tries < gfc->gfc_connretries) { - gf_log (this->name, GF_LOG_WARNING, - "connection attempt %d/%d...", - tries + 1, gfc->gfc_connretries); + GF_FREE(pool->stack_mem_pool); - /* initiate a connect */ - if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) { - gfc->gfc_sockfd = sockfd; - break; - } + GF_FREE(pool); + } - tries++; - sleep (2); - } + GF_FREE(ctx->stub_mem_pool); - if (tries == gfc->gfc_connretries) { - gf_log (this->name, GF_LOG_ERROR, - "could not connect to changelog socket!" - " bailing out..."); - close (sockfd); - ret = -1; - } else - gf_log (this->name, GF_LOG_INFO, - "connection successful"); - - out: - return ret; -} + GF_FREE(ctx->dict_pool); -int -gf_changelog_done (char *file) -{ - int ret = -1; - char *buffer = NULL; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char to_path[PATH_MAX] = {0,}; + GF_FREE(ctx->dict_pair_pool); - errno = EINVAL; + GF_FREE(ctx->dict_data_pool); - this = THIS; - if (!this) - goto out; - - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; - - if (!file || !strlen (file)) - goto out; - - /* make sure 'file' is inside ->gfc_working_dir */ - buffer = realpath (file, NULL); - if (!buffer) - goto out; - - if (strncmp (gfc->gfc_working_dir, - buffer, strlen (gfc->gfc_working_dir))) - goto out; - - (void) snprintf (to_path, PATH_MAX, "%s%s", - gfc->gfc_processed_dir, basename (buffer)); - gf_log (this->name, GF_LOG_DEBUG, - "moving %s to processed directory", file); - ret = rename (buffer, to_path); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "cannot move %s to %s (reason: %s)", - file, to_path, strerror (errno)); - goto out; - } + GF_FREE(ctx->logbuf_pool); + + GF_FREE(ctx->iobuf_pool); - ret = 0; + GF_FREE(ctx->event_pool); - out: - if (buffer) - free (buffer); /* allocated by realpath() */ - return ret; + return -1; } -/** - * @API - * for a set of changelogs, start from the beginning - */ -int -gf_changelog_start_fresh () +/* TODO: cleanup ctx defaults */ +void +gf_changelog_cleanup_this(xlator_t *this) { - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - - this = THIS; - if (!this) - goto out; + glusterfs_ctx_t *ctx = NULL; - errno = EINVAL; + if (!this) + return; - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + ctx = this->ctx; + syncenv_destroy(ctx->env); + free(ctx); - if (gf_ftruncate (gfc->gfc_fd, 0)) - goto out; + this->private = NULL; + this->ctx = NULL; - return 0; - - out: - return -1; + mem_pools_fini(); } -/** - * @API - * return the next changelog file entry. zero means all chanelogs - * consumed. - */ -ssize_t -gf_changelog_next_change (char *bufptr, size_t maxlen) +static int +gf_changelog_init_context() { - ssize_t size = -1; - int tracker_fd = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char buffer[PATH_MAX] = {0,}; + glusterfs_ctx_t *ctx = NULL; - errno = EINVAL; + ctx = glusterfs_ctx_new(); + if (!ctx) + goto error_return; - this = THIS; - if (!this) - goto out; + if (glusterfs_globals_init(ctx)) + goto free_ctx; - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + THIS->ctx = ctx; + if (gf_changelog_ctx_defaults_init(ctx)) + goto free_ctx; - tracker_fd = gfc->gfc_fd; + ctx->env = syncenv_new(0, 0, 0); + if (!ctx->env) + goto free_ctx; + return 0; - size = gf_readline (tracker_fd, buffer, maxlen); - if (size < 0) { - size = -1; - goto out; - } +free_ctx: + free(ctx); + THIS->ctx = NULL; +error_return: + return -1; +} - if (size == 0) - goto out; +static int +gf_changelog_init_master() +{ + int ret = 0; - memcpy (bufptr, buffer, size - 1); - bufptr[size - 1] = '\0'; + ret = gf_changelog_init_context(); + mem_pools_init(); -out: - return size; + return ret; } -/** - * @API - * gf_changelog_scan() - scan and generate a list of change entries - * - * calling this api multiple times (without calling gf_changlog_done()) - * would result new changelogs(s) being refreshed in the tracker file. - * This call also acts as a cancellation point for the consumer. - */ -ssize_t -gf_changelog_scan () +/* TODO: cleanup clnt/svc on failure */ +int +gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc) { - int ret = 0; - int tracker_fd = 0; - size_t len = 0; - size_t off = 0; - xlator_t *this = NULL; - size_t nr_entries = 0; - gf_changelog_t *gfc = NULL; - struct dirent *entryp = NULL; - struct dirent *result = NULL; - char buffer[PATH_MAX] = {0,}; + int ret = 0; + rpcsvc_t *svc = NULL; + struct rpc_clnt *rpc = NULL; + + /** + * Initialize a connect back socket. A probe() RPC call to the server + * triggers a reverse connect. + */ + svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, + RPC_SOCK(entry), entry); + if (!svc) + goto error_return; + RPC_REBORP(entry) = svc; + + /* Initialize an RPC client */ + rpc = gf_changelog_rpc_init(this, entry); + if (!rpc) + goto error_return; + RPC_PROBER(entry) = rpc; + + /** + * @FIXME + * till we have connection state machine, let's delay the RPC call + * for now.. + */ + sleep(2); + + /** + * Probe changelog translator for reverse connection. After a successful + * call, there's less use of the client and can be disconnected, but + * let's leave the connection active for any future RPC calls. + */ + ret = gf_changelog_invoke_rpc(this, entry, proc); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, + "Could not initiate probe RPC, bailing out!!!"); + goto error_return; + } + + return 0; + +error_return: + return -1; +} - this = THIS; - if (!this) - goto out; - - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; - - /** - * do we need to protect 'byebye' with locks? worst, the - * consumer would get notified during next scan(). - */ - if (byebye) { - errno = ECONNREFUSED; - goto out; - } +int +gf_cleanup_event(xlator_t *this, struct gf_event_list *ev) +{ + int ret = 0; + + ret = gf_thread_cleanup(this, ev->invoker); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + CHANGELOG_LIB_MSG_CLEANUP_ERROR, + "cannot cleanup callback invoker thread." + " Not freeing resources"); + return -1; + } - errno = EINVAL; - - tracker_fd = gfc->gfc_fd; - - if (gf_ftruncate (tracker_fd, 0)) - goto out; - - len = offsetof(struct dirent, d_name) - + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; - entryp = GF_CALLOC (1, len, - gf_changelog_mt_libgfchangelog_dirent_t); - if (!entryp) - goto out; - - rewinddir (gfc->gfc_dir); - while (1) { - ret = readdir_r (gfc->gfc_dir, entryp, &result); - if (ret || !result) - break; - - if ( !strcmp (basename (entryp->d_name), ".") - || !strcmp (basename (entryp->d_name), "..") ) - continue; - - nr_entries++; - - GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir, - buffer, off, - strlen (gfc->gfc_processing_dir)); - GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, - off, strlen (entryp->d_name)); - GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); - - if (gf_changelog_write (tracker_fd, buffer, off) != off) { - gf_log (this->name, GF_LOG_ERROR, - "error writing changelog filename" - " to tracker file"); - break; - } - off = 0; - } + ev->entry = NULL; - GF_FREE (entryp); + return 0; +} - if (!result) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) - return nr_entries; - } - out: - return -1; +static int +gf_init_event(gf_changelog_t *entry) +{ + int ret = 0; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + ev->entry = entry; + + ret = pthread_mutex_init(&ev->lock, NULL); + if (ret != 0) + goto error_return; + ret = pthread_cond_init(&ev->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + INIT_LIST_HEAD(&ev->events); + + ev->next_seq = 0; /* bootstrap sequencing */ + + if (GF_NEED_ORDERED_EVENTS(entry)) { + entry->pickevent = pick_event_ordered; + entry->queueevent = queue_ordered_event; + } else { + entry->pickevent = pick_event_unordered; + entry->queueevent = queue_unordered_event; + } + + ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, + ev, "clogcbki"); + if (ret != 0) { + entry->pickevent = NULL; + entry->queueevent = NULL; + goto cleanup_cond; + } + + return 0; + +cleanup_cond: + (void)pthread_cond_destroy(&ev->cond); +cleanup_mutex: + (void)pthread_mutex_destroy(&ev->lock); +error_return: + return -1; } /** - * @API - * gf_changelog_register() - register a client for updates. + * TODO: + * - cleanup invoker thread + * - cleanup event list + * - destroy rpc{-clnt, svc} */ int -gf_changelog_register (char *brick_path, char *scratch_dir, - char *log_file, int log_level, int max_reconnects) +gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry) { - int i = 0; - int ret = -1; - int errn = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char hist_scratch_dir[PATH_MAX] = {0,}; - struct stat buf = {0,}; - - this = THIS; - if (!this->ctx) - goto out; + return 0; +} - errno = ENOMEM; +int +gf_cleanup_connections(xlator_t *this) +{ + return 0; +} - gfc = GF_CALLOC (1, sizeof (*gfc), - gf_changelog_mt_libgfchangelog_t); - if (!gfc) - goto out; +static int +gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + + priv = this->private; + + if (!brick->callback || !brick->init || !brick->fini) + goto error_return; + + entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); + if (!entry) + goto error_return; + INIT_LIST_HEAD(&entry->list); + + LOCK_INIT(&entry->statelock); + entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + + entry->notify = brick->filter; + if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) + goto free_entry; + + entry->this = this; + entry->invokerxl = xl; + + entry->ordered = ordered; + ret = gf_init_event(entry); + if (ret) + goto free_entry; + + entry->fini = brick->fini; + entry->callback = brick->callback; + entry->connected = brick->connected; + entry->disconnected = brick->disconnected; + + entry->ptr = brick->init(this, brick); + if (!entry->ptr) + goto cleanup_event; + priv->api = entry->ptr; /* pointer to API, if required */ + + pthread_mutex_lock(&priv->lock); + { + list_add_tail(&entry->list, &priv->connections); + } + pthread_mutex_unlock(&priv->lock); + + ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); + if (ret) + goto cleanup_event; + return 0; + +cleanup_event: + (void)gf_cleanup_event(this, &entry->event); +free_entry: + gf_msg_debug(this->name, 0, "freeing entry %p", entry); + list_del(&entry->list); /* FIXME: kludge for now */ + GF_FREE(entry); +error_return: + return -1; +} - gfc->this = this; +int +gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + return gf_setup_brick_connection(this, brick, ordered, xl); +} - gfc->gfc_dir = NULL; - gfc->gfc_fd = gfc->gfc_sockfd = -1; +static int +gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel) +{ + /* passing ident as NULL means to use default ident for syslog */ + if (gf_log_init(this->ctx, logfile, NULL)) + return -1; - if (stat (scratch_dir, &buf) && errno == ENOENT) { - ret = mkdir_p (scratch_dir, 0600, _gf_true); - if (ret) { - errn = errno; - goto cleanup; - } - } + gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel); + return 0; +} - gfc->gfc_working_dir = realpath (scratch_dir, NULL); - if (!gfc->gfc_working_dir) { - errn = errno; - goto cleanup; +static int +gf_changelog_set_master(xlator_t *master, void *xl) +{ + int32_t ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + gf_private_t *priv = NULL; + + this = xl; + if (!this || !this->ctx) { + ret = gf_changelog_init_master(); + if (ret) + return -1; + this = THIS; + } + + master->ctx = this->ctx; + + INIT_LIST_HEAD(&master->volume_options); + SAVE_THIS(THIS); + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + goto restore_this; + + priv = gf_changelog_alloc_priv(); + if (!priv) { + ret = -1; + goto restore_this; + } + + if (!xl) { + /* poller thread */ + ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, + "clogpoll"); + if (ret != 0) { + GF_FREE(priv); + gf_msg(master->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "failed to spawn poller thread"); + goto restore_this; } + } - /* Begin: Changes for History API */ - gfc->hist_gfc = NULL; - - gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc), - gf_changelog_mt_libgfchangelog_t); - if (!gfc->hist_gfc) - goto cleanup; - - gfc->hist_gfc->gfc_dir = NULL; - gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1; - gfc->hist_gfc->this = NULL; + master->private = priv; - (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); - (void) snprintf (hist_scratch_dir, PATH_MAX, - "%s/"GF_CHANGELOG_HISTORY_DIR"/", - gfc->gfc_working_dir); +restore_this: + RESTORE_THIS(); - ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); - if (ret) { - errn = errno; - goto cleanup; - } + return ret; +} - gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL); - if (!gfc->hist_gfc->gfc_working_dir) { - errn = errno; - goto cleanup; - } +int +gf_changelog_init(void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; - ret = gf_changelog_open_dirs (gfc->hist_gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "could not create entries in history scratch dir"); - goto cleanup; - } + if (master) + return 0; - (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX); + master = calloc(1, sizeof(*master)); + if (!master) + goto error_return; + + master->name = strdup("gfchangelog"); + if (!master->name) + goto dealloc_master; + + ret = gf_changelog_set_master(master, xl); + if (ret) + goto dealloc_name; + + priv = master->private; + ret = gf_thread_create(&priv->connectionjanitor, NULL, + gf_changelog_connection_janitor, master, "clogjan"); + if (ret != 0) { + /* TODO: cleanup priv, mutex (poller thread for !xl) */ + goto dealloc_name; + } + + return 0; + +dealloc_name: + free(master->name); +dealloc_master: + free(master); + master = NULL; +error_return: + return -1; +} - for (i=0; i < 256; i++) { - gfc->hist_gfc->rfc3986[i] = - (isalnum(i) || i == '~' || - i == '-' || i == '.' || i == '_') ? i : 0; - } - /* End: Changes for History API*/ - - ret = gf_changelog_open_dirs (gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "could not create entries in scratch dir"); - goto cleanup; +int +gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl) +{ + int ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + struct gf_brick_spec *brick = NULL; + gf_boolean_t need_order = _gf_false; + + SAVE_THIS(xl); + + this = THIS; + if (!this) + goto error_return; + + ret = gf_changelog_setup_logging(this, logfile, lvl); + if (ret) + goto error_return; + + need_order = (ordered) ? _gf_true : _gf_false; + + brick = bricks; + while (count--) { + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "brick=%s", + brick->brick_path, "notify_filter=%d", brick->filter, NULL); + + ret = gf_changelog_register_brick(this, brick, need_order, xl); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, + "Error registering with changelog xlator"); + break; } - /* passing ident as NULL means to use default ident for syslog */ - if (gf_log_init (this->ctx, log_file, NULL)) - goto cleanup; + brick++; + } - gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO : - log_level); + if (ret != 0) + goto cleanup_inited_bricks; - gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects; - (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX); + RESTORE_THIS(); + return 0; - ret = gf_changelog_notification_init (this, gfc); - if (ret) { - errn = errno; - goto cleanup; - } +cleanup_inited_bricks: + gf_cleanup_connections(this); +error_return: + RESTORE_THIS(); + return -1; +} - ret = gf_thread_create (&gfc->gfc_changelog_processor, - NULL, gf_changelog_process, gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "error creating changelog processor thread" - " new changes won't be recorded!!!"); - goto cleanup; - } +/** + * @API + * gf_changelog_register() + * + * This is _NOT_ a generic register API. It's a special API to handle + * updates at a journal granulality. This is used by consumers wanting + * to process persistent journal such as geo-replication via a set of + * APIs. All of this is required to maintain backward compatibility. + * Owner specific private data is stored in ->api (in gf_private_t), + * which is used by APIs to access it's private data. This limits + * the library access to a single brick, but that's how it used to + * be anyway. Furthermore, this API solely _owns_ "this", therefore + * callers already having a notion of "this" are expected to use the + * newer API. + * + * Newer applications wanting to use this library need not face this + * limitation and reply of the much more feature rich generic register + * API, which is purely callback based. + * + * NOTE: @max_reconnects is not used but required for backward compat. + * + * For generic API, refer gf_changelog_register_generic(). + */ +int +gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, + int log_level, int max_reconnects) +{ + struct gf_brick_spec brick = { + 0, + }; - for (i=0; i < 256; i++) { - gfc->rfc3986[i] = - (isalnum(i) || i == '~' || - i == '-' || i == '.' || i == '_') ? i : 0; - } + if (master) + THIS = master; + else + return -1; - ret = 0; - this->private = gfc; + brick.brick_path = brick_path; + brick.filter = CHANGELOG_OP_TYPE_JOURNAL; - goto out; + brick.init = gf_changelog_journal_init; + brick.fini = gf_changelog_journal_fini; + brick.callback = gf_changelog_handle_journal; + brick.connected = gf_changelog_journal_connect; + brick.disconnected = gf_changelog_journal_disconnect; - cleanup: - if (gfc->hist_gfc) { - gf_changelog_cleanup (gfc->hist_gfc); - GF_FREE (gfc->hist_gfc); - } - gf_changelog_cleanup (gfc); - GF_FREE (gfc); - this->private = NULL; - errno = errn; + brick.ptr = scratch_dir; - out: - return ret; + return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, + NULL); } |
