diff options
Diffstat (limited to 'xlators/features/changelog/lib')
15 files changed, 2104 insertions, 710 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c new file mode 100644 index 00000000000..8f23c81c2a0 --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c @@ -0,0 +1,84 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/** + * Compile it using: + * gcc -o getchanges-multi `pkg-config --cflags libgfchangelog` \ + * get-changes-multi.c `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +void *brick_init (void *xl, struct gf_brick_spec *brick) +{ + return brick; +} + +void brick_fini (void *xl, char *brick, void *data) +{ + return; +} + +void brick_callback (void *xl, char *brick, + void *data, changelog_event_t *ev) +{ + printf ("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type); +} + +void fill_brick_spec (struct gf_brick_spec *brick, char *path) +{ + brick->brick_path = strdup (path); + brick->filter = CHANGELOG_OP_TYPE_RELEASE; + + brick->init = brick_init; + brick->fini = brick_fini; + brick->callback = brick_callback; + brick->connected = NULL; + brick->disconnected = NULL; +} + +int +main (int argc, char **argv) +{ + int ret = 0; + void *bricks = NULL; + struct gf_brick_spec *brick = NULL; + + bricks = calloc (2, sizeof (struct gf_brick_spec)); + if (!bricks) + goto error_return; + + brick = (struct gf_brick_spec *)bricks; + fill_brick_spec (brick, "/export/z1/zwoop"); + + brick++; + fill_brick_spec (brick, "/export/z2/zwoop"); + + ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2, + 1, "/tmp/multi-changes.log", 9, + NULL); + if (ret) + goto error_return; + + /* let callbacks do the job */ + select (0, NULL, NULL, NULL, NULL); + + error_return: + return -1; +} diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c index 6d0d0357db9..0b2808c7e35 100644 --- a/xlators/features/changelog/lib/examples/c/get-changes.c +++ b/xlators/features/changelog/lib/examples/c/get-changes.c @@ -40,7 +40,7 @@ main (int argc, char ** argv) char fbuf[PATH_MAX] = {0,}; /* get changes for brick "/home/vshankar/export/yow/yow-1" */ - ret = gf_changelog_register ("/home/vshankar/exports/yow/yow-1", + ret = gf_changelog_register ("/export/z1/zwoop", "/tmp/scratch", "/tmp/change.log", 9, 5); if (ret) { handle_error ("register failed"); diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c index 33eb8c32d4d..2e1ff3c767f 100644 --- a/xlators/features/changelog/lib/examples/c/get-history.c +++ b/xlators/features/changelog/lib/examples/c/get-history.c @@ -40,8 +40,8 @@ main (int argc, char ** argv) char fbuf[PATH_MAX] = {0,}; unsigned long end_ts = 0; - ret = gf_changelog_register ("/export1/v1/b1", - "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log", + ret = gf_changelog_register ("/export/z1/zwoop", + "/tmp/scratch_v1", "/tmp/changes.log", 9, 5); if (ret) { handle_error ("register failed"); @@ -51,7 +51,8 @@ main (int argc, char ** argv) int a, b; printf ("give the two numbers start and end\t"); scanf ("%d%d", &a, &b); - ret = gf_history_changelog ("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts); + ret = gf_history_changelog ("/export/z1/zwoop/.glusterfs/changelogs", + a, b, 3, &end_ts); if (ret == -1) { printf ("history failed"); goto out; diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am index 1ae919bfb38..306306bd585 100644 --- a/xlators/features/changelog/lib/src/Makefile.am +++ b/xlators/features/changelog/lib/src/Makefile.am @@ -4,9 +4,13 @@ libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \ libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \ -I../../../src/ -I$(top_srcdir)/libglusterfs/src \ -I$(top_srcdir)/xlators/features/changelog/src \ + -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \ + -I$(top_srcdir)/rpc/rpc-transport/socket/src \ -DDATADIR=\"$(localstatedir)\" -libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la +libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ + $(top_builddir)/rpc/xdr/src/libgfxdr.la \ + $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) -version-info $(LIBGFCHANGELOG_LT_VERSION) @@ -15,18 +19,18 @@ lib_LTLIBRARIES = libgfchangelog.la CONTRIB_BUILDDIR = $(top_builddir)/contrib -libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \ - gf-changelog-helpers.c gf-history-changelog.c \ - $(CONTRIBDIR)/uuid/clear.c \ - $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \ - $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \ - $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \ - $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \ - $(CONTRIBDIR)/uuid/unpack.c - -noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \ - $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \ - $(CONTRIB_BUILDDIR)/uuid/uuid_types.h +libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-journal-handler.c gf-changelog-helpers.c \ + gf-changelog-api.c gf-history-changelog.c gf-changelog-rpc.c gf-changelog-reborp.c \ + $(top_srcdir)/xlators/features/changelog/src/changelog-rpc-common.c \ + $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c \ + $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c \ + $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c \ + $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c \ + $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c + +noinst_HEADERS = gf-changelog-helpers.h gf-changelog-rpc.h gf-changelog-journal.h \ + $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h \ + $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIB_BUILDDIR)/uuid/uuid_types.h libgfchangelog_HEADERS = changelog.h diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/changelog.h index 5cddfb5839c..d7048ff2508 100644 --- a/xlators/features/changelog/lib/src/changelog.h +++ b/xlators/features/changelog/lib/src/changelog.h @@ -11,6 +11,73 @@ #ifndef _GF_CHANGELOG_H #define _GF_CHANGELOG_H +struct gf_brick_spec; + +/** + * Max bit shiter for event selection + */ +#define CHANGELOG_EV_SELECTION_RANGE 4 + +#define CHANGELOG_OP_TYPE_JOURNAL (1<<0) +#define CHANGELOG_OP_TYPE_OPEN (1<<1) +#define CHANGELOG_OP_TYPE_CREATE (1<<2) +#define CHANGELOG_OP_TYPE_RELEASE (1<<3) +#define CHANGELOG_OP_TYPE_MAX (1<<CHANGELOG_EV_SELECTION_RANGE) + + +struct ev_open { + unsigned char gfid[16]; + int32_t flags; +}; + +struct ev_creat { + unsigned char gfid[16]; + int32_t flags; +}; + +struct ev_release { + unsigned char gfid[16]; +}; + +struct ev_changelog { + char path[PATH_MAX]; +}; + +typedef struct changelog_event { + unsigned int ev_type; + union { + struct ev_open open; + struct ev_creat create; + struct ev_release release; + struct ev_changelog journal; + } u; +} changelog_event_t; + +#define CHANGELOG_EV_SIZE (sizeof (changelog_event_t)) + +/** + * event callback, connected & disconnection defs + */ +typedef void (CALLBACK) (void *, char *, + void *, changelog_event_t *); +typedef void *(INIT) (void *, struct gf_brick_spec *); +typedef void (FINI) (void *, char *, void *); +typedef void (CONNECT) (void *, char *, void *); +typedef void (DISCONNECT) (void *, char *, void *); + +struct gf_brick_spec { + char *brick_path; + unsigned int filter; + + INIT *init; + FINI *fini; + CALLBACK *callback; + CONNECT *connected; + DISCONNECT *disconnected; + + void *ptr; +}; + /* API set */ int @@ -28,4 +95,9 @@ gf_changelog_next_change (char *bufptr, size_t maxlen); int gf_changelog_done (char *file); +/* newer flexible API */ +int +gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl); + #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c new file mode 100644 index 00000000000..cea2ff01988 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-api.c @@ -0,0 +1,224 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "uuid.h" +#include "globals.h" +#include "glusterfs.h" + +#include "gf-changelog-helpers.h" +#include "gf-changelog-journal.h" +#include "changelog-mem-types.h" + +int +gf_changelog_done (char *file) +{ + int ret = -1; + char *buffer = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + char to_path[PATH_MAX] = {0,}; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) + goto out; + + if (!file || !strlen (file)) + goto out; + + /* make sure 'file' is inside ->jnl_working_dir */ + buffer = realpath (file, NULL); + if (!buffer) + goto out; + + if (strncmp (jnl->jnl_working_dir, + buffer, strlen (jnl->jnl_working_dir))) + goto out; + + (void) snprintf (to_path, PATH_MAX, "%s%s", + jnl->jnl_processed_dir, basename (buffer)); + gf_log (this->name, GF_LOG_DEBUG, + "moving %s to processed directory", file); + ret = rename (buffer, to_path); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "cannot move %s to %s (reason: %s)", + file, to_path, strerror (errno)); + goto out; + } + + ret = 0; + + out: + if (buffer) + free (buffer); /* allocated by realpath() */ + return ret; +} + +/** + * @API + * for a set of changelogs, start from the beginning + */ +int +gf_changelog_start_fresh () +{ + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = THIS; + if (!this) + goto out; + + errno = EINVAL; + + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) + goto out; + + if (gf_ftruncate (jnl->jnl_fd, 0)) + goto out; + + return 0; + + out: + return -1; +} + +/** + * @API + * return the next changelog file entry. zero means all chanelogs + * consumed. + */ +ssize_t +gf_changelog_next_change (char *bufptr, size_t maxlen) +{ + ssize_t size = -1; + int tracker_fd = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + char buffer[PATH_MAX] = {0,}; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) + goto out; + + tracker_fd = jnl->jnl_fd; + + size = gf_readline (tracker_fd, buffer, maxlen); + if (size < 0) { + size = -1; + goto out; + } + + if (size == 0) + goto out; + + memcpy (bufptr, buffer, size - 1); + bufptr[size - 1] = '\0'; + +out: + return size; +} + +/** + * @API + * gf_changelog_scan() - scan and generate a list of change entries + * + * calling this api multiple times (without calling gf_changlog_done()) + * would result new changelogs(s) being refreshed in the tracker file. + * This call also acts as a cancellation point for the consumer. + */ +ssize_t +gf_changelog_scan () +{ + int ret = 0; + int tracker_fd = 0; + size_t len = 0; + size_t off = 0; + xlator_t *this = NULL; + size_t nr_entries = 0; + gf_changelog_journal_t *jnl = NULL; + struct dirent *entryp = NULL; + struct dirent *result = NULL; + char buffer[PATH_MAX] = {0,}; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) + goto out; + if (JNL_IS_API_DISCONNECTED (jnl)) { + errno = ENOTCONN; + goto out; + } + + errno = EINVAL; + + tracker_fd = jnl->jnl_fd; + if (gf_ftruncate (tracker_fd, 0)) + goto out; + + len = offsetof(struct dirent, d_name) + + pathconf(jnl->jnl_processing_dir, _PC_NAME_MAX) + 1; + entryp = GF_CALLOC (1, len, + gf_changelog_mt_libgfchangelog_dirent_t); + if (!entryp) + goto out; + + rewinddir (jnl->jnl_dir); + while (1) { + ret = readdir_r (jnl->jnl_dir, entryp, &result); + if (ret || !result) + break; + + if (!strcmp (basename (entryp->d_name), ".") + || !strcmp (basename (entryp->d_name), "..")) + continue; + + nr_entries++; + + GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir, + buffer, off, + strlen (jnl->jnl_processing_dir)); + GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, + off, strlen (entryp->d_name)); + GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); + + if (gf_changelog_write (tracker_fd, buffer, off) != off) { + gf_log (this->name, GF_LOG_ERROR, + "error writing changelog filename" + " to tracker file"); + break; + } + off = 0; + } + + GF_FREE (entryp); + + if (!result) { + if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) + return nr_entries; + } + out: + return -1; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c index f071b057d59..6bf709dc664 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c @@ -178,3 +178,35 @@ gf_ftruncate (int fd, off_t length) return 0; } + +int +gf_thread_cleanup (xlator_t *this, pthread_t thread) +{ + int ret = 0; + void *res = NULL; + + ret = pthread_cancel (thread); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "Failed to send cancellation to thread"); + goto error_return; + } + + ret = pthread_join (thread, &res); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "failed to join thread"); + goto error_return; + } + + if (res != PTHREAD_CANCELED) { + gf_log (this->name, GF_LOG_WARNING, + "Thread could not be cleaned up"); + goto error_return; + } + + return 0; + + error_return: + return -1; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 9b875d45dcc..17b8862a89b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -18,6 +18,11 @@ #include <xlator.h> +#include "changelog.h" + +#include "changelog-rpc-common.h" +#include "gf-changelog-journal.h" + #define GF_CHANGELOG_TRACKER "tracker" #define GF_CHANGELOG_CURRENT_DIR ".current" @@ -41,79 +46,143 @@ typedef struct read_line { char rl_buf[MAXLINE]; } read_line_t; +struct gf_changelog; + +/** + * Event list for ordered event notification + * + * ->next_seq holds the next _expected_ sequence number. + */ +struct gf_event_list { + pthread_mutex_t lock; /* protects this structure */ + pthread_cond_t cond; + + pthread_t invoker; + + unsigned long next_seq; /* next sequence number expected: + zero during bootstrap */ + + struct gf_changelog *entry; /* backpointer to it's brick + encapsulator (entry) */ + struct list_head events; /* list of events (ordered) */ +}; + +/** + * include a refcount if it's of use by additional layers + */ +struct gf_event { + int count; + + unsigned long seq; + + struct list_head list; + + struct iovec iov[0]; +}; +#define GF_EVENT_CALLOC_SIZE(cnt, len) \ + (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len) + +/** + * assign the base address of the IO vector to the correct memory + * area and set it's addressable length. + */ +#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \ + do { \ + vec->iov_base = ((char *)event) + \ + sizeof (struct gf_event) + \ + (event->count * sizeof (struct iovec)) + pos; \ + vec->iov_len = len; \ + pos += len; \ + } while (0) + +/** + * An instance of this structure is allocated for each brick for which + * notifications are streamed. + */ typedef struct gf_changelog { xlator_t *this; - /* 'processing' directory stream */ - DIR *gfc_dir; - - /* fd to the tracker file */ - int gfc_fd; - - /* connection retries */ - int gfc_connretries; + struct list_head list; /* list of instances */ - char gfc_sockpath[UNIX_PATH_MAX]; + char brick[PATH_MAX]; /* brick path for this end-point */ - char gfc_brickpath[PATH_MAX]; + changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */ +#define RPC_PROBER(ent) ent->grpc.rpc +#define RPC_REBORP(ent) ent->grpc.svc +#define RPC_SOCK(ent) ent->grpc.sock - /* socket for receiving notifications */ - int gfc_sockfd; + unsigned int notify; /* notification flag(s) */ - char *gfc_working_dir; + FINI *fini; /* destructor callback */ + CALLBACK *callback; /* event callback dispatcher */ + CONNECT *connected; /* connect callback */ + DISCONNECT *disconnected; /* disconnection callback */ - /* RFC 3986 string encoding */ - char rfc3986[256]; + void *ptr; /* owner specific private data */ + xlator_t *invokerxl; /* consumers _this_, if valid, + assigned to THIS before cbk is + invoked */ - char gfc_current_dir[PATH_MAX]; - char gfc_processed_dir[PATH_MAX]; - char gfc_processing_dir[PATH_MAX]; + gf_boolean_t ordered; - pthread_t gfc_changelog_processor; - - /* Holds gfc for History API */ - struct gf_changelog *hist_gfc; - - /* holds 0 done scanning, 1 keep scanning and -1 error */ - int hist_done; + struct gf_event_list event; } gf_changelog_t; -typedef struct gf_changelog_history_data { - int len; - - int htime_fd; - - /* parallelism count */ - int n_parallel; - - /* history from, to indexes */ - unsigned long from; - unsigned long to; -} gf_changelog_history_data_t; - -typedef struct gf_changelog_consume_data { - /** set of inputs */ - - /* fd to read from */ - int fd; - - /* from @offset */ - off_t offset; - - xlator_t *this; - gf_changelog_t *gfc; - - /** set of outputs */ +static inline int +gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event) +{ + if (event->ev_type & entry->notify) + return 1; + return 0; +} + +#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true) + +/** private structure */ +typedef struct gf_private { + gf_lock_t lock; /* protects ->connections */ + + void *api; /* pointer for API access */ + + pthread_t poller; /* event poller thread */ + + struct list_head connections; /* list of connections */ +} gf_private_t; + +#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api) + +/** + * upcall: invoke callback with _correct_ THIS + */ +#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \ + do { \ + xlator_t *old_this = NULL; \ + xlator_t *invokerxl = NULL; \ + \ + invokerxl = entry->invokerxl; \ + old_this = this; \ + \ + if (invokerxl) { \ + THIS = invokerxl; \ + } \ + \ + cbk (invokerxl, brick, args); \ + THIS = old_this; \ + \ + } while (0) - /* return value */ - int retval; +#define SAVE_THIS(xl) \ + do { \ + old_this = xl; \ + THIS = master; \ + } while (0) - /* journal processed */ - char changelog[PATH_MAX]; -} gf_changelog_consume_data_t; +#define RESTORE_THIS() \ + do { \ + THIS = old_this; \ + } while (0) -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); +/** APIs and the rest */ void * gf_changelog_process (void *data); @@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence); int gf_changelog_consume (xlator_t *this, - gf_changelog_t *gfc, + gf_changelog_journal_t *jnl, char *from_path, gf_boolean_t no_publish); int -gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); +gf_changelog_publish (xlator_t *this, + gf_changelog_journal_t *jnl, char *from_path); +int +gf_thread_cleanup (xlator_t *this, pthread_t thread); +void * +gf_changelog_callback_invoker (void *arg); #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c index 1a275e676fb..6ee6f9f074f 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-process.c +++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c @@ -1,5 +1,5 @@ /* - Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser @@ -8,9 +8,6 @@ cases as published by the Free Software Foundation. */ -#include <unistd.h> -#include <pthread.h> - #include "uuid.h" #include "globals.h" #include "glusterfs.h" @@ -19,6 +16,9 @@ /* from the changelog translator */ #include "changelog-misc.h" +#include "changelog-mem-types.h" + +#include "gf-changelog-journal.h" extern int byebye; @@ -98,7 +98,7 @@ conv_noop (char *ptr) { return ptr; } MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \ } \ -#define LINE_BUFSIZE 3*PATH_MAX /* enough buffer for extra chars too */ +#define LINE_BUFSIZE (3*PATH_MAX) /* enough buffer for extra chars too */ /** * using mmap() makes parsing easy. fgets() cannot be used here as @@ -114,7 +114,8 @@ conv_noop (char *ptr) { return ptr; } static int gf_changelog_parse_binary (xlator_t *this, - gf_changelog_t *gfc, int from_fd, int to_fd, + gf_changelog_journal_t *jnl, + int from_fd, int to_fd, size_t start_offset, struct stat *stbuf) { @@ -165,7 +166,8 @@ gf_changelog_parse_binary (xlator_t *this, PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); bname_start = mover; - if ( (bname_end = strchr (mover, '\n')) == NULL ) { + bname_end = strchr (mover, '\n'); + if (bname_end == NULL) { parse_err = 1; break; } @@ -201,7 +203,7 @@ gf_changelog_parse_binary (xlator_t *this, MOVER_MOVE (mover, nleft, 1); } - if ( (nleft == 0) && (!parse_err)) + if ((nleft == 0) && (!parse_err)) ret = 0; if (munmap (start, stbuf->st_size)) @@ -218,7 +220,8 @@ gf_changelog_parse_binary (xlator_t *this, */ static int gf_changelog_parse_ascii (xlator_t *this, - gf_changelog_t *gfc, int from_fd, int to_fd, + gf_changelog_journal_t *jnl, + int from_fd, int to_fd, size_t start_offset, struct stat *stbuf) { int ng = 0; @@ -281,7 +284,8 @@ gf_changelog_parse_ascii (xlator_t *this, VERIFY_SEPARATOR (mover, len, parse_err); fop = atoi (mover); - if ( (fopname = gf_fop_list[fop]) == NULL) { + fopname = gf_fop_list[fop]; + if (fopname == NULL) { parse_err = 1; break; } @@ -309,7 +313,8 @@ gf_changelog_parse_ascii (xlator_t *this, VERIFY_SEPARATOR (mover, len, parse_err); fop = atoi (mover); - if ( (fopname = gf_fop_list[fop]) == NULL) { + fopname = gf_fop_list[fop]; + if (fopname == NULL) { parse_err = 1; break; } @@ -320,7 +325,7 @@ gf_changelog_parse_ascii (xlator_t *this, GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); ng = nr_extra_recs[fop]; - for (;ng > 0; ng--) { + for (; ng > 0; ng--) { MOVER_MOVE (mover, nleft, 1); len = strlen (mover); VERIFY_SEPARATOR (mover, len, parse_err); @@ -346,7 +351,7 @@ gf_changelog_parse_ascii (xlator_t *this, } gf_rfc3986_encode ((unsigned char *) ptr, - eptr, gfc->rfc3986); + eptr, jnl->rfc3986); FILL_AND_MOVE (eptr, ascii, off, mover, nleft, len); free (eptr); @@ -374,7 +379,7 @@ gf_changelog_parse_ascii (xlator_t *this, } - if ( (nleft == 0) && (!parse_err)) + if ((nleft == 0) && (!parse_err)) ret = 0; if (munmap (start, stbuf->st_size)) @@ -410,8 +415,8 @@ gf_changelog_copy (xlator_t *this, int from_fd, int to_fd) } static int -gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, - int to_fd, struct stat *stbuf, int *zerob) +gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl, + int from_fd, int to_fd, struct stat *stbuf, int *zerob) { int ret = -1; int encoding = -1; @@ -441,12 +446,12 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, * this ideally should have been a part of changelog-encoders.c * (ie. part of the changelog translator). */ - ret = gf_changelog_parse_binary (this, gfc, from_fd, + ret = gf_changelog_parse_binary (this, jnl, from_fd, to_fd, elen, stbuf); break; case CHANGELOG_ENCODE_ASCII: - ret = gf_changelog_parse_ascii (this, gfc, from_fd, + ret = gf_changelog_parse_ascii (this, jnl, from_fd, to_fd, elen, stbuf); break; default: @@ -458,7 +463,8 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, } int -gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path) +gf_changelog_publish (xlator_t *this, + gf_changelog_journal_t *jnl, char *from_path) { int ret = 0; char dest[PATH_MAX] = {0,}; @@ -466,21 +472,21 @@ gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path) struct stat stbuf = {0,}; (void) snprintf (to_path, PATH_MAX, "%s%s", - gfc->gfc_current_dir, basename (from_path)); + jnl->jnl_current_dir, basename (from_path)); /* handle zerob file that wont exist in current */ ret = stat (to_path, &stbuf); - if (ret){ + if (ret) { if (errno == ENOENT) ret = 0; goto out; } (void) snprintf (dest, PATH_MAX, "%s%s", - gfc->gfc_processing_dir, basename (from_path)); + jnl->jnl_processing_dir, basename (from_path)); ret = rename (to_path, dest); - if (ret){ + if (ret) { gf_log (this->name, GF_LOG_ERROR, "error moving %s to processing dir" " (reason: %s)", to_path, strerror (errno)); @@ -492,7 +498,7 @@ out: int gf_changelog_consume (xlator_t *this, - gf_changelog_t *gfc, + gf_changelog_journal_t *jnl, char *from_path, gf_boolean_t no_publish) { int ret = -1; @@ -520,9 +526,9 @@ gf_changelog_consume (xlator_t *this, } (void) snprintf (to_path, PATH_MAX, "%s%s", - gfc->gfc_current_dir, basename (from_path)); + jnl->jnl_current_dir, basename (from_path)); (void) snprintf (dest, PATH_MAX, "%s%s", - gfc->gfc_processing_dir, basename (from_path)); + jnl->jnl_processing_dir, basename (from_path)); fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -532,7 +538,7 @@ gf_changelog_consume (xlator_t *this, to_path, strerror (errno)); goto close_fd; } else { - ret = gf_changelog_decode (this, gfc, fd1, + ret = gf_changelog_decode (this, jnl, fd1, fd2, &stbuf, &zerob); close (fd2); @@ -568,88 +574,412 @@ gf_changelog_consume (xlator_t *this, return ret; } -static char * -gf_changelog_ext_change (xlator_t *this, - gf_changelog_t *gfc, char *path, size_t readlen) +void * +gf_changelog_process (void *data) { - int alo = 0; - int ret = 0; - size_t len = 0; - char *buf = NULL; - - buf = path; - while (len < readlen) { - if (*buf == '\0') { - alo = 1; - gf_log (this->name, GF_LOG_DEBUG, - "processing changelog: %s", path); - ret = gf_changelog_consume (this, gfc, path, _gf_false); - } + int ret = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_entry_t *entry = NULL; + gf_changelog_processor_t *jnl_proc = NULL; - if (ret) - break; + this = THIS; - len++; buf++; - if (alo) { - alo = 0; - path = buf; + jnl = data; + jnl_proc = jnl->jnl_proc; + + while (1) { + pthread_mutex_lock (&jnl_proc->lock); + { + while (list_empty (&jnl_proc->entries)) { + jnl_proc->waiting = _gf_true; + pthread_cond_wait + (&jnl_proc->cond, &jnl_proc->lock); + } + + entry = list_first_entry (&jnl_proc->entries, + gf_changelog_entry_t, list); + list_del (&entry->list); + jnl_proc->waiting = _gf_false; } + pthread_mutex_unlock (&jnl_proc->lock); + + if (entry) { + ret = gf_changelog_consume (this, jnl, + entry->path, _gf_false); + GF_FREE (entry); + } + } + + return NULL; +} + +inline void +gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc, + changelog_event_t *event) +{ + size_t len = 0; + gf_changelog_entry_t *entry = NULL; + + entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t), + gf_changelog_mt_libgfchangelog_entry_t); + if (!entry) + return; + INIT_LIST_HEAD (&entry->list); + + len = strlen (event->u.journal.path); + (void)memcpy (entry->path, event->u.journal.path, len+1); + + pthread_mutex_lock (&jnl_proc->lock); + { + list_add_tail (&entry->list, &jnl_proc->entries); + if (jnl_proc->waiting) + pthread_cond_signal (&jnl_proc->cond); + } + pthread_mutex_unlock (&jnl_proc->lock); + + return; +} + +void +gf_changelog_handle_journal (void *xl, char *brick, + void *cbkdata, changelog_event_t *event) +{ + int ret = 0; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl = cbkdata; + jnl_proc = jnl->jnl_proc; + + gf_changelog_queue_journal (jnl_proc, event); +} + +void +gf_changelog_journal_disconnect (void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + pthread_spin_lock (&jnl->lock); + { + JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED); + }; + pthread_spin_unlock (&jnl->lock); +} + +void +gf_changelog_journal_connect (void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + pthread_spin_lock (&jnl->lock); + { + JNL_SET_API_STATE (jnl, JNL_API_CONNECTED); + }; + pthread_spin_unlock (&jnl->lock); + + return; +} + +void +gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + this = THIS; + if (!this || !jnl || !jnl->jnl_proc) + goto error_return; + + jnl_proc = jnl->jnl_proc; + + ret = gf_thread_cleanup (this, jnl_proc->processor); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to cleanup processor thread"); + goto error_return; + } + + (void)pthread_mutex_destroy (&jnl_proc->lock); + (void)pthread_cond_destroy (&jnl_proc->cond); + + GF_FREE (jnl_proc); + + error_return: + return; +} + +int +gf_changelog_init_processor (gf_changelog_journal_t *jnl) +{ + int ret = -1; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl_proc) + goto error_return; + + ret = pthread_mutex_init (&jnl_proc->lock, NULL); + if (ret != 0) + goto free_jnl_proc; + ret = pthread_cond_init (&jnl_proc->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + INIT_LIST_HEAD (&jnl_proc->entries); + ret = pthread_create (&jnl_proc->processor, + NULL, gf_changelog_process, jnl); + if (ret != 0) + goto cleanup_cond; + jnl_proc->waiting = _gf_false; + + jnl->jnl_proc = jnl_proc; + return 0; + + cleanup_cond: + (void) pthread_cond_destroy (&jnl_proc->cond); + cleanup_mutex: + (void) pthread_mutex_destroy (&jnl_proc->lock); + free_jnl_proc: + GF_FREE (jnl_proc); + error_return: + return -1; +} + +static void +gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl) +{ + /* tracker fd */ + if (jnl->jnl_fd != -1) + close (jnl->jnl_fd); + /* processing dir */ + if (jnl->jnl_dir) + closedir (jnl->jnl_dir); + + if (jnl->jnl_working_dir) + free (jnl->jnl_working_dir); /* allocated by realpath */ +} + +static int +gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl) +{ + int ret = -1; + DIR *dir = NULL; + int tracker_fd = 0; + char tracker_path[PATH_MAX] = {0,}; + + /* .current */ + (void) snprintf (jnl->jnl_current_dir, PATH_MAX, + "%s/"GF_CHANGELOG_CURRENT_DIR"/", + jnl->jnl_working_dir); + ret = recursive_rmdir (jnl->jnl_current_dir); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to rmdir: %s, err: %s", + jnl->jnl_current_dir, strerror (errno)); + goto out; + } + ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processed */ + (void) snprintf (jnl->jnl_processed_dir, PATH_MAX, + "%s/"GF_CHANGELOG_PROCESSED_DIR"/", + jnl->jnl_working_dir); + ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processing */ + (void) snprintf (jnl->jnl_processing_dir, PATH_MAX, + "%s/"GF_CHANGELOG_PROCESSING_DIR"/", + jnl->jnl_working_dir); + ret = recursive_rmdir (jnl->jnl_processing_dir); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to rmdir: %s, err: %s", + jnl->jnl_processing_dir, strerror (errno)); + goto out; + } + + ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false); + if (ret) + goto out; + + dir = opendir (jnl->jnl_processing_dir); + if (!dir) { + gf_log ("", GF_LOG_ERROR, + "opendir() error [reason: %s]", strerror (errno)); + goto out; } - return (ret) ? NULL : path; + jnl->jnl_dir = dir; + + (void) snprintf (tracker_path, PATH_MAX, + "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir); + + tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (tracker_fd < 0) { + closedir (jnl->jnl_dir); + ret = -1; + goto out; + } + + jnl->jnl_fd = tracker_fd; + ret = 0; + out: + return ret; +} + +int +gf_changelog_init_history (xlator_t *this, + gf_changelog_journal_t *jnl, + char *brick_path, char *scratch_dir) +{ + int i = 0; + int ret = 0; + char hist_scratch_dir[PATH_MAX] = {0,}; + + jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl), + gf_changelog_mt_libgfchangelog_t); + if (!jnl->hist_jnl) + goto error_return; + + jnl->hist_jnl->jnl_dir = NULL; + jnl->hist_jnl->jnl_fd = -1; + + (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); + (void) snprintf (hist_scratch_dir, PATH_MAX, + "%s/"GF_CHANGELOG_HISTORY_DIR"/", + jnl->jnl_working_dir); + + ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); + if (ret) + goto dealloc_hist; + + jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL); + if (!jnl->hist_jnl->jnl_working_dir) + goto dealloc_hist; + + ret = gf_changelog_open_dirs (this, jnl->hist_jnl); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create entries in history scratch dir"); + goto dealloc_hist; + } + + (void) strncpy (jnl->hist_jnl->jnl_brickpath, brick_path, PATH_MAX); + + for (i = 0; i < 256; i++) { + jnl->hist_jnl->rfc3986[i] = + (isalnum(i) || i == '~' || + i == '-' || i == '.' || i == '_') ? i : 0; + } + + return 0; + + dealloc_hist: + GF_FREE (jnl->hist_jnl); + jnl->hist_jnl = NULL; + error_return: + return -1; +} + +void +gf_changelog_journal_fini (void *xl, char *brick, void *data) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = xl; + jnl = data; + + gf_changelog_cleanup_processor (jnl); + + gf_changelog_cleanup_fds (jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds (jnl->hist_jnl); + + GF_FREE (jnl); } void * -gf_changelog_process (void *data) +gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick) { - ssize_t len = 0; - ssize_t offlen = 0; - xlator_t *this = NULL; - char *sbuf = NULL; - gf_changelog_t *gfc = NULL; - char from_path[PATH_MAX] = {0,}; - - gfc = (gf_changelog_t *) data; - this = gfc->this; - - pthread_detach (pthread_self()); - - for (;;) { - len = gf_changelog_read_path (gfc->gfc_sockfd, - from_path + offlen, - PATH_MAX - offlen); - if (len < 0) - continue; /* ignore it for now */ - - if (len == 0) { /* close() from the changelog translator */ - gf_log (this->name, GF_LOG_INFO, "close from changelog" - " notification translator."); - - if (gfc->gfc_connretries != 1) { - if (!gf_changelog_notification_init(this, gfc)) - continue; - } + int i = 0; + int ret = 0; + xlator_t *this = NULL; + struct stat buf = {0,}; + char *scratch_dir = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = xl; + scratch_dir = (char *) brick->ptr; + + jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl) + goto error_return; + + if (stat (scratch_dir, &buf) && errno == ENOENT) { + ret = mkdir_p (scratch_dir, 0600, _gf_true); + if (ret) + goto dealloc_private; + } - byebye = 1; - break; - } + jnl->jnl_working_dir = realpath (scratch_dir, NULL); + if (!jnl->jnl_working_dir) + goto dealloc_private; - len += offlen; - sbuf = gf_changelog_ext_change (this, gfc, from_path, len); - if (!sbuf) { - gf_log (this->name, GF_LOG_ERROR, - "could not extract changelog filename"); - continue; - } + ret = gf_changelog_open_dirs (this, jnl); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create entries in scratch dir"); + goto dealloc_private; + } - offlen = 0; - if (sbuf != (from_path + len)) { - offlen = from_path + len - sbuf; - memmove (from_path, sbuf, offlen); - } + (void) strncpy (jnl->jnl_brickpath, brick->brick_path, PATH_MAX); + + /* RFC 3986 {de,en}coding */ + for (i = 0; i < 256; i++) { + jnl->rfc3986[i] = + (isalnum(i) || i == '~' || + i == '-' || i == '.' || i == '_') ? i : 0; } - gf_log (this->name, GF_LOG_DEBUG, - "byebye (%d) from processing thread...", byebye); + ret = gf_changelog_init_history (this, jnl, + brick->brick_path, scratch_dir); + if (ret) + goto cleanup_fds; + + /* initialize journal processor */ + ret = gf_changelog_init_processor (jnl); + if (ret) + goto cleanup_fds; + + JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS); + ret = pthread_spin_init (&jnl->lock, 0); + if (ret != 0) + goto cleanup_processor; + return jnl; + + cleanup_processor: + gf_changelog_cleanup_processor (jnl); + cleanup_fds: + gf_changelog_cleanup_fds (jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds (jnl->hist_jnl); + dealloc_private: + GF_FREE (jnl); + error_return: return NULL; } diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h new file mode 100644 index 00000000000..9a0f0b28956 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h @@ -0,0 +1,114 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __GF_CHANGELOG_JOURNAL_H +#define __GF_CHANGELOG_JOURNAL_H + +#include <unistd.h> +#include <pthread.h> + +#include "changelog.h" + +enum api_conn { + JNL_API_CONNECTED, + JNL_API_CONN_INPROGESS, + JNL_API_DISCONNECTED, +}; + +typedef struct gf_changelog_entry { + char path[PATH_MAX]; + + struct list_head list; +} gf_changelog_entry_t; + +typedef struct gf_changelog_processor { + pthread_mutex_t lock; /* protects ->entries */ + pthread_cond_t cond; /* waiter during empty list */ + gf_boolean_t waiting; + + pthread_t processor; /* thread-id of journal processing thread */ + + struct list_head entries; +} gf_changelog_processor_t; + +typedef struct gf_changelog_journal { + DIR *jnl_dir; /* 'processing' directory stream */ + + int jnl_fd; /* fd to the tracker file */ + + char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */ + + gf_changelog_processor_t *jnl_proc; + + char *jnl_working_dir; /* scratch directory */ + + char jnl_current_dir[PATH_MAX]; + char jnl_processed_dir[PATH_MAX]; + char jnl_processing_dir[PATH_MAX]; + + char rfc3986[256]; /* RFC 3986 string encoding */ + + struct gf_changelog_journal *hist_jnl; + int hist_done; /* holds 0 done scanning, + 1 keep scanning and -1 error */ + + pthread_spinlock_t lock; + int connected; +} gf_changelog_journal_t; + +#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state) +#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED) + +/* History API */ +typedef struct gf_changelog_history_data { + int len; + + int htime_fd; + + /* parallelism count */ + int n_parallel; + + /* history from, to indexes */ + unsigned long from; + unsigned long to; +} gf_changelog_history_data_t; + +typedef struct gf_changelog_consume_data { + /** set of inputs */ + + /* fd to read from */ + int fd; + + /* from @offset */ + off_t offset; + + xlator_t *this; + + gf_changelog_journal_t *jnl; + + /** set of outputs */ + + /* return value */ + int retval; + + /* journal processed */ + char changelog[PATH_MAX]; +} gf_changelog_consume_data_t; + +/* event handler */ +CALLBACK gf_changelog_handle_journal; + +/* init, connect & disconnect handler */ +INIT gf_changelog_journal_init; +FINI gf_changelog_journal_fini; +CONNECT gf_changelog_journal_connect; +DISCONNECT gf_changelog_journal_disconnect; + +#endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c new file mode 100644 index 00000000000..d7e60fb9634 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -0,0 +1,381 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +#include "gf-changelog-helpers.h" +#include "changelog-rpc-common.h" + +/** + * Reverse socket: actual data transfer handler. Connection + * initiator is PROBER, data transfer is REBORP. + */ + +struct rpcsvc_program *gf_changelog_reborp_programs[]; + +/** + * On a reverse connection, unlink the socket file. + */ +int +gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, + rpcsvc_event_t event, void *data) +{ + int ret = 0; + xlator_t *this = NULL; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + char sock[UNIX_PATH_MAX] = {0,}; + + entry = mydata; + this = entry->this; + priv = this->private; + + switch (event) { + case RPCSVC_EVENT_ACCEPT: + ret = unlink (RPC_SOCK(entry)); + if (ret != 0) + gf_log (this->name, GF_LOG_WARNING, "failed to unlink" + " reverse socket file %s", RPC_SOCK (entry)); + if (entry->connected) + GF_CHANGELOG_INVOKE_CBK (this, entry->connected, + entry->brick, entry->ptr); + break; + case RPCSVC_EVENT_DISCONNECT: + LOCK (&priv->lock); + { + list_del (&entry->list); + } + UNLOCK (&priv->lock); + + if (entry->disconnected) + GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, + entry->brick, entry->ptr); + + GF_FREE (entry); + break; + default: + break; + } + + return 0; +} + +rpcsvc_t * +gf_changelog_reborp_init_rpc_listner (xlator_t *this, + char *path, char *sock, void *cbkdata) +{ + CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX); + return changelog_rpc_server_init (this, sock, cbkdata, + gf_changelog_reborp_rpcsvc_notify, + gf_changelog_reborp_programs); +} + +/** + * This is dirty and painful as of now untill there is event filtering in the + * server. The entire event buffer is scanned and interested events are picked, + * whereas we _should_ be notified with the events we were interested in + * (selected at the time of probe). As of now this is complete BS and needs + * fixture ASAP. I just made it work, it needs to be better. + * + * @FIXME: cleanup this bugger once server filters events. + */ +inline void +gf_changelog_invoke_callback (gf_changelog_t *entry, + struct iovec **vec, int payloadcnt) +{ + int i = 0; + int evsize = 0; + xlator_t *this = NULL; + changelog_event_t *event = NULL; + + this = entry->this; + + for (; i < payloadcnt; i++) { + event = (changelog_event_t *)vec[i]->iov_base; + evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; + + for (; evsize > 0; evsize--, event++) { + if (gf_changelog_filter_check (entry, event)) { + GF_CHANGELOG_INVOKE_CBK (this, + entry->callback, + entry->brick, + entry->ptr, event); + } + } + } +} + +/** + * Ordered event handler is self-adaptive.. if the event sequence number + * is what's expected (->next_seq) there is no ordering list that's + * maintained. On out-of-order event notifications, event buffers are + * dynamically allocated and ordered. + */ + +inline int +__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event) +{ + return (ev->next_seq == event->seq); +} + +inline int +__can_process_event (struct gf_event_list *ev, struct gf_event **event) +{ + *event = list_first_entry (&ev->events, struct gf_event, list); + + if (__is_expected_sequence (ev, *event)) { + list_del (&(*event)->list); + ev->next_seq++; + return 1; + } + + return 0; +} + +inline void +__process_event_list (struct gf_event_list *ev, struct gf_event **event) +{ + while (list_empty (&ev->events) + || !__can_process_event (ev, event)) + pthread_cond_wait (&ev->cond, &ev->lock); +} + +void * +gf_changelog_callback_invoker (void *arg) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_t *entry = NULL; + struct iovec *vec = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + + ev = arg; + entry = ev->entry; + this = entry->this; + + while (1) { + pthread_mutex_lock (&ev->lock); + { + __process_event_list (ev, &event); + } + pthread_mutex_unlock (&ev->lock); + + vec = (struct iovec *) &event->iov; + gf_changelog_invoke_callback (entry, &vec, event->count); + + GF_FREE (event); + } + + return NULL; +} + +static int +orderfn (struct list_head *pos1, struct list_head *pos2) +{ + struct gf_event *event1 = NULL; + struct gf_event *event2 = NULL; + + event1 = list_entry (pos1, struct gf_event, list); + event2 = list_entry (pos2, struct gf_event, list); + + if (event1->seq > event2->seq) + return 1; + return -1; +} + +int +gf_changelog_ordered_event_handler (rpcsvc_request_t *req, + xlator_t *this, gf_changelog_t *entry) +{ + int i = 0; + size_t payloadlen = 0; + ssize_t len = 0; + int payloadcnt = 0; + changelog_event_req rpc_req = {0,}; + changelog_event_rsp rpc_rsp = {0,}; + struct iovec *vec = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + + len = xdr_to_generic (req->msg[0], + &rpc_req, (xdrproc_t)xdr_changelog_event_req); + if (len < 0) { + gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); + req->rpc_err = GARBAGE_ARGS; + goto handle_xdr_error; + } + + if (len < req->msg[0].iov_len) { + payloadcnt = 1; + payloadlen = (req->msg[0].iov_len - len); + } + for (i = 1; i < req->count; i++) { + payloadcnt++; + payloadlen += req->msg[i].iov_len; + } + + event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen), + gf_changelog_mt_libgfchangelog_event_t); + if (!event) + goto handle_xdr_error; + INIT_LIST_HEAD (&event->list); + + payloadlen = 0; + event->seq = rpc_req.seq; + event->count = payloadcnt; + + /* deep copy IO vectors */ + vec = &event->iov[0]; + GF_EVENT_ASSIGN_IOVEC (vec, event, + (req->msg[0].iov_len - len), payloadlen); + (void) memcpy (vec->iov_base, + req->msg[0].iov_base + len, vec->iov_len); + + for (i = 1; i < req->count; i++) { + vec = &event->iov[i]; + GF_EVENT_ASSIGN_IOVEC (vec, event, + req->msg[i].iov_len, payloadlen); + (void) memcpy (event->iov[i].iov_base, + req->msg[i].iov_base, req->msg[i].iov_len); + } + + gf_log (this->name, GF_LOG_DEBUG, + "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %ld)", + rpc_req.seq, entry->brick, rpc_req.tv_sec, + rpc_req.tv_usec, payloadcnt, payloadlen); + + /* add it to the ordered event list and wake up listner(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_order (&event->list, &ev->events, orderfn); + if (!ev->next_seq) + ev->next_seq = event->seq; + if (ev->next_seq == event->seq) + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); + + /* ack sequence number */ + rpc_rsp.op_ret = 0; + rpc_rsp.seq = rpc_req.seq; + + goto submit_rpc; + + handle_xdr_error: + rpc_rsp.op_ret = -1; + rpc_rsp.seq = 0; /* invalid */ + submit_rpc: + return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, + (xdrproc_t)xdr_changelog_event_rsp); +} + +int +gf_changelog_unordered_event_handler (rpcsvc_request_t *req, + xlator_t *this, gf_changelog_t *entry) +{ + int i = 0; + int ret = 0; + ssize_t len = 0; + int payloadcnt = 0; + struct iovec vector[MAX_IOVEC] = {{0,}, }; + changelog_event_req rpc_req = {0,}; + changelog_event_rsp rpc_rsp = {0,}; + + len = xdr_to_generic (req->msg[0], + &rpc_req, (xdrproc_t)xdr_changelog_event_req); + if (len < 0) { + gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); + req->rpc_err = GARBAGE_ARGS; + goto handle_xdr_error; + } + + /* prepare payload */ + if (len < req->msg[0].iov_len) { + payloadcnt = 1; + vector[0].iov_base = (req->msg[0].iov_base + len); + vector[0].iov_len = (req->msg[0].iov_len - len); + } + + for (i = 1; i < req->count; i++) { + vector[payloadcnt++] = req->msg[i]; + } + + gf_log (this->name, GF_LOG_DEBUG, + "seq: %lu (time: %lu.%lu), (vec: %d)", + rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt); + + /* invoke callback */ + struct iovec *vec = (struct iovec *) &vector; + gf_changelog_invoke_callback (entry, &vec, payloadcnt); + + /* ack sequence number */ + rpc_rsp.op_ret = 0; + rpc_rsp.seq = rpc_req.seq; + + goto submit_rpc; + + handle_xdr_error: + rpc_rsp.op_ret = -1; + rpc_rsp.seq = 0; /* invalid */ + submit_rpc: + return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, + (xdrproc_t)xdr_changelog_event_rsp); +} + +int +gf_changelog_reborp_handle_event (rpcsvc_request_t *req) +{ + int ret = 0; + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + gf_changelog_t *entry = NULL; + + svc = rpcsvc_request_service (req); + entry = svc->mydata; + + this = THIS = entry->this; + + ret = GF_NEED_ORDERED_EVENTS (entry) + ? gf_changelog_ordered_event_handler (req, this, entry) + : gf_changelog_unordered_event_handler (req, this, entry); + + return ret; +} + +rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { + [CHANGELOG_REV_PROC_EVENT] = { + "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT, + gf_changelog_reborp_handle_event, NULL, 0, DRC_NA + }, +}; + +/** + * Do not use synctask as the RPC layer dereferences ->mydata as THIS. + * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t, + * and that's required to invoke the callback with the appropriate + * brick path and it's private data. + */ +struct rpcsvc_program gf_changelog_reborp_prog = { + .progname = "LIBGFCHANGELOG REBORP", + .prognum = CHANGELOG_REV_RPC_PROCNUM, + .progver = CHANGELOG_REV_RPC_PROCVER, + .numactors = CHANGELOG_REV_PROC_MAX, + .actors = gf_changelog_reborp_actors, + .synctask = _gf_false, +}; + +struct rpcsvc_program *gf_changelog_reborp_programs[] = { + &gf_changelog_reborp_prog, + NULL, +}; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c new file mode 100644 index 00000000000..c2a4c044d23 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c @@ -0,0 +1,105 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "gf-changelog-rpc.h" +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +struct rpc_clnt_program gf_changelog_clnt; + +/* TODO: piggyback reconnect to called (upcall) */ +int +gf_changelog_rpc_notify (struct rpc_clnt *rpc, + void *mydata, rpc_clnt_event_t event, void *data) +{ + xlator_t *this = NULL; + + this = mydata; + + switch (event) { + case RPC_CLNT_CONNECT: + rpc_clnt_set_connected (&rpc->conn); + break; + case RPC_CLNT_DISCONNECT: + rpc_clnt_unset_connected (&rpc->conn); + break; + case RPC_CLNT_MSG: + case RPC_CLNT_DESTROY: + break; + } + + return 0; +} + +struct rpc_clnt * +gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry) +{ + char sockfile[UNIX_PATH_MAX] = {0,}; + + CHANGELOG_MAKE_SOCKET_PATH (entry->brick, + sockfile, UNIX_PATH_MAX); + return changelog_rpc_client_init (this, entry, + sockfile, gf_changelog_rpc_notify); +} + +/** + * remote procedure calls declarations. + */ + +int +gf_probe_changelog_cbk (struct rpc_req *req, + struct iovec *iovec, int count, void *myframe) +{ + return 0; +} + +int +gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data) +{ + int ret = 0; + char *sock = NULL; + gf_changelog_t *entry = NULL; + changelog_probe_req req = {0,}; + + entry = data; + sock = RPC_SOCK (entry); + + (void) memcpy (&req.sock, sock, strlen (sock)); + req.filter = entry->notify; + + /* invoke RPC */ + return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req, + frame, &gf_changelog_clnt, + CHANGELOG_RPC_PROBE_FILTER, NULL, 0, + NULL, this, gf_probe_changelog_cbk, + (xdrproc_t) xdr_changelog_probe_req); +} + +int +gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx) +{ + return changelog_invoke_rpc (this, RPC_PROBER (entry), + &gf_changelog_clnt, procidx, entry); +} + +struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = { + [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, + [CHANGELOG_RPC_PROBE_FILTER] = { + "PROBE FILTER", gf_probe_changelog_filter + }, +}; + +struct rpc_clnt_program gf_changelog_clnt = { + .progname = "LIBGFCHANGELOG", + .prognum = CHANGELOG_RPC_PROGNUM, + .progver = CHANGELOG_RPC_PROGVER, + .numproc = CHANGELOG_RPC_PROC_MAX, + .proctable = gf_changelog_procs, +}; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h new file mode 100644 index 00000000000..1c982eef809 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h @@ -0,0 +1,26 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __GF_CHANGELOG_RPC_H +#define __GF_CHANGELOG_RPC_H + +#include "xlator.h" + +#include "gf-changelog-helpers.h" +#include "changelog-rpc-common.h" + +struct rpc_clnt *gf_changelog_rpc_init (xlator_t *, gf_changelog_t *); + +int gf_changelog_invoke_rpc (xlator_t *, gf_changelog_t *, int); + +rpcsvc_t * +gf_changelog_reborp_init_rpc_listner (xlator_t *, char *, char *, void *); + +#endif diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index fb2d9037ffb..8f33eb01013 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -14,6 +14,8 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> +#include <sys/time.h> +#include <sys/resource.h> #ifndef _GNU_SOURCE #define _GNU_SOURCE @@ -24,589 +26,523 @@ #include "glusterfs.h" #include "logging.h" #include "defaults.h" +#include "syncop.h" +#include "gf-changelog-rpc.h" #include "gf-changelog-helpers.h" /* from the changelog translator */ #include "changelog-misc.h" #include "changelog-mem-types.h" -int byebye = 0; +/** + * Global singleton xlator pointer for the library, initialized + * during library load. This should probably be hidden inside + * an initialized object which is an handle for the consumer. + * + * TODO: do away with the global.. + */ +xlator_t *master = NULL; -static void -gf_changelog_cleanup (gf_changelog_t *gfc) +static inline +gf_private_t *gf_changelog_alloc_priv () { - /* socket */ - if (gfc->gfc_sockfd != -1) - close (gfc->gfc_sockfd); - /* tracker fd */ - if (gfc->gfc_fd != -1) - close (gfc->gfc_fd); - /* processing dir */ - if (gfc->gfc_dir) - closedir (gfc->gfc_dir); - - if (gfc->gfc_working_dir) - free (gfc->gfc_working_dir); /* allocated by realpath */ -} + int ret = 0; + gf_private_t *priv = NULL; -void -__attribute__ ((constructor)) gf_changelog_ctor (void) -{ - glusterfs_ctx_t *ctx = NULL; + priv = calloc (1, sizeof (gf_private_t)); + if (!priv) + goto error_return; + INIT_LIST_HEAD (&priv->connections); - ctx = glusterfs_ctx_new (); - if (!ctx) - return; + ret = LOCK_INIT (&priv->lock); + if (ret != 0) + goto free_priv; + priv->api = NULL; - if (glusterfs_globals_init (ctx)) { - free (ctx); - ctx = NULL; - return; - } + return priv; - THIS->ctx = ctx; - if (xlator_mem_acct_init (THIS, gf_changelog_mt_end)) - return; + free_priv: + free (priv); + error_return: + return NULL; } -void -__attribute__ ((destructor)) gf_changelog_dtor (void) -{ - xlator_t *this = NULL; - glusterfs_ctx_t *ctx = NULL; - gf_changelog_t *gfc = NULL; +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 +#define GF_CHANGELOG_EVENT_THREAD_COUNT 4 - this = THIS; - if (!this) - return; - - ctx = this->ctx; - gfc = this->private; - - if (gfc) { - if (gfc->hist_gfc) { - gf_changelog_cleanup(gfc->hist_gfc); - GF_FREE (gfc->hist_gfc); - } - gf_changelog_cleanup (gfc); - GF_FREE (gfc); +static int +gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx) +{ + cmd_args_t *cmd_args = NULL; + struct rlimit lim = {0, }; + call_pool_t *pool = NULL; + int ret = -1; + + ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); + if (ret != 0) { + return ret; } - if (ctx) { - pthread_mutex_destroy (&ctx->lock); - free (ctx); - ctx = NULL; - } -} + ctx->process_uuid = generate_glusterfs_ctx_id (); + if (!ctx->process_uuid) + return -1; + ctx->page_size = 128 * GF_UNIT_KB; -static int -gf_changelog_open_dirs (gf_changelog_t *gfc) -{ - int ret = -1; - DIR *dir = NULL; - int tracker_fd = 0; - char tracker_path[PATH_MAX] = {0,}; - xlator_t *this = NULL; + ctx->iobuf_pool = iobuf_pool_new (); + if (!ctx->iobuf_pool) + return -1; - this = THIS; - GF_ASSERT (this); + ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE, + GF_CHANGELOG_EVENT_THREAD_COUNT); + if (!ctx->event_pool) + return -1; - (void) snprintf (gfc->gfc_current_dir, PATH_MAX, - "%s/"GF_CHANGELOG_CURRENT_DIR"/", - gfc->gfc_working_dir); + pool = GF_CALLOC (1, sizeof (call_pool_t), + gf_changelog_mt_libgfchangelog_call_pool_t); + if (!pool) + return -1; - ret = recursive_rmdir (gfc->gfc_current_dir); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to rmdir: %s, err: %s", - gfc->gfc_current_dir, strerror (errno)); - goto out; - } + /* frame_mem_pool size 112 * 64 */ + pool->frame_mem_pool = mem_pool_new (call_frame_t, 32); + if (!pool->frame_mem_pool) + return -1; - ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false); - if (ret) - goto out; + /* stack_mem_pool size 256 * 128 */ + pool->stack_mem_pool = mem_pool_new (call_stack_t, 16); - (void) snprintf (gfc->gfc_processed_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSED_DIR"/", - gfc->gfc_working_dir); + if (!pool->stack_mem_pool) + return -1; - ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false); - if (ret) - goto out; + ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16); + if (!ctx->stub_mem_pool) + return -1; - (void) snprintf (gfc->gfc_processing_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSING_DIR"/", - gfc->gfc_working_dir); + ctx->dict_pool = mem_pool_new (dict_t, 32); + if (!ctx->dict_pool) + return -1; - ret = recursive_rmdir (gfc->gfc_processing_dir); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to rmdir: %s, err: %s", - gfc->gfc_processing_dir, strerror (errno)); - goto out; - } + ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512); + if (!ctx->dict_pair_pool) + return -1; - ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false); - if (ret) - goto out; + ctx->dict_data_pool = mem_pool_new (data_t, 512); + if (!ctx->dict_data_pool) + return -1; - dir = opendir (gfc->gfc_processing_dir); - if (!dir) { - gf_log ("", GF_LOG_ERROR, - "opendir() error [reason: %s]", strerror (errno)); - goto out; - } + INIT_LIST_HEAD (&pool->all_frames); + LOCK_INIT (&pool->lock); + ctx->pool = pool; - gfc->gfc_dir = dir; + pthread_mutex_init (&(ctx->lock), NULL); - (void) snprintf (tracker_path, PATH_MAX, - "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir); + cmd_args = &ctx->cmd_args; - tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (tracker_fd < 0) { - closedir (gfc->gfc_dir); - ret = -1; - goto out; - } + INIT_LIST_HEAD (&cmd_args->xlator_options); + + lim.rlim_cur = RLIM_INFINITY; + lim.rlim_max = RLIM_INFINITY; + setrlimit (RLIMIT_CORE, &lim); - gfc->gfc_fd = tracker_fd; - ret = 0; - out: - return ret; + return 0; } -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc) +/* TODO: cleanup ctx defaults */ +static void +gf_changelog_cleanup_this (xlator_t *this) { - int ret = 0; - int len = 0; - int tries = 0; - int sockfd = 0; - struct sockaddr_un remote; - - this = gfc->this; + glusterfs_ctx_t *ctx = NULL; - if (gfc->gfc_sockfd != -1) { - gf_log (this->name, GF_LOG_INFO, - "Reconnecting..."); - close (gfc->gfc_sockfd); - } + if (!this) + return; - sockfd = socket (AF_UNIX, SOCK_STREAM, 0); - if (sockfd < 0) { - ret = -1; - goto out; - } + ctx = this->ctx; + syncenv_destroy (ctx->env); + free (ctx); - CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath, - gfc->gfc_sockpath, UNIX_PATH_MAX); - gf_log (this->name, GF_LOG_INFO, - "connecting to changelog socket: %s (brick: %s)", - gfc->gfc_sockpath, gfc->gfc_brickpath); + if (this->private) + free (this->private); - remote.sun_family = AF_UNIX; - strcpy (remote.sun_path, gfc->gfc_sockpath); + this->private = NULL; + this->ctx = NULL; +} - len = strlen (remote.sun_path) + sizeof (remote.sun_family); +static int +gf_changelog_init_this () +{ + glusterfs_ctx_t *ctx = NULL; - while (tries < gfc->gfc_connretries) { - gf_log (this->name, GF_LOG_WARNING, - "connection attempt %d/%d...", - tries + 1, gfc->gfc_connretries); + ctx = glusterfs_ctx_new (); + if (!ctx) + goto error_return; - /* initiate a connect */ - if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) { - gfc->gfc_sockfd = sockfd; - break; - } + if (glusterfs_globals_init (ctx)) + goto free_ctx; - tries++; - sleep (2); - } + THIS->ctx = ctx; + if (gf_changelog_ctx_defaults_init (ctx)) + goto free_ctx; - if (tries == gfc->gfc_connretries) { - gf_log (this->name, GF_LOG_ERROR, - "could not connect to changelog socket!" - " bailing out..."); - close (sockfd); - ret = -1; - } else - gf_log (this->name, GF_LOG_INFO, - "connection successful"); + ctx->env = syncenv_new (0, 0, 0); + if (!ctx->env) + goto free_ctx; + return 0; - out: - return ret; + free_ctx: + free (ctx); + THIS->ctx = NULL; + error_return: + return -1; } -int -gf_changelog_done (char *file) +static int +gf_changelog_init_master () { - int ret = -1; - char *buffer = NULL; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char to_path[PATH_MAX] = {0,}; - - errno = EINVAL; - - this = THIS; - if (!this) - goto out; - - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + int ret = 0; + gf_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; - if (!file || !strlen (file)) - goto out; + ret = gf_changelog_init_this (); + if (ret != 0) + goto error_return; + master = THIS; + + priv = gf_changelog_alloc_priv (); + if (!priv) + goto cleanup_master; + master->private = priv; + + /* poller thread */ + ret = pthread_create (&priv->poller, + NULL, changelog_rpc_poller, master); + if (ret != 0) { + gf_log (master->name, GF_LOG_ERROR, + "failed to spawn poller thread"); + goto cleanup_master; + } - /* make sure 'file' is inside ->gfc_working_dir */ - buffer = realpath (file, NULL); - if (!buffer) - goto out; + return 0; - if (strncmp (gfc->gfc_working_dir, - buffer, strlen (gfc->gfc_working_dir))) - goto out; + cleanup_master: + master->private = NULL; + gf_changelog_cleanup_this (master); + error_return: + return -1; +} - (void) snprintf (to_path, PATH_MAX, "%s%s", - gfc->gfc_processed_dir, basename (buffer)); - gf_log (this->name, GF_LOG_DEBUG, - "moving %s to processed directory", file); - ret = rename (buffer, to_path); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "cannot move %s to %s (reason: %s)", - file, to_path, strerror (errno)); - goto out; - } +/* ctor/dtor */ - ret = 0; +void +__attribute__ ((constructor)) gf_changelog_ctor (void) +{ + (void) gf_changelog_init_master (); +} - out: - if (buffer) - free (buffer); /* allocated by realpath() */ - return ret; +void +__attribute__ ((destructor)) gf_changelog_dtor (void) +{ + gf_changelog_cleanup_this (master); } -/** - * @API - * for a set of changelogs, start from the beginning - */ +/* TODO: cleanup clnt/svc on failure */ int -gf_changelog_start_fresh () +gf_changelog_setup_rpc (xlator_t *this, + gf_changelog_t *entry, int proc) { - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - - this = THIS; - if (!this) - goto out; + int ret = 0; + rpcsvc_t *svc = NULL; + struct rpc_clnt *rpc = NULL; - errno = EINVAL; + /** + * Initialize a connect back socket. A probe() RPC call to the server + * triggers a reverse connect. + */ + svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick, + RPC_SOCK (entry), entry); + if (!svc) + goto error_return; + RPC_REBORP (entry) = svc; + + /* Initialize an RPC client */ + rpc = gf_changelog_rpc_init (this, entry); + if (!rpc) + goto error_return; + RPC_PROBER (entry) = rpc; - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + /** + * @FIXME + * till we have connection state machine, let's delay the RPC call + * for now.. + */ + sleep (2); - if (gf_ftruncate (gfc->gfc_fd, 0)) - goto out; + /** + * Probe changelog translator for reverse connection. After a successful + * call, there's less use of the client and can be disconnected, but + * let's leave the connection active for any future RPC calls. + */ + ret = gf_changelog_invoke_rpc (this, entry, proc); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Could not initiate probe RPC, bailing out!!!"); + goto error_return; + } return 0; - out: + error_return: return -1; } -/** - * @API - * return the next changelog file entry. zero means all chanelogs - * consumed. - */ -ssize_t -gf_changelog_next_change (char *bufptr, size_t maxlen) +static void +gf_cleanup_event (gf_changelog_t *entry) { - ssize_t size = -1; - int tracker_fd = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char buffer[PATH_MAX] = {0,}; - - if (maxlen > PATH_MAX) { - errno = ENAMETOOLONG; - goto out; - } + xlator_t *this = NULL; + struct gf_event_list *ev = NULL; - errno = EINVAL; + this = entry->this; + ev = &entry->event; - this = THIS; - if (!this) - goto out; + (void) gf_thread_cleanup (this, ev->invoker); - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + (void) pthread_mutex_destroy (&ev->lock); + (void) pthread_cond_destroy (&ev->cond); - tracker_fd = gfc->gfc_fd; + ev->entry = NULL; +} - size = gf_readline (tracker_fd, buffer, maxlen); - if (size < 0) { - size = -1; - goto out; +static int +gf_init_event (gf_changelog_t *entry) +{ + int ret = 0; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + ev->entry = entry; + + ret = pthread_mutex_init (&ev->lock, NULL); + if (ret != 0) + goto error_return; + ret = pthread_cond_init (&ev->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + INIT_LIST_HEAD (&ev->events); + + ev->next_seq = 0; /* bootstrap sequencing */ + + if (entry->ordered) { + ret = pthread_create (&ev->invoker, NULL, + gf_changelog_callback_invoker, ev); + if (ret != 0) + goto cleanup_cond; } - if (size == 0) - goto out; - - memcpy (bufptr, buffer, size - 1); - bufptr[size - 1] = '\0'; + return 0; -out: - return size; + cleanup_cond: + (void) pthread_cond_destroy (&ev->cond); + cleanup_mutex: + (void) pthread_mutex_destroy (&ev->lock); + error_return: + return -1; } /** - * @API - * gf_changelog_scan() - scan and generate a list of change entries - * - * calling this api multiple times (without calling gf_changlog_done()) - * would result new changelogs(s) being refreshed in the tracker file. - * This call also acts as a cancellation point for the consumer. + * TODO: + * - cleanup invoker thread (if ordered mode) + * - cleanup event list + * - destroy rpc{-clnt, svc} */ -ssize_t -gf_changelog_scan () +int +gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry) { - int ret = 0; - int tracker_fd = 0; - size_t len = 0; - size_t off = 0; - xlator_t *this = NULL; - size_t nr_entries = 0; - gf_changelog_t *gfc = NULL; - struct dirent *entryp = NULL; - struct dirent *result = NULL; - char buffer[PATH_MAX] = {0,}; - - this = THIS; - if (!this) - goto out; + return 0; +} - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; +int +gf_cleanup_connections (xlator_t *this) +{ + return 0; +} - /** - * do we need to protect 'byebye' with locks? worst, the - * consumer would get notified during next scan(). - */ - if (byebye) { - errno = ECONNREFUSED; - goto out; - } +static int +gf_setup_brick_connection (xlator_t *this, + struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; - errno = EINVAL; + priv = this->private; - tracker_fd = gfc->gfc_fd; + if (!brick->callback || !brick->init || !brick->fini) + goto error_return; - if (gf_ftruncate (tracker_fd, 0)) - goto out; + entry = GF_CALLOC (1, sizeof (*entry), + gf_changelog_mt_libgfchangelog_t); + if (!entry) + goto error_return; + INIT_LIST_HEAD (&entry->list); - len = offsetof(struct dirent, d_name) - + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; - entryp = GF_CALLOC (1, len, - gf_changelog_mt_libgfchangelog_dirent_t); - if (!entryp) - goto out; + entry->notify = brick->filter; + (void) strncpy (entry->brick, brick->brick_path, PATH_MAX); - rewinddir (gfc->gfc_dir); - while (1) { - ret = readdir_r (gfc->gfc_dir, entryp, &result); - if (ret || !result) - break; + entry->this = this; + entry->invokerxl = xl; - if ( !strcmp (basename (entryp->d_name), ".") - || !strcmp (basename (entryp->d_name), "..") ) - continue; + entry->ordered = ordered; + if (ordered) { + ret = gf_init_event (entry); + if (ret) + goto free_entry; + } - nr_entries++; + entry->fini = brick->fini; + entry->callback = brick->callback; + entry->connected = brick->connected; + entry->disconnected = brick->disconnected; - GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir, - buffer, off, - strlen (gfc->gfc_processing_dir)); - GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, - off, strlen (entryp->d_name)); - GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); + entry->ptr = brick->init (this, brick); + if (!entry->ptr) + goto cleanup_event; + priv->api = entry->ptr; /* pointer to API, if required */ - if (gf_changelog_write (tracker_fd, buffer, off) != off) { - gf_log (this->name, GF_LOG_ERROR, - "error writing changelog filename" - " to tracker file"); - break; - } - off = 0; + LOCK (&priv->lock); + { + list_add_tail (&entry->list, &priv->connections); } + UNLOCK (&priv->lock); - GF_FREE (entryp); + ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); + if (ret) + goto cleanup_event; + return 0; - if (!result) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) - return nr_entries; - } - out: + cleanup_event: + if (ordered) + gf_cleanup_event (entry); + free_entry: + list_del (&entry->list); /* FIXME: kludge for now */ + GF_FREE (entry); + error_return: return -1; } -/** - * @API - * gf_changelog_register() - register a client for updates. - */ int -gf_changelog_register (char *brick_path, char *scratch_dir, - char *log_file, int log_level, int max_reconnects) +gf_changelog_register_brick (xlator_t *this, + struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) { - int i = 0; - int ret = -1; - int errn = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char hist_scratch_dir[PATH_MAX] = {0,}; - struct stat buf = {0,}; - - this = THIS; - if (!this->ctx) - goto out; - - errno = ENOMEM; - - gfc = GF_CALLOC (1, sizeof (*gfc), - gf_changelog_mt_libgfchangelog_t); - if (!gfc) - goto out; - - gfc->this = this; - - gfc->gfc_dir = NULL; - gfc->gfc_fd = gfc->gfc_sockfd = -1; - - if (stat (scratch_dir, &buf) && errno == ENOENT) { - ret = mkdir_p (scratch_dir, 0600, _gf_true); - if (ret) { - errn = errno; - goto cleanup; - } - } - - gfc->gfc_working_dir = realpath (scratch_dir, NULL); - if (!gfc->gfc_working_dir) { - errn = errno; - goto cleanup; - } + return gf_setup_brick_connection (this, brick, ordered, xl); +} - /* Begin: Changes for History API */ - gfc->hist_gfc = NULL; +static int +gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel) +{ + /* passing ident as NULL means to use default ident for syslog */ + if (gf_log_init (this->ctx, logfile, NULL)) + return -1; - gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc), - gf_changelog_mt_libgfchangelog_t); - if (!gfc->hist_gfc) - goto cleanup; + gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO : + loglevel); + return 0; +} - gfc->hist_gfc->gfc_dir = NULL; - gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1; - gfc->hist_gfc->this = NULL; +int +gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl) +{ + int ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + struct gf_brick_spec *brick = NULL; + gf_boolean_t need_order = _gf_false; - (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); - (void) snprintf (hist_scratch_dir, PATH_MAX, - "%s/"GF_CHANGELOG_HISTORY_DIR"/", - gfc->gfc_working_dir); + SAVE_THIS (xl); - ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); - if (ret) { - errn = errno; - goto cleanup; - } + this = THIS; + if (!this) + goto error_return; - gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL); - if (!gfc->hist_gfc->gfc_working_dir) { - errn = errno; - goto cleanup; - } + ret = gf_changelog_setup_logging (this, logfile, lvl); + if (ret) + goto error_return; - ret = gf_changelog_open_dirs (gfc->hist_gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "could not create entries in history scratch dir"); - goto cleanup; - } + need_order = (ordered) ? _gf_true : _gf_false; - (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX); + brick = bricks; + while (count--) { + gf_log (this->name, GF_LOG_INFO, + "Registering brick: %s [notify filter: %d]", + brick->brick_path, brick->filter); - for (i=0; i < 256; i++) { - gfc->hist_gfc->rfc3986[i] = - (isalnum(i) || i == '~' || - i == '-' || i == '.' || i == '_') ? i : 0; - } - /* End: Changes for History API*/ + ret = gf_changelog_register_brick (this, brick, need_order, xl); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "Error registering with changelog xlator"); + break; + } - ret = gf_changelog_open_dirs (gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "could not create entries in scratch dir"); - goto cleanup; + brick++; } - /* passing ident as NULL means to use default ident for syslog */ - if (gf_log_init (this->ctx, log_file, NULL)) - goto cleanup; - - gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO : - log_level); + if (ret != 0) + goto cleanup_inited_bricks; - gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects; - (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX); + RESTORE_THIS(); + return 0; - ret = gf_changelog_notification_init (this, gfc); - if (ret) { - errn = errno; - goto cleanup; - } + cleanup_inited_bricks: + gf_cleanup_connections (this); + error_return: + RESTORE_THIS(); + return -1; +} - ret = gf_thread_create (&gfc->gfc_changelog_processor, - NULL, gf_changelog_process, gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "error creating changelog processor thread" - " new changes won't be recorded!!!"); - goto cleanup; - } +/** + * @API + * gf_changelog_register() + * + * This is _NOT_ a generic register API. It's a special API to handle + * updates at a journal granulality. This is used by consumers wanting + * to process persistent journal such as geo-replication via a set of + * APIs. All of this is required to maintain backward compatibility. + * Owner specific private data is stored in ->api (in gf_private_t), + * which is used by APIs to access it's private data. This limits + * the library access to a single brick, but that's how it used to + * be anyway. Furthermore, this API solely _owns_ "this", therefore + * callers already having a notion of "this" are expected to use the + * newer API. + * + * Newer applications wanting to use this library need not face this + * limitation and reply of the much more feature rich generic register + * API, which is purely callback based. + * + * NOTE: @max_reconnects is not used but required for backward compat. + * + * For generic API, refer gf_changelog_register_generic(). + */ +int +gf_changelog_register (char *brick_path, char *scratch_dir, + char *log_file, int log_level, int max_reconnects) +{ + struct gf_brick_spec brick = {0,}; - for (i=0; i < 256; i++) { - gfc->rfc3986[i] = - (isalnum(i) || i == '~' || - i == '-' || i == '.' || i == '_') ? i : 0; - } + THIS = master; - ret = 0; - this->private = gfc; + brick.brick_path = brick_path; + brick.filter = CHANGELOG_OP_TYPE_JOURNAL; - goto out; + brick.init = gf_changelog_journal_init; + brick.fini = gf_changelog_journal_fini; + brick.callback = gf_changelog_handle_journal; + brick.connected = gf_changelog_journal_connect; + brick.disconnected = gf_changelog_journal_disconnect; - cleanup: - if (gfc->hist_gfc) { - gf_changelog_cleanup (gfc->hist_gfc); - GF_FREE (gfc->hist_gfc); - } - gf_changelog_cleanup (gfc); - GF_FREE (gfc); - this->private = NULL; - errno = errn; + brick.ptr = scratch_dir; - out: - return ret; + return gf_changelog_register_generic (&brick, 1, 1, + log_file, log_level, NULL); } diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index 8a527dd6e4b..12f51da8fa2 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -14,6 +14,7 @@ #include "syscall.h" #include "gf-changelog-helpers.h" +#include "gf-changelog-journal.h" /* from the changelog translator */ #include "changelog-misc.h" @@ -36,12 +37,12 @@ int gf_history_changelog_done (char *file) { - int ret = -1; - char *buffer = NULL; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; - char to_path[PATH_MAX] = {0,}; + int ret = -1; + char *buffer = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + char to_path[PATH_MAX] = {0,}; errno = EINVAL; @@ -49,28 +50,28 @@ gf_history_changelog_done (char *file) if (!this) goto out; - gfc = (gf_changelog_t *) this->private; - if (!gfc) + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) goto out; - hist_gfc = gfc->hist_gfc; - if (!hist_gfc) + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) goto out; if (!file || !strlen (file)) goto out; - /* make sure 'file' is inside ->gfc_working_dir */ + /* make sure 'file' is inside ->jnl_working_dir */ buffer = realpath (file, NULL); if (!buffer) goto out; - if (strncmp (hist_gfc->gfc_working_dir, - buffer, strlen (hist_gfc->gfc_working_dir))) + if (strncmp (hist_jnl->jnl_working_dir, + buffer, strlen (hist_jnl->jnl_working_dir))) goto out; (void) snprintf (to_path, PATH_MAX, "%s%s", - hist_gfc->gfc_processed_dir, basename (buffer)); + hist_jnl->jnl_processed_dir, basename (buffer)); gf_log (this->name, GF_LOG_DEBUG, "moving %s to processed directory", file); ret = rename (buffer, to_path); @@ -102,9 +103,9 @@ gf_history_changelog_done (char *file) int gf_history_changelog_start_fresh () { - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; this = THIS; if (!this) @@ -112,15 +113,15 @@ gf_history_changelog_start_fresh () errno = EINVAL; - gfc = (gf_changelog_t *) this->private; - if (!gfc) + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) goto out; - hist_gfc = gfc->hist_gfc; - if (!hist_gfc) + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) goto out; - if (gf_ftruncate (hist_gfc->gfc_fd, 0)) + if (gf_ftruncate (hist_jnl->jnl_fd, 0)) goto out; return 0; @@ -147,12 +148,17 @@ gf_history_changelog_start_fresh () ssize_t gf_history_changelog_next_change (char *bufptr, size_t maxlen) { - ssize_t size = -1; - int tracker_fd = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; - char buffer[PATH_MAX] = {0,}; + ssize_t size = -1; + int tracker_fd = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + char buffer[PATH_MAX] = {0,}; + + if (maxlen > PATH_MAX) { + errno = ENAMETOOLONG; + goto out; + } errno = EINVAL; @@ -160,15 +166,15 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen) if (!this) goto out; - gfc = (gf_changelog_t *) this->private; - if (!gfc) + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) goto out; - hist_gfc = gfc->hist_gfc; - if (!hist_gfc) + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) goto out; - tracker_fd = hist_gfc->gfc_fd; + tracker_fd = hist_jnl->jnl_fd; size = gf_readline (tracker_fd, buffer, maxlen); if (size < 0) { @@ -206,56 +212,60 @@ out: ssize_t gf_history_changelog_scan () { - int ret = 0; - int tracker_fd = 0; - size_t len = 0; - size_t off = 0; - xlator_t *this = NULL; - size_t nr_entries = 0; - gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; - struct dirent *entryp = NULL; - struct dirent *result = NULL; - char buffer[PATH_MAX] = {0,}; - static int is_last_scan = 0; + int ret = 0; + int tracker_fd = 0; + size_t len = 0; + size_t off = 0; + xlator_t *this = NULL; + size_t nr_entries = 0; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + struct dirent *entryp = NULL; + struct dirent *result = NULL; + char buffer[PATH_MAX] = {0,}; + static int is_last_scan; this = THIS; if (!this) goto out; - gfc = (gf_changelog_t *) this->private; - if (!gfc) + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) goto out; + if (JNL_IS_API_DISCONNECTED (jnl)) { + errno = ENOTCONN; + goto out; + } - hist_gfc = gfc->hist_gfc; - if (!hist_gfc) + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) goto out; retry: if (is_last_scan == 1) return 0; - if (hist_gfc->hist_done == 0) + if (hist_jnl->hist_done == 0) is_last_scan = 1; errno = EINVAL; - if (hist_gfc->hist_done == -1) + if (hist_jnl->hist_done == -1) goto out; - tracker_fd = hist_gfc->gfc_fd; + tracker_fd = hist_jnl->jnl_fd; if (gf_ftruncate (tracker_fd, 0)) goto out; len = offsetof (struct dirent, d_name) - + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; + + pathconf (hist_jnl->jnl_processing_dir, _PC_NAME_MAX) + 1; entryp = GF_CALLOC (1, len, gf_changelog_mt_libgfchangelog_dirent_t); if (!entryp) goto out; - rewinddir (hist_gfc->gfc_dir); + rewinddir (hist_jnl->jnl_dir); while (1) { - ret = readdir_r (hist_gfc->gfc_dir, entryp, &result); + ret = readdir_r (hist_jnl->jnl_dir, entryp, &result); if (ret || !result) break; @@ -265,9 +275,9 @@ gf_history_changelog_scan () nr_entries++; - GF_CHANGELOG_FILL_BUFFER (hist_gfc->gfc_processing_dir, + GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir, buffer, off, - strlen (hist_gfc->gfc_processing_dir)); + strlen (hist_jnl->jnl_processing_dir)); GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, off, strlen (entryp->d_name)); GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); @@ -284,7 +294,8 @@ gf_history_changelog_scan () GF_FREE (entryp); gf_log (this->name, GF_LOG_DEBUG, - "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan); + "hist_done %d, is_last_scan: %d", hist_jnl->hist_done, + is_last_scan); if (!result) { if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { @@ -490,7 +501,7 @@ gf_changelog_consume_wrap (void* data) /* TODO: handle short reads and EOF. */ ret = gf_changelog_consume (ccd->this, - ccd->gfc, ccd->changelog, _gf_true); + ccd->jnl, ccd->changelog, _gf_true); if (ret) { gf_log (this->name, GF_LOG_ERROR, "could not parse changelog: %s", ccd->changelog); @@ -515,8 +526,8 @@ void * gf_history_consume (void * data) { xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; int ret = 0; int iter = 0; int fd = -1; @@ -549,14 +560,14 @@ gf_history_consume (void * data) goto out; } - gfc = (gf_changelog_t *) this->private; - if (!gfc) { + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) { ret = -1; goto out; } - hist_gfc = gfc->hist_gfc; - if (!hist_gfc) { + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) { ret = -1; goto out; } @@ -568,7 +579,7 @@ gf_history_consume (void * data) curr = &ccd[iter]; curr->this = this; - curr->gfc = hist_gfc; + curr->jnl = hist_jnl; curr->fd = fd; curr->offset = from * (len + 1); @@ -613,7 +624,7 @@ gf_history_consume (void * data) } ret = gf_changelog_publish (curr->this, - curr->gfc, curr->changelog); + curr->jnl, curr->changelog); if (ret) { publish = _gf_false; gf_log (this->name, GF_LOG_ERROR, @@ -623,7 +634,7 @@ gf_history_consume (void * data) } /* informing "parsing done". */ - hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1; + hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1; out: if (fd != -1) @@ -740,8 +751,8 @@ gf_history_changelog (char* changelog_dir, unsigned long start, unsigned long from = 0; unsigned long total_changelog = 0; xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; gf_changelog_history_data_t *hist_data = NULL; DIR *dirp = NULL; struct dirent *dp = NULL; @@ -762,14 +773,14 @@ gf_history_changelog (char* changelog_dir, unsigned long start, goto out; } - gfc = (gf_changelog_t *) this->private; - if (!gfc) { + jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); + if (!jnl) { ret = -1; goto out; } - hist_gfc = (gf_changelog_t *) gfc->hist_gfc; - if (!hist_gfc) { + hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl; + if (!hist_jnl) { ret = -1; goto out; } @@ -917,7 +928,7 @@ out: return ret; } - hist_gfc->hist_done = 1; + hist_jnl->hist_done = 1; *actual_end = ts2; return ret; |