summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:22:16 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 18:22:36 -0700
commit4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch)
tree9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/lib/src
parent728fcd41eb39f66744d84b979dd8195fd47313ed (diff)
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog translator and libgfchangelog. It replaces the old pathetic stream based interaction that existed earlier (due to time constraints :-/). Changelog, upon initialization starts a RPC server (rpcsvc) allowing clients to invoke a probe API as a bootup mechanism to request for event notifications. During probe, clients can choose an event filter specifying the type(s) of events they are interested in. As of now there is no way to change the event notification set once the probe RPC call is made, but that is easier to implement. The actual event notifications is done on a separate RPC session. The client (libgfchangelog) itself starts and RPC server which the changelog translator "connects back" during probe. Notifications are dispatched by a bunch of threads from the server (translator) and the client optionally orders them if ordered notifications are requried. FOPs fill in their respective event details in a buffer (rot-buffs to be particular) and a bunch of threads (consumers) swap the buffers out of roatation and dispatch them via RPC. To avoid writer starvation, then number of dispatcher threads is one less than the number of buffer list in rot-buffs.x libgfchangelog becomes purely callback based -- upon event notification from the server (and re-ordering them if required) invoke a callback routine specified by consumer(s). A major part of the patch is also aimed at providing backward compatibility for geo-replication, which was one of the main consumer of the stream based API. Also, this patch does not\ "turn on" event notifications for all fops, just a bunch which is currently in requirement. Another pain point is that the server does not filter events before dispatching it to the clients. That load is taken up by the client itself (although it's done at the library layer rather than making it hard on the callback implementor). This needs improvement and care needs to be taken to not load the server up with expensive filtering mechanisms. Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9708 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib/src')
-rw-r--r--xlators/features/changelog/lib/src/Makefile.am30
-rw-r--r--xlators/features/changelog/lib/src/changelog.h72
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-api.c224
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.c32
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h194
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal-handler.c (renamed from xlators/features/changelog/lib/src/gf-changelog-process.c)520
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal.h114
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c381
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.c105
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.h26
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c862
-rw-r--r--xlators/features/changelog/lib/src/gf-history-changelog.c161
12 files changed, 2015 insertions, 706 deletions
diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am
index 1ae919bfb38..306306bd585 100644
--- a/xlators/features/changelog/lib/src/Makefile.am
+++ b/xlators/features/changelog/lib/src/Makefile.am
@@ -4,9 +4,13 @@ libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \
libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \
-I../../../src/ -I$(top_srcdir)/libglusterfs/src \
-I$(top_srcdir)/xlators/features/changelog/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \
+ -I$(top_srcdir)/rpc/rpc-transport/socket/src \
-DDATADIR=\"$(localstatedir)\"
-libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/rpc/xdr/src/libgfxdr.la \
+ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la
libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) -version-info $(LIBGFCHANGELOG_LT_VERSION)
@@ -15,18 +19,18 @@ lib_LTLIBRARIES = libgfchangelog.la
CONTRIB_BUILDDIR = $(top_builddir)/contrib
-libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \
- gf-changelog-helpers.c gf-history-changelog.c \
- $(CONTRIBDIR)/uuid/clear.c \
- $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \
- $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \
- $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \
- $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \
- $(CONTRIBDIR)/uuid/unpack.c
-
-noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \
- $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \
- $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
+libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-journal-handler.c gf-changelog-helpers.c \
+ gf-changelog-api.c gf-history-changelog.c gf-changelog-rpc.c gf-changelog-reborp.c \
+ $(top_srcdir)/xlators/features/changelog/src/changelog-rpc-common.c \
+ $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c \
+ $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c \
+ $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c \
+ $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c \
+ $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c
+
+noinst_HEADERS = gf-changelog-helpers.h gf-changelog-rpc.h gf-changelog-journal.h \
+ $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h \
+ $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
libgfchangelog_HEADERS = changelog.h
diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/changelog.h
index 5cddfb5839c..d7048ff2508 100644
--- a/xlators/features/changelog/lib/src/changelog.h
+++ b/xlators/features/changelog/lib/src/changelog.h
@@ -11,6 +11,73 @@
#ifndef _GF_CHANGELOG_H
#define _GF_CHANGELOG_H
+struct gf_brick_spec;
+
+/**
+ * Max bit shiter for event selection
+ */
+#define CHANGELOG_EV_SELECTION_RANGE 4
+
+#define CHANGELOG_OP_TYPE_JOURNAL (1<<0)
+#define CHANGELOG_OP_TYPE_OPEN (1<<1)
+#define CHANGELOG_OP_TYPE_CREATE (1<<2)
+#define CHANGELOG_OP_TYPE_RELEASE (1<<3)
+#define CHANGELOG_OP_TYPE_MAX (1<<CHANGELOG_EV_SELECTION_RANGE)
+
+
+struct ev_open {
+ unsigned char gfid[16];
+ int32_t flags;
+};
+
+struct ev_creat {
+ unsigned char gfid[16];
+ int32_t flags;
+};
+
+struct ev_release {
+ unsigned char gfid[16];
+};
+
+struct ev_changelog {
+ char path[PATH_MAX];
+};
+
+typedef struct changelog_event {
+ unsigned int ev_type;
+ union {
+ struct ev_open open;
+ struct ev_creat create;
+ struct ev_release release;
+ struct ev_changelog journal;
+ } u;
+} changelog_event_t;
+
+#define CHANGELOG_EV_SIZE (sizeof (changelog_event_t))
+
+/**
+ * event callback, connected & disconnection defs
+ */
+typedef void (CALLBACK) (void *, char *,
+ void *, changelog_event_t *);
+typedef void *(INIT) (void *, struct gf_brick_spec *);
+typedef void (FINI) (void *, char *, void *);
+typedef void (CONNECT) (void *, char *, void *);
+typedef void (DISCONNECT) (void *, char *, void *);
+
+struct gf_brick_spec {
+ char *brick_path;
+ unsigned int filter;
+
+ INIT *init;
+ FINI *fini;
+ CALLBACK *callback;
+ CONNECT *connected;
+ DISCONNECT *disconnected;
+
+ void *ptr;
+};
+
/* API set */
int
@@ -28,4 +95,9 @@ gf_changelog_next_change (char *bufptr, size_t maxlen);
int
gf_changelog_done (char *file);
+/* newer flexible API */
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl);
+
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c
new file mode 100644
index 00000000000..cea2ff01988
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-api.c
@@ -0,0 +1,224 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "uuid.h"
+#include "globals.h"
+#include "glusterfs.h"
+
+#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
+#include "changelog-mem-types.h"
+
+int
+gf_changelog_done (char *file)
+{
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char to_path[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ if (!file || !strlen (file))
+ goto out;
+
+ /* make sure 'file' is inside ->jnl_working_dir */
+ buffer = realpath (file, NULL);
+ if (!buffer)
+ goto out;
+
+ if (strncmp (jnl->jnl_working_dir,
+ buffer, strlen (jnl->jnl_working_dir)))
+ goto out;
+
+ (void) snprintf (to_path, PATH_MAX, "%s%s",
+ jnl->jnl_processed_dir, basename (buffer));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "moving %s to processed directory", file);
+ ret = rename (buffer, to_path);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot move %s to %s (reason: %s)",
+ file, to_path, strerror (errno));
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ if (buffer)
+ free (buffer); /* allocated by realpath() */
+ return ret;
+}
+
+/**
+ * @API
+ * for a set of changelogs, start from the beginning
+ */
+int
+gf_changelog_start_fresh ()
+{
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ errno = EINVAL;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ if (gf_ftruncate (jnl->jnl_fd, 0))
+ goto out;
+
+ return 0;
+
+ out:
+ return -1;
+}
+
+/**
+ * @API
+ * return the next changelog file entry. zero means all chanelogs
+ * consumed.
+ */
+ssize_t
+gf_changelog_next_change (char *bufptr, size_t maxlen)
+{
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ tracker_fd = jnl->jnl_fd;
+
+ size = gf_readline (tracker_fd, buffer, maxlen);
+ if (size < 0) {
+ size = -1;
+ goto out;
+ }
+
+ if (size == 0)
+ goto out;
+
+ memcpy (bufptr, buffer, size - 1);
+ bufptr[size - 1] = '\0';
+
+out:
+ return size;
+}
+
+/**
+ * @API
+ * gf_changelog_scan() - scan and generate a list of change entries
+ *
+ * calling this api multiple times (without calling gf_changlog_done())
+ * would result new changelogs(s) being refreshed in the tracker file.
+ * This call also acts as a cancellation point for the consumer.
+ */
+ssize_t
+gf_changelog_scan ()
+{
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+ if (JNL_IS_API_DISCONNECTED (jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
+
+ errno = EINVAL;
+
+ tracker_fd = jnl->jnl_fd;
+ if (gf_ftruncate (tracker_fd, 0))
+ goto out;
+
+ len = offsetof(struct dirent, d_name)
+ + pathconf(jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;
+ entryp = GF_CALLOC (1, len,
+ gf_changelog_mt_libgfchangelog_dirent_t);
+ if (!entryp)
+ goto out;
+
+ rewinddir (jnl->jnl_dir);
+ while (1) {
+ ret = readdir_r (jnl->jnl_dir, entryp, &result);
+ if (ret || !result)
+ break;
+
+ if (!strcmp (basename (entryp->d_name), ".")
+ || !strcmp (basename (entryp->d_name), ".."))
+ continue;
+
+ nr_entries++;
+
+ GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir,
+ buffer, off,
+ strlen (jnl->jnl_processing_dir));
+ GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
+ off, strlen (entryp->d_name));
+ GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+
+ if (gf_changelog_write (tracker_fd, buffer, off) != off) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error writing changelog filename"
+ " to tracker file");
+ break;
+ }
+ off = 0;
+ }
+
+ GF_FREE (entryp);
+
+ if (!result) {
+ if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
+ return nr_entries;
+ }
+ out:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
index f071b057d59..6bf709dc664 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
@@ -178,3 +178,35 @@ gf_ftruncate (int fd, off_t length)
return 0;
}
+
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread)
+{
+ int ret = 0;
+ void *res = NULL;
+
+ ret = pthread_cancel (thread);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to send cancellation to thread");
+ goto error_return;
+ }
+
+ ret = pthread_join (thread, &res);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to join thread");
+ goto error_return;
+ }
+
+ if (res != PTHREAD_CANCELED) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Thread could not be cleaned up");
+ goto error_return;
+ }
+
+ return 0;
+
+ error_return:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 9b875d45dcc..17b8862a89b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -18,6 +18,11 @@
#include <xlator.h>
+#include "changelog.h"
+
+#include "changelog-rpc-common.h"
+#include "gf-changelog-journal.h"
+
#define GF_CHANGELOG_TRACKER "tracker"
#define GF_CHANGELOG_CURRENT_DIR ".current"
@@ -41,79 +46,143 @@ typedef struct read_line {
char rl_buf[MAXLINE];
} read_line_t;
+struct gf_changelog;
+
+/**
+ * Event list for ordered event notification
+ *
+ * ->next_seq holds the next _expected_ sequence number.
+ */
+struct gf_event_list {
+ pthread_mutex_t lock; /* protects this structure */
+ pthread_cond_t cond;
+
+ pthread_t invoker;
+
+ unsigned long next_seq; /* next sequence number expected:
+ zero during bootstrap */
+
+ struct gf_changelog *entry; /* backpointer to it's brick
+ encapsulator (entry) */
+ struct list_head events; /* list of events (ordered) */
+};
+
+/**
+ * include a refcount if it's of use by additional layers
+ */
+struct gf_event {
+ int count;
+
+ unsigned long seq;
+
+ struct list_head list;
+
+ struct iovec iov[0];
+};
+#define GF_EVENT_CALLOC_SIZE(cnt, len) \
+ (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len)
+
+/**
+ * assign the base address of the IO vector to the correct memory
+ * area and set it's addressable length.
+ */
+#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
+ do { \
+ vec->iov_base = ((char *)event) + \
+ sizeof (struct gf_event) + \
+ (event->count * sizeof (struct iovec)) + pos; \
+ vec->iov_len = len; \
+ pos += len; \
+ } while (0)
+
+/**
+ * An instance of this structure is allocated for each brick for which
+ * notifications are streamed.
+ */
typedef struct gf_changelog {
xlator_t *this;
- /* 'processing' directory stream */
- DIR *gfc_dir;
-
- /* fd to the tracker file */
- int gfc_fd;
-
- /* connection retries */
- int gfc_connretries;
+ struct list_head list; /* list of instances */
- char gfc_sockpath[UNIX_PATH_MAX];
+ char brick[PATH_MAX]; /* brick path for this end-point */
- char gfc_brickpath[PATH_MAX];
+ changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
+#define RPC_PROBER(ent) ent->grpc.rpc
+#define RPC_REBORP(ent) ent->grpc.svc
+#define RPC_SOCK(ent) ent->grpc.sock
- /* socket for receiving notifications */
- int gfc_sockfd;
+ unsigned int notify; /* notification flag(s) */
- char *gfc_working_dir;
+ FINI *fini; /* destructor callback */
+ CALLBACK *callback; /* event callback dispatcher */
+ CONNECT *connected; /* connect callback */
+ DISCONNECT *disconnected; /* disconnection callback */
- /* RFC 3986 string encoding */
- char rfc3986[256];
+ void *ptr; /* owner specific private data */
+ xlator_t *invokerxl; /* consumers _this_, if valid,
+ assigned to THIS before cbk is
+ invoked */
- char gfc_current_dir[PATH_MAX];
- char gfc_processed_dir[PATH_MAX];
- char gfc_processing_dir[PATH_MAX];
+ gf_boolean_t ordered;
- pthread_t gfc_changelog_processor;
-
- /* Holds gfc for History API */
- struct gf_changelog *hist_gfc;
-
- /* holds 0 done scanning, 1 keep scanning and -1 error */
- int hist_done;
+ struct gf_event_list event;
} gf_changelog_t;
-typedef struct gf_changelog_history_data {
- int len;
-
- int htime_fd;
-
- /* parallelism count */
- int n_parallel;
-
- /* history from, to indexes */
- unsigned long from;
- unsigned long to;
-} gf_changelog_history_data_t;
-
-typedef struct gf_changelog_consume_data {
- /** set of inputs */
-
- /* fd to read from */
- int fd;
-
- /* from @offset */
- off_t offset;
-
- xlator_t *this;
- gf_changelog_t *gfc;
-
- /** set of outputs */
+static inline int
+gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event)
+{
+ if (event->ev_type & entry->notify)
+ return 1;
+ return 0;
+}
+
+#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)
+
+/** private structure */
+typedef struct gf_private {
+ gf_lock_t lock; /* protects ->connections */
+
+ void *api; /* pointer for API access */
+
+ pthread_t poller; /* event poller thread */
+
+ struct list_head connections; /* list of connections */
+} gf_private_t;
+
+#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api)
+
+/**
+ * upcall: invoke callback with _correct_ THIS
+ */
+#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \
+ do { \
+ xlator_t *old_this = NULL; \
+ xlator_t *invokerxl = NULL; \
+ \
+ invokerxl = entry->invokerxl; \
+ old_this = this; \
+ \
+ if (invokerxl) { \
+ THIS = invokerxl; \
+ } \
+ \
+ cbk (invokerxl, brick, args); \
+ THIS = old_this; \
+ \
+ } while (0)
- /* return value */
- int retval;
+#define SAVE_THIS(xl) \
+ do { \
+ old_this = xl; \
+ THIS = master; \
+ } while (0)
- /* journal processed */
- char changelog[PATH_MAX];
-} gf_changelog_consume_data_t;
+#define RESTORE_THIS() \
+ do { \
+ THIS = old_this; \
+ } while (0)
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
+/** APIs and the rest */
void *
gf_changelog_process (void *data);
@@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence);
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish);
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path);
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path);
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread);
+void *
+gf_changelog_callback_invoker (void *arg);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
index 1a275e676fb..6ee6f9f074f 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-process.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -8,9 +8,6 @@
cases as published by the Free Software Foundation.
*/
-#include <unistd.h>
-#include <pthread.h>
-
#include "uuid.h"
#include "globals.h"
#include "glusterfs.h"
@@ -19,6 +16,9 @@
/* from the changelog translator */
#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-journal.h"
extern int byebye;
@@ -98,7 +98,7 @@ conv_noop (char *ptr) { return ptr; }
MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \
} \
-#define LINE_BUFSIZE 3*PATH_MAX /* enough buffer for extra chars too */
+#define LINE_BUFSIZE (3*PATH_MAX) /* enough buffer for extra chars too */
/**
* using mmap() makes parsing easy. fgets() cannot be used here as
@@ -114,7 +114,8 @@ conv_noop (char *ptr) { return ptr; }
static int
gf_changelog_parse_binary (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
+ gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd,
size_t start_offset, struct stat *stbuf)
{
@@ -165,7 +166,8 @@ gf_changelog_parse_binary (xlator_t *this,
PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
bname_start = mover;
- if ( (bname_end = strchr (mover, '\n')) == NULL ) {
+ bname_end = strchr (mover, '\n');
+ if (bname_end == NULL) {
parse_err = 1;
break;
}
@@ -201,7 +203,7 @@ gf_changelog_parse_binary (xlator_t *this,
MOVER_MOVE (mover, nleft, 1);
}
- if ( (nleft == 0) && (!parse_err))
+ if ((nleft == 0) && (!parse_err))
ret = 0;
if (munmap (start, stbuf->st_size))
@@ -218,7 +220,8 @@ gf_changelog_parse_binary (xlator_t *this,
*/
static int
gf_changelog_parse_ascii (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
+ gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd,
size_t start_offset, struct stat *stbuf)
{
int ng = 0;
@@ -281,7 +284,8 @@ gf_changelog_parse_ascii (xlator_t *this,
VERIFY_SEPARATOR (mover, len, parse_err);
fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
parse_err = 1;
break;
}
@@ -309,7 +313,8 @@ gf_changelog_parse_ascii (xlator_t *this,
VERIFY_SEPARATOR (mover, len, parse_err);
fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
parse_err = 1;
break;
}
@@ -320,7 +325,7 @@ gf_changelog_parse_ascii (xlator_t *this,
GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);
ng = nr_extra_recs[fop];
- for (;ng > 0; ng--) {
+ for (; ng > 0; ng--) {
MOVER_MOVE (mover, nleft, 1);
len = strlen (mover);
VERIFY_SEPARATOR (mover, len, parse_err);
@@ -346,7 +351,7 @@ gf_changelog_parse_ascii (xlator_t *this,
}
gf_rfc3986_encode ((unsigned char *) ptr,
- eptr, gfc->rfc3986);
+ eptr, jnl->rfc3986);
FILL_AND_MOVE (eptr, ascii, off,
mover, nleft, len);
free (eptr);
@@ -374,7 +379,7 @@ gf_changelog_parse_ascii (xlator_t *this,
}
- if ( (nleft == 0) && (!parse_err))
+ if ((nleft == 0) && (!parse_err))
ret = 0;
if (munmap (start, stbuf->st_size))
@@ -410,8 +415,8 @@ gf_changelog_copy (xlator_t *this, int from_fd, int to_fd)
}
static int
-gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
- int to_fd, struct stat *stbuf, int *zerob)
+gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd, struct stat *stbuf, int *zerob)
{
int ret = -1;
int encoding = -1;
@@ -441,12 +446,12 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
* this ideally should have been a part of changelog-encoders.c
* (ie. part of the changelog translator).
*/
- ret = gf_changelog_parse_binary (this, gfc, from_fd,
+ ret = gf_changelog_parse_binary (this, jnl, from_fd,
to_fd, elen, stbuf);
break;
case CHANGELOG_ENCODE_ASCII:
- ret = gf_changelog_parse_ascii (this, gfc, from_fd,
+ ret = gf_changelog_parse_ascii (this, jnl, from_fd,
to_fd, elen, stbuf);
break;
default:
@@ -458,7 +463,8 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
}
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path)
{
int ret = 0;
char dest[PATH_MAX] = {0,};
@@ -466,21 +472,21 @@ gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
struct stat stbuf = {0,};
(void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
+ jnl->jnl_current_dir, basename (from_path));
/* handle zerob file that wont exist in current */
ret = stat (to_path, &stbuf);
- if (ret){
+ if (ret) {
if (errno == ENOENT)
ret = 0;
goto out;
}
(void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
+ jnl->jnl_processing_dir, basename (from_path));
ret = rename (to_path, dest);
- if (ret){
+ if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"error moving %s to processing dir"
" (reason: %s)", to_path, strerror (errno));
@@ -492,7 +498,7 @@ out:
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish)
{
int ret = -1;
@@ -520,9 +526,9 @@ gf_changelog_consume (xlator_t *this,
}
(void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
+ jnl->jnl_current_dir, basename (from_path));
(void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
+ jnl->jnl_processing_dir, basename (from_path));
fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
@@ -532,7 +538,7 @@ gf_changelog_consume (xlator_t *this,
to_path, strerror (errno));
goto close_fd;
} else {
- ret = gf_changelog_decode (this, gfc, fd1,
+ ret = gf_changelog_decode (this, jnl, fd1,
fd2, &stbuf, &zerob);
close (fd2);
@@ -568,88 +574,412 @@ gf_changelog_consume (xlator_t *this,
return ret;
}
-static char *
-gf_changelog_ext_change (xlator_t *this,
- gf_changelog_t *gfc, char *path, size_t readlen)
+void *
+gf_changelog_process (void *data)
{
- int alo = 0;
- int ret = 0;
- size_t len = 0;
- char *buf = NULL;
-
- buf = path;
- while (len < readlen) {
- if (*buf == '\0') {
- alo = 1;
- gf_log (this->name, GF_LOG_DEBUG,
- "processing changelog: %s", path);
- ret = gf_changelog_consume (this, gfc, path, _gf_false);
- }
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_entry_t *entry = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
- if (ret)
- break;
+ this = THIS;
- len++; buf++;
- if (alo) {
- alo = 0;
- path = buf;
+ jnl = data;
+ jnl_proc = jnl->jnl_proc;
+
+ while (1) {
+ pthread_mutex_lock (&jnl_proc->lock);
+ {
+ while (list_empty (&jnl_proc->entries)) {
+ jnl_proc->waiting = _gf_true;
+ pthread_cond_wait
+ (&jnl_proc->cond, &jnl_proc->lock);
+ }
+
+ entry = list_first_entry (&jnl_proc->entries,
+ gf_changelog_entry_t, list);
+ list_del (&entry->list);
+ jnl_proc->waiting = _gf_false;
}
+ pthread_mutex_unlock (&jnl_proc->lock);
+
+ if (entry) {
+ ret = gf_changelog_consume (this, jnl,
+ entry->path, _gf_false);
+ GF_FREE (entry);
+ }
+ }
+
+ return NULL;
+}
+
+inline void
+gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc,
+ changelog_event_t *event)
+{
+ size_t len = 0;
+ gf_changelog_entry_t *entry = NULL;
+
+ entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t),
+ gf_changelog_mt_libgfchangelog_entry_t);
+ if (!entry)
+ return;
+ INIT_LIST_HEAD (&entry->list);
+
+ len = strlen (event->u.journal.path);
+ (void)memcpy (entry->path, event->u.journal.path, len+1);
+
+ pthread_mutex_lock (&jnl_proc->lock);
+ {
+ list_add_tail (&entry->list, &jnl_proc->entries);
+ if (jnl_proc->waiting)
+ pthread_cond_signal (&jnl_proc->cond);
+ }
+ pthread_mutex_unlock (&jnl_proc->lock);
+
+ return;
+}
+
+void
+gf_changelog_handle_journal (void *xl, char *brick,
+ void *cbkdata, changelog_event_t *event)
+{
+ int ret = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl = cbkdata;
+ jnl_proc = jnl->jnl_proc;
+
+ gf_changelog_queue_journal (jnl_proc, event);
+}
+
+void
+gf_changelog_journal_disconnect (void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock (&jnl->lock);
+ {
+ JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED);
+ };
+ pthread_spin_unlock (&jnl->lock);
+}
+
+void
+gf_changelog_journal_connect (void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock (&jnl->lock);
+ {
+ JNL_SET_API_STATE (jnl, JNL_API_CONNECTED);
+ };
+ pthread_spin_unlock (&jnl->lock);
+
+ return;
+}
+
+void
+gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ this = THIS;
+ if (!this || !jnl || !jnl->jnl_proc)
+ goto error_return;
+
+ jnl_proc = jnl->jnl_proc;
+
+ ret = gf_thread_cleanup (this, jnl_proc->processor);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to cleanup processor thread");
+ goto error_return;
+ }
+
+ (void)pthread_mutex_destroy (&jnl_proc->lock);
+ (void)pthread_cond_destroy (&jnl_proc->cond);
+
+ GF_FREE (jnl_proc);
+
+ error_return:
+ return;
+}
+
+int
+gf_changelog_init_processor (gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl_proc)
+ goto error_return;
+
+ ret = pthread_mutex_init (&jnl_proc->lock, NULL);
+ if (ret != 0)
+ goto free_jnl_proc;
+ ret = pthread_cond_init (&jnl_proc->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+
+ INIT_LIST_HEAD (&jnl_proc->entries);
+ ret = pthread_create (&jnl_proc->processor,
+ NULL, gf_changelog_process, jnl);
+ if (ret != 0)
+ goto cleanup_cond;
+ jnl_proc->waiting = _gf_false;
+
+ jnl->jnl_proc = jnl_proc;
+ return 0;
+
+ cleanup_cond:
+ (void) pthread_cond_destroy (&jnl_proc->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&jnl_proc->lock);
+ free_jnl_proc:
+ GF_FREE (jnl_proc);
+ error_return:
+ return -1;
+}
+
+static void
+gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl)
+{
+ /* tracker fd */
+ if (jnl->jnl_fd != -1)
+ close (jnl->jnl_fd);
+ /* processing dir */
+ if (jnl->jnl_dir)
+ closedir (jnl->jnl_dir);
+
+ if (jnl->jnl_working_dir)
+ free (jnl->jnl_working_dir); /* allocated by realpath */
+}
+
+static int
+gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ DIR *dir = NULL;
+ int tracker_fd = 0;
+ char tracker_path[PATH_MAX] = {0,};
+
+ /* .current */
+ (void) snprintf (jnl->jnl_current_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_CURRENT_DIR"/",
+ jnl->jnl_working_dir);
+ ret = recursive_rmdir (jnl->jnl_current_dir);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to rmdir: %s, err: %s",
+ jnl->jnl_current_dir, strerror (errno));
+ goto out;
+ }
+ ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processed */
+ (void) snprintf (jnl->jnl_processed_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
+ jnl->jnl_working_dir);
+ ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processing */
+ (void) snprintf (jnl->jnl_processing_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
+ jnl->jnl_working_dir);
+ ret = recursive_rmdir (jnl->jnl_processing_dir);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to rmdir: %s, err: %s",
+ jnl->jnl_processing_dir, strerror (errno));
+ goto out;
+ }
+
+ ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ dir = opendir (jnl->jnl_processing_dir);
+ if (!dir) {
+ gf_log ("", GF_LOG_ERROR,
+ "opendir() error [reason: %s]", strerror (errno));
+ goto out;
}
- return (ret) ? NULL : path;
+ jnl->jnl_dir = dir;
+
+ (void) snprintf (tracker_path, PATH_MAX,
+ "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir);
+
+ tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (tracker_fd < 0) {
+ closedir (jnl->jnl_dir);
+ ret = -1;
+ goto out;
+ }
+
+ jnl->jnl_fd = tracker_fd;
+ ret = 0;
+ out:
+ return ret;
+}
+
+int
+gf_changelog_init_history (xlator_t *this,
+ gf_changelog_journal_t *jnl,
+ char *brick_path, char *scratch_dir)
+{
+ int i = 0;
+ int ret = 0;
+ char hist_scratch_dir[PATH_MAX] = {0,};
+
+ jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl->hist_jnl)
+ goto error_return;
+
+ jnl->hist_jnl->jnl_dir = NULL;
+ jnl->hist_jnl->jnl_fd = -1;
+
+ (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
+ (void) snprintf (hist_scratch_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_HISTORY_DIR"/",
+ jnl->jnl_working_dir);
+
+ ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
+ if (ret)
+ goto dealloc_hist;
+
+ jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL);
+ if (!jnl->hist_jnl->jnl_working_dir)
+ goto dealloc_hist;
+
+ ret = gf_changelog_open_dirs (this, jnl->hist_jnl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in history scratch dir");
+ goto dealloc_hist;
+ }
+
+ (void) strncpy (jnl->hist_jnl->jnl_brickpath, brick_path, PATH_MAX);
+
+ for (i = 0; i < 256; i++) {
+ jnl->hist_jnl->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
+ }
+
+ return 0;
+
+ dealloc_hist:
+ GF_FREE (jnl->hist_jnl);
+ jnl->hist_jnl = NULL;
+ error_return:
+ return -1;
+}
+
+void
+gf_changelog_journal_fini (void *xl, char *brick, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ jnl = data;
+
+ gf_changelog_cleanup_processor (jnl);
+
+ gf_changelog_cleanup_fds (jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds (jnl->hist_jnl);
+
+ GF_FREE (jnl);
}
void *
-gf_changelog_process (void *data)
+gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick)
{
- ssize_t len = 0;
- ssize_t offlen = 0;
- xlator_t *this = NULL;
- char *sbuf = NULL;
- gf_changelog_t *gfc = NULL;
- char from_path[PATH_MAX] = {0,};
-
- gfc = (gf_changelog_t *) data;
- this = gfc->this;
-
- pthread_detach (pthread_self());
-
- for (;;) {
- len = gf_changelog_read_path (gfc->gfc_sockfd,
- from_path + offlen,
- PATH_MAX - offlen);
- if (len < 0)
- continue; /* ignore it for now */
-
- if (len == 0) { /* close() from the changelog translator */
- gf_log (this->name, GF_LOG_INFO, "close from changelog"
- " notification translator.");
-
- if (gfc->gfc_connretries != 1) {
- if (!gf_changelog_notification_init(this, gfc))
- continue;
- }
+ int i = 0;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct stat buf = {0,};
+ char *scratch_dir = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ scratch_dir = (char *) brick->ptr;
+
+ jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl)
+ goto error_return;
+
+ if (stat (scratch_dir, &buf) && errno == ENOENT) {
+ ret = mkdir_p (scratch_dir, 0600, _gf_true);
+ if (ret)
+ goto dealloc_private;
+ }
- byebye = 1;
- break;
- }
+ jnl->jnl_working_dir = realpath (scratch_dir, NULL);
+ if (!jnl->jnl_working_dir)
+ goto dealloc_private;
- len += offlen;
- sbuf = gf_changelog_ext_change (this, gfc, from_path, len);
- if (!sbuf) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not extract changelog filename");
- continue;
- }
+ ret = gf_changelog_open_dirs (this, jnl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in scratch dir");
+ goto dealloc_private;
+ }
- offlen = 0;
- if (sbuf != (from_path + len)) {
- offlen = from_path + len - sbuf;
- memmove (from_path, sbuf, offlen);
- }
+ (void) strncpy (jnl->jnl_brickpath, brick->brick_path, PATH_MAX);
+
+ /* RFC 3986 {de,en}coding */
+ for (i = 0; i < 256; i++) {
+ jnl->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
}
- gf_log (this->name, GF_LOG_DEBUG,
- "byebye (%d) from processing thread...", byebye);
+ ret = gf_changelog_init_history (this, jnl,
+ brick->brick_path, scratch_dir);
+ if (ret)
+ goto cleanup_fds;
+
+ /* initialize journal processor */
+ ret = gf_changelog_init_processor (jnl);
+ if (ret)
+ goto cleanup_fds;
+
+ JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS);
+ ret = pthread_spin_init (&jnl->lock, 0);
+ if (ret != 0)
+ goto cleanup_processor;
+ return jnl;
+
+ cleanup_processor:
+ gf_changelog_cleanup_processor (jnl);
+ cleanup_fds:
+ gf_changelog_cleanup_fds (jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds (jnl->hist_jnl);
+ dealloc_private:
+ GF_FREE (jnl);
+ error_return:
return NULL;
}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h
new file mode 100644
index 00000000000..9a0f0b28956
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __GF_CHANGELOG_JOURNAL_H
+#define __GF_CHANGELOG_JOURNAL_H
+
+#include <unistd.h>
+#include <pthread.h>
+
+#include "changelog.h"
+
+enum api_conn {
+ JNL_API_CONNECTED,
+ JNL_API_CONN_INPROGESS,
+ JNL_API_DISCONNECTED,
+};
+
+typedef struct gf_changelog_entry {
+ char path[PATH_MAX];
+
+ struct list_head list;
+} gf_changelog_entry_t;
+
+typedef struct gf_changelog_processor {
+ pthread_mutex_t lock; /* protects ->entries */
+ pthread_cond_t cond; /* waiter during empty list */
+ gf_boolean_t waiting;
+
+ pthread_t processor; /* thread-id of journal processing thread */
+
+ struct list_head entries;
+} gf_changelog_processor_t;
+
+typedef struct gf_changelog_journal {
+ DIR *jnl_dir; /* 'processing' directory stream */
+
+ int jnl_fd; /* fd to the tracker file */
+
+ char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */
+
+ gf_changelog_processor_t *jnl_proc;
+
+ char *jnl_working_dir; /* scratch directory */
+
+ char jnl_current_dir[PATH_MAX];
+ char jnl_processed_dir[PATH_MAX];
+ char jnl_processing_dir[PATH_MAX];
+
+ char rfc3986[256]; /* RFC 3986 string encoding */
+
+ struct gf_changelog_journal *hist_jnl;
+ int hist_done; /* holds 0 done scanning,
+ 1 keep scanning and -1 error */
+
+ pthread_spinlock_t lock;
+ int connected;
+} gf_changelog_journal_t;
+
+#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state)
+#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED)
+
+/* History API */
+typedef struct gf_changelog_history_data {
+ int len;
+
+ int htime_fd;
+
+ /* parallelism count */
+ int n_parallel;
+
+ /* history from, to indexes */
+ unsigned long from;
+ unsigned long to;
+} gf_changelog_history_data_t;
+
+typedef struct gf_changelog_consume_data {
+ /** set of inputs */
+
+ /* fd to read from */
+ int fd;
+
+ /* from @offset */
+ off_t offset;
+
+ xlator_t *this;
+
+ gf_changelog_journal_t *jnl;
+
+ /** set of outputs */
+
+ /* return value */
+ int retval;
+
+ /* journal processed */
+ char changelog[PATH_MAX];
+} gf_changelog_consume_data_t;
+
+/* event handler */
+CALLBACK gf_changelog_handle_journal;
+
+/* init, connect & disconnect handler */
+INIT gf_changelog_journal_init;
+FINI gf_changelog_journal_fini;
+CONNECT gf_changelog_journal_connect;
+DISCONNECT gf_changelog_journal_disconnect;
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
new file mode 100644
index 00000000000..d7e60fb9634
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -0,0 +1,381 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+/**
+ * Reverse socket: actual data transfer handler. Connection
+ * initiator is PROBER, data transfer is REBORP.
+ */
+
+struct rpcsvc_program *gf_changelog_reborp_programs[];
+
+/**
+ * On a reverse connection, unlink the socket file.
+ */
+int
+gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
+ rpcsvc_event_t event, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ char sock[UNIX_PATH_MAX] = {0,};
+
+ entry = mydata;
+ this = entry->this;
+ priv = this->private;
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ ret = unlink (RPC_SOCK(entry));
+ if (ret != 0)
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
+ " reverse socket file %s", RPC_SOCK (entry));
+ if (entry->connected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
+ entry->brick, entry->ptr);
+ break;
+ case RPCSVC_EVENT_DISCONNECT:
+ LOCK (&priv->lock);
+ {
+ list_del (&entry->list);
+ }
+ UNLOCK (&priv->lock);
+
+ if (entry->disconnected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
+ entry->brick, entry->ptr);
+
+ GF_FREE (entry);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *this,
+ char *path, char *sock, void *cbkdata)
+{
+ CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX);
+ return changelog_rpc_server_init (this, sock, cbkdata,
+ gf_changelog_reborp_rpcsvc_notify,
+ gf_changelog_reborp_programs);
+}
+
+/**
+ * This is dirty and painful as of now untill there is event filtering in the
+ * server. The entire event buffer is scanned and interested events are picked,
+ * whereas we _should_ be notified with the events we were interested in
+ * (selected at the time of probe). As of now this is complete BS and needs
+ * fixture ASAP. I just made it work, it needs to be better.
+ *
+ * @FIXME: cleanup this bugger once server filters events.
+ */
+inline void
+gf_changelog_invoke_callback (gf_changelog_t *entry,
+ struct iovec **vec, int payloadcnt)
+{
+ int i = 0;
+ int evsize = 0;
+ xlator_t *this = NULL;
+ changelog_event_t *event = NULL;
+
+ this = entry->this;
+
+ for (; i < payloadcnt; i++) {
+ event = (changelog_event_t *)vec[i]->iov_base;
+ evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE;
+
+ for (; evsize > 0; evsize--, event++) {
+ if (gf_changelog_filter_check (entry, event)) {
+ GF_CHANGELOG_INVOKE_CBK (this,
+ entry->callback,
+ entry->brick,
+ entry->ptr, event);
+ }
+ }
+ }
+}
+
+/**
+ * Ordered event handler is self-adaptive.. if the event sequence number
+ * is what's expected (->next_seq) there is no ordering list that's
+ * maintained. On out-of-order event notifications, event buffers are
+ * dynamically allocated and ordered.
+ */
+
+inline int
+__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event)
+{
+ return (ev->next_seq == event->seq);
+}
+
+inline int
+__can_process_event (struct gf_event_list *ev, struct gf_event **event)
+{
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+
+ if (__is_expected_sequence (ev, *event)) {
+ list_del (&(*event)->list);
+ ev->next_seq++;
+ return 1;
+ }
+
+ return 0;
+}
+
+inline void
+__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+{
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+}
+
+void *
+gf_changelog_callback_invoker (void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *entry = NULL;
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = arg;
+ entry = ev->entry;
+ this = entry->this;
+
+ while (1) {
+ pthread_mutex_lock (&ev->lock);
+ {
+ __process_event_list (ev, &event);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ vec = (struct iovec *) &event->iov;
+ gf_changelog_invoke_callback (entry, &vec, event->count);
+
+ GF_FREE (event);
+ }
+
+ return NULL;
+}
+
+static int
+orderfn (struct list_head *pos1, struct list_head *pos2)
+{
+ struct gf_event *event1 = NULL;
+ struct gf_event *event2 = NULL;
+
+ event1 = list_entry (pos1, struct gf_event, list);
+ event2 = list_entry (pos2, struct gf_event, list);
+
+ if (event1->seq > event2->seq)
+ return 1;
+ return -1;
+}
+
+int
+gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ size_t payloadlen = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ payloadlen = (req->msg[0].iov_len - len);
+ }
+ for (i = 1; i < req->count; i++) {
+ payloadcnt++;
+ payloadlen += req->msg[i].iov_len;
+ }
+
+ event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen),
+ gf_changelog_mt_libgfchangelog_event_t);
+ if (!event)
+ goto handle_xdr_error;
+ INIT_LIST_HEAD (&event->list);
+
+ payloadlen = 0;
+ event->seq = rpc_req.seq;
+ event->count = payloadcnt;
+
+ /* deep copy IO vectors */
+ vec = &event->iov[0];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ (req->msg[0].iov_len - len), payloadlen);
+ (void) memcpy (vec->iov_base,
+ req->msg[0].iov_base + len, vec->iov_len);
+
+ for (i = 1; i < req->count; i++) {
+ vec = &event->iov[i];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ req->msg[i].iov_len, payloadlen);
+ (void) memcpy (event->iov[i].iov_base,
+ req->msg[i].iov_base, req->msg[i].iov_len);
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %ld)",
+ rpc_req.seq, entry->brick, rpc_req.tv_sec,
+ rpc_req.tv_usec, payloadcnt, payloadlen);
+
+ /* add it to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ int ret = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ struct iovec vector[MAX_IOVEC] = {{0,}, };
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* prepare payload */
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ vector[0].iov_base = (req->msg[0].iov_base + len);
+ vector[0].iov_len = (req->msg[0].iov_len - len);
+ }
+
+ for (i = 1; i < req->count; i++) {
+ vector[payloadcnt++] = req->msg[i];
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu (time: %lu.%lu), (vec: %d)",
+ rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
+
+ /* invoke callback */
+ struct iovec *vec = (struct iovec *) &vector;
+ gf_changelog_invoke_callback (entry, &vec, payloadcnt);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
+
+ svc = rpcsvc_request_service (req);
+ entry = svc->mydata;
+
+ this = THIS = entry->this;
+
+ ret = GF_NEED_ORDERED_EVENTS (entry)
+ ? gf_changelog_ordered_event_handler (req, this, entry)
+ : gf_changelog_unordered_event_handler (req, this, entry);
+
+ return ret;
+}
+
+rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT,
+ gf_changelog_reborp_handle_event, NULL, 0, DRC_NA
+ },
+};
+
+/**
+ * Do not use synctask as the RPC layer dereferences ->mydata as THIS.
+ * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t,
+ * and that's required to invoke the callback with the appropriate
+ * brick path and it's private data.
+ */
+struct rpcsvc_program gf_changelog_reborp_prog = {
+ .progname = "LIBGFCHANGELOG REBORP",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numactors = CHANGELOG_REV_PROC_MAX,
+ .actors = gf_changelog_reborp_actors,
+ .synctask = _gf_false,
+};
+
+struct rpcsvc_program *gf_changelog_reborp_programs[] = {
+ &gf_changelog_reborp_prog,
+ NULL,
+};
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
new file mode 100644
index 00000000000..c2a4c044d23
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "gf-changelog-rpc.h"
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+struct rpc_clnt_program gf_changelog_clnt;
+
+/* TODO: piggyback reconnect to called (upcall) */
+int
+gf_changelog_rpc_notify (struct rpc_clnt *rpc,
+ void *mydata, rpc_clnt_event_t event, void *data)
+{
+ xlator_t *this = NULL;
+
+ this = mydata;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ rpc_clnt_set_connected (&rpc->conn);
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_unset_connected (&rpc->conn);
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ break;
+ }
+
+ return 0;
+}
+
+struct rpc_clnt *
+gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry)
+{
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ CHANGELOG_MAKE_SOCKET_PATH (entry->brick,
+ sockfile, UNIX_PATH_MAX);
+ return changelog_rpc_client_init (this, entry,
+ sockfile, gf_changelog_rpc_notify);
+}
+
+/**
+ * remote procedure calls declarations.
+ */
+
+int
+gf_probe_changelog_cbk (struct rpc_req *req,
+ struct iovec *iovec, int count, void *myframe)
+{
+ return 0;
+}
+
+int
+gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data)
+{
+ int ret = 0;
+ char *sock = NULL;
+ gf_changelog_t *entry = NULL;
+ changelog_probe_req req = {0,};
+
+ entry = data;
+ sock = RPC_SOCK (entry);
+
+ (void) memcpy (&req.sock, sock, strlen (sock));
+ req.filter = entry->notify;
+
+ /* invoke RPC */
+ return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req,
+ frame, &gf_changelog_clnt,
+ CHANGELOG_RPC_PROBE_FILTER, NULL, 0,
+ NULL, this, gf_probe_changelog_cbk,
+ (xdrproc_t) xdr_changelog_probe_req);
+}
+
+int
+gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx)
+{
+ return changelog_invoke_rpc (this, RPC_PROBER (entry),
+ &gf_changelog_clnt, procidx, entry);
+}
+
+struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = {
+ [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_RPC_PROBE_FILTER] = {
+ "PROBE FILTER", gf_probe_changelog_filter
+ },
+};
+
+struct rpc_clnt_program gf_changelog_clnt = {
+ .progname = "LIBGFCHANGELOG",
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numproc = CHANGELOG_RPC_PROC_MAX,
+ .proctable = gf_changelog_procs,
+};
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
new file mode 100644
index 00000000000..1c982eef809
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
@@ -0,0 +1,26 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __GF_CHANGELOG_RPC_H
+#define __GF_CHANGELOG_RPC_H
+
+#include "xlator.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+struct rpc_clnt *gf_changelog_rpc_init (xlator_t *, gf_changelog_t *);
+
+int gf_changelog_invoke_rpc (xlator_t *, gf_changelog_t *, int);
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *, char *, char *, void *);
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index fb2d9037ffb..8f33eb01013 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -14,6 +14,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
+#include <sys/resource.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -24,589 +26,523 @@
#include "glusterfs.h"
#include "logging.h"
#include "defaults.h"
+#include "syncop.h"
+#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"
/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
-int byebye = 0;
+/**
+ * Global singleton xlator pointer for the library, initialized
+ * during library load. This should probably be hidden inside
+ * an initialized object which is an handle for the consumer.
+ *
+ * TODO: do away with the global..
+ */
+xlator_t *master = NULL;
-static void
-gf_changelog_cleanup (gf_changelog_t *gfc)
+static inline
+gf_private_t *gf_changelog_alloc_priv ()
{
- /* socket */
- if (gfc->gfc_sockfd != -1)
- close (gfc->gfc_sockfd);
- /* tracker fd */
- if (gfc->gfc_fd != -1)
- close (gfc->gfc_fd);
- /* processing dir */
- if (gfc->gfc_dir)
- closedir (gfc->gfc_dir);
-
- if (gfc->gfc_working_dir)
- free (gfc->gfc_working_dir); /* allocated by realpath */
-}
+ int ret = 0;
+ gf_private_t *priv = NULL;
-void
-__attribute__ ((constructor)) gf_changelog_ctor (void)
-{
- glusterfs_ctx_t *ctx = NULL;
+ priv = calloc (1, sizeof (gf_private_t));
+ if (!priv)
+ goto error_return;
+ INIT_LIST_HEAD (&priv->connections);
- ctx = glusterfs_ctx_new ();
- if (!ctx)
- return;
+ ret = LOCK_INIT (&priv->lock);
+ if (ret != 0)
+ goto free_priv;
+ priv->api = NULL;
- if (glusterfs_globals_init (ctx)) {
- free (ctx);
- ctx = NULL;
- return;
- }
+ return priv;
- THIS->ctx = ctx;
- if (xlator_mem_acct_init (THIS, gf_changelog_mt_end))
- return;
+ free_priv:
+ free (priv);
+ error_return:
+ return NULL;
}
-void
-__attribute__ ((destructor)) gf_changelog_dtor (void)
-{
- xlator_t *this = NULL;
- glusterfs_ctx_t *ctx = NULL;
- gf_changelog_t *gfc = NULL;
+#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
+#define GF_CHANGELOG_EVENT_THREAD_COUNT 4
- this = THIS;
- if (!this)
- return;
-
- ctx = this->ctx;
- gfc = this->private;
-
- if (gfc) {
- if (gfc->hist_gfc) {
- gf_changelog_cleanup(gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
+static int
+gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx)
+{
+ cmd_args_t *cmd_args = NULL;
+ struct rlimit lim = {0, };
+ call_pool_t *pool = NULL;
+ int ret = -1;
+
+ ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end);
+ if (ret != 0) {
+ return ret;
}
- if (ctx) {
- pthread_mutex_destroy (&ctx->lock);
- free (ctx);
- ctx = NULL;
- }
-}
+ ctx->process_uuid = generate_glusterfs_ctx_id ();
+ if (!ctx->process_uuid)
+ return -1;
+ ctx->page_size = 128 * GF_UNIT_KB;
-static int
-gf_changelog_open_dirs (gf_changelog_t *gfc)
-{
- int ret = -1;
- DIR *dir = NULL;
- int tracker_fd = 0;
- char tracker_path[PATH_MAX] = {0,};
- xlator_t *this = NULL;
+ ctx->iobuf_pool = iobuf_pool_new ();
+ if (!ctx->iobuf_pool)
+ return -1;
- this = THIS;
- GF_ASSERT (this);
+ ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE,
+ GF_CHANGELOG_EVENT_THREAD_COUNT);
+ if (!ctx->event_pool)
+ return -1;
- (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_CURRENT_DIR"/",
- gfc->gfc_working_dir);
+ pool = GF_CALLOC (1, sizeof (call_pool_t),
+ gf_changelog_mt_libgfchangelog_call_pool_t);
+ if (!pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_current_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_current_dir, strerror (errno));
- goto out;
- }
+ /* frame_mem_pool size 112 * 64 */
+ pool->frame_mem_pool = mem_pool_new (call_frame_t, 32);
+ if (!pool->frame_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ /* stack_mem_pool size 256 * 128 */
+ pool->stack_mem_pool = mem_pool_new (call_stack_t, 16);
- (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
- gfc->gfc_working_dir);
+ if (!pool->stack_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16);
+ if (!ctx->stub_mem_pool)
+ return -1;
- (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
- gfc->gfc_working_dir);
+ ctx->dict_pool = mem_pool_new (dict_t, 32);
+ if (!ctx->dict_pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_processing_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_processing_dir, strerror (errno));
- goto out;
- }
+ ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512);
+ if (!ctx->dict_pair_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->dict_data_pool = mem_pool_new (data_t, 512);
+ if (!ctx->dict_data_pool)
+ return -1;
- dir = opendir (gfc->gfc_processing_dir);
- if (!dir) {
- gf_log ("", GF_LOG_ERROR,
- "opendir() error [reason: %s]", strerror (errno));
- goto out;
- }
+ INIT_LIST_HEAD (&pool->all_frames);
+ LOCK_INIT (&pool->lock);
+ ctx->pool = pool;
- gfc->gfc_dir = dir;
+ pthread_mutex_init (&(ctx->lock), NULL);
- (void) snprintf (tracker_path, PATH_MAX,
- "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+ cmd_args = &ctx->cmd_args;
- tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (tracker_fd < 0) {
- closedir (gfc->gfc_dir);
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD (&cmd_args->xlator_options);
+
+ lim.rlim_cur = RLIM_INFINITY;
+ lim.rlim_max = RLIM_INFINITY;
+ setrlimit (RLIMIT_CORE, &lim);
- gfc->gfc_fd = tracker_fd;
- ret = 0;
- out:
- return ret;
+ return 0;
}
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
+/* TODO: cleanup ctx defaults */
+static void
+gf_changelog_cleanup_this (xlator_t *this)
{
- int ret = 0;
- int len = 0;
- int tries = 0;
- int sockfd = 0;
- struct sockaddr_un remote;
-
- this = gfc->this;
+ glusterfs_ctx_t *ctx = NULL;
- if (gfc->gfc_sockfd != -1) {
- gf_log (this->name, GF_LOG_INFO,
- "Reconnecting...");
- close (gfc->gfc_sockfd);
- }
+ if (!this)
+ return;
- sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (sockfd < 0) {
- ret = -1;
- goto out;
- }
+ ctx = this->ctx;
+ syncenv_destroy (ctx->env);
+ free (ctx);
- CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
- gfc->gfc_sockpath, UNIX_PATH_MAX);
- gf_log (this->name, GF_LOG_INFO,
- "connecting to changelog socket: %s (brick: %s)",
- gfc->gfc_sockpath, gfc->gfc_brickpath);
+ if (this->private)
+ free (this->private);
- remote.sun_family = AF_UNIX;
- strcpy (remote.sun_path, gfc->gfc_sockpath);
+ this->private = NULL;
+ this->ctx = NULL;
+}
- len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+static int
+gf_changelog_init_this ()
+{
+ glusterfs_ctx_t *ctx = NULL;
- while (tries < gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_WARNING,
- "connection attempt %d/%d...",
- tries + 1, gfc->gfc_connretries);
+ ctx = glusterfs_ctx_new ();
+ if (!ctx)
+ goto error_return;
- /* initiate a connect */
- if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
- gfc->gfc_sockfd = sockfd;
- break;
- }
+ if (glusterfs_globals_init (ctx))
+ goto free_ctx;
- tries++;
- sleep (2);
- }
+ THIS->ctx = ctx;
+ if (gf_changelog_ctx_defaults_init (ctx))
+ goto free_ctx;
- if (tries == gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not connect to changelog socket!"
- " bailing out...");
- close (sockfd);
- ret = -1;
- } else
- gf_log (this->name, GF_LOG_INFO,
- "connection successful");
+ ctx->env = syncenv_new (0, 0, 0);
+ if (!ctx->env)
+ goto free_ctx;
+ return 0;
- out:
- return ret;
+ free_ctx:
+ free (ctx);
+ THIS->ctx = NULL;
+ error_return:
+ return -1;
}
-int
-gf_changelog_done (char *file)
+static int
+gf_changelog_init_master ()
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char to_path[PATH_MAX] = {0,};
-
- errno = EINVAL;
-
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
- if (!file || !strlen (file))
- goto out;
+ ret = gf_changelog_init_this ();
+ if (ret != 0)
+ goto error_return;
+ master = THIS;
+
+ priv = gf_changelog_alloc_priv ();
+ if (!priv)
+ goto cleanup_master;
+ master->private = priv;
+
+ /* poller thread */
+ ret = pthread_create (&priv->poller,
+ NULL, changelog_rpc_poller, master);
+ if (ret != 0) {
+ gf_log (master->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto cleanup_master;
+ }
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
+ return 0;
- if (strncmp (gfc->gfc_working_dir,
- buffer, strlen (gfc->gfc_working_dir)))
- goto out;
+ cleanup_master:
+ master->private = NULL;
+ gf_changelog_cleanup_this (master);
+ error_return:
+ return -1;
+}
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
+/* ctor/dtor */
- ret = 0;
+void
+__attribute__ ((constructor)) gf_changelog_ctor (void)
+{
+ (void) gf_changelog_init_master ();
+}
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+void
+__attribute__ ((destructor)) gf_changelog_dtor (void)
+{
+ gf_changelog_cleanup_this (master);
}
-/**
- * @API
- * for a set of changelogs, start from the beginning
- */
+/* TODO: cleanup clnt/svc on failure */
int
-gf_changelog_start_fresh ()
+gf_changelog_setup_rpc (xlator_t *this,
+ gf_changelog_t *entry, int proc)
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
-
- this = THIS;
- if (!this)
- goto out;
+ int ret = 0;
+ rpcsvc_t *svc = NULL;
+ struct rpc_clnt *rpc = NULL;
- errno = EINVAL;
+ /**
+ * Initialize a connect back socket. A probe() RPC call to the server
+ * triggers a reverse connect.
+ */
+ svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick,
+ RPC_SOCK (entry), entry);
+ if (!svc)
+ goto error_return;
+ RPC_REBORP (entry) = svc;
+
+ /* Initialize an RPC client */
+ rpc = gf_changelog_rpc_init (this, entry);
+ if (!rpc)
+ goto error_return;
+ RPC_PROBER (entry) = rpc;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ /**
+ * @FIXME
+ * till we have connection state machine, let's delay the RPC call
+ * for now..
+ */
+ sleep (2);
- if (gf_ftruncate (gfc->gfc_fd, 0))
- goto out;
+ /**
+ * Probe changelog translator for reverse connection. After a successful
+ * call, there's less use of the client and can be disconnected, but
+ * let's leave the connection active for any future RPC calls.
+ */
+ ret = gf_changelog_invoke_rpc (this, entry, proc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate probe RPC, bailing out!!!");
+ goto error_return;
+ }
return 0;
- out:
+ error_return:
return -1;
}
-/**
- * @API
- * return the next changelog file entry. zero means all chanelogs
- * consumed.
- */
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen)
+static void
+gf_cleanup_event (gf_changelog_t *entry)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char buffer[PATH_MAX] = {0,};
-
- if (maxlen > PATH_MAX) {
- errno = ENAMETOOLONG;
- goto out;
- }
+ xlator_t *this = NULL;
+ struct gf_event_list *ev = NULL;
- errno = EINVAL;
+ this = entry->this;
+ ev = &entry->event;
- this = THIS;
- if (!this)
- goto out;
+ (void) gf_thread_cleanup (this, ev->invoker);
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ (void) pthread_mutex_destroy (&ev->lock);
+ (void) pthread_cond_destroy (&ev->cond);
- tracker_fd = gfc->gfc_fd;
+ ev->entry = NULL;
+}
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0) {
- size = -1;
- goto out;
+static int
+gf_init_event (gf_changelog_t *entry)
+{
+ int ret = 0;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+ ev->entry = entry;
+
+ ret = pthread_mutex_init (&ev->lock, NULL);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_init (&ev->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ INIT_LIST_HEAD (&ev->events);
+
+ ev->next_seq = 0; /* bootstrap sequencing */
+
+ if (entry->ordered) {
+ ret = pthread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
}
- if (size == 0)
- goto out;
-
- memcpy (bufptr, buffer, size - 1);
- bufptr[size - 1] = '\0';
+ return 0;
-out:
- return size;
+ cleanup_cond:
+ (void) pthread_cond_destroy (&ev->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&ev->lock);
+ error_return:
+ return -1;
}
/**
- * @API
- * gf_changelog_scan() - scan and generate a list of change entries
- *
- * calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
+ * TODO:
+ * - cleanup invoker thread (if ordered mode)
+ * - cleanup event list
+ * - destroy rpc{-clnt, svc}
*/
-ssize_t
-gf_changelog_scan ()
+int
+gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
-
- this = THIS;
- if (!this)
- goto out;
+ return 0;
+}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+int
+gf_cleanup_connections (xlator_t *this)
+{
+ return 0;
+}
- /**
- * do we need to protect 'byebye' with locks? worst, the
- * consumer would get notified during next scan().
- */
- if (byebye) {
- errno = ECONNREFUSED;
- goto out;
- }
+static int
+gf_setup_brick_connection (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
- errno = EINVAL;
+ priv = this->private;
- tracker_fd = gfc->gfc_fd;
+ if (!brick->callback || !brick->init || !brick->fini)
+ goto error_return;
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
+ entry = GF_CALLOC (1, sizeof (*entry),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!entry)
+ goto error_return;
+ INIT_LIST_HEAD (&entry->list);
- len = offsetof(struct dirent, d_name)
- + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
+ entry->notify = brick->filter;
+ (void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
- rewinddir (gfc->gfc_dir);
- while (1) {
- ret = readdir_r (gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
+ entry->this = this;
+ entry->invokerxl = xl;
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
+ entry->ordered = ordered;
+ if (ordered) {
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
+ }
- nr_entries++;
+ entry->fini = brick->fini;
+ entry->callback = brick->callback;
+ entry->connected = brick->connected;
+ entry->disconnected = brick->disconnected;
- GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
- buffer, off,
- strlen (gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+ entry->ptr = brick->init (this, brick);
+ if (!entry->ptr)
+ goto cleanup_event;
+ priv->api = entry->ptr; /* pointer to API, if required */
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
+ LOCK (&priv->lock);
+ {
+ list_add_tail (&entry->list, &priv->connections);
}
+ UNLOCK (&priv->lock);
- GF_FREE (entryp);
+ ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
+ if (ret)
+ goto cleanup_event;
+ return 0;
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
- }
- out:
+ cleanup_event:
+ if (ordered)
+ gf_cleanup_event (entry);
+ free_entry:
+ list_del (&entry->list); /* FIXME: kludge for now */
+ GF_FREE (entry);
+ error_return:
return -1;
}
-/**
- * @API
- * gf_changelog_register() - register a client for updates.
- */
int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_level, int max_reconnects)
+gf_changelog_register_brick (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
{
- int i = 0;
- int ret = -1;
- int errn = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char hist_scratch_dir[PATH_MAX] = {0,};
- struct stat buf = {0,};
-
- this = THIS;
- if (!this->ctx)
- goto out;
-
- errno = ENOMEM;
-
- gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc)
- goto out;
-
- gfc->this = this;
-
- gfc->gfc_dir = NULL;
- gfc->gfc_fd = gfc->gfc_sockfd = -1;
-
- if (stat (scratch_dir, &buf) && errno == ENOENT) {
- ret = mkdir_p (scratch_dir, 0600, _gf_true);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
- }
-
- gfc->gfc_working_dir = realpath (scratch_dir, NULL);
- if (!gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ return gf_setup_brick_connection (this, brick, ordered, xl);
+}
- /* Begin: Changes for History API */
- gfc->hist_gfc = NULL;
+static int
+gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel)
+{
+ /* passing ident as NULL means to use default ident for syslog */
+ if (gf_log_init (this->ctx, logfile, NULL))
+ return -1;
- gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc->hist_gfc)
- goto cleanup;
+ gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO :
+ loglevel);
+ return 0;
+}
- gfc->hist_gfc->gfc_dir = NULL;
- gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1;
- gfc->hist_gfc->this = NULL;
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ struct gf_brick_spec *brick = NULL;
+ gf_boolean_t need_order = _gf_false;
- (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
- (void) snprintf (hist_scratch_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_HISTORY_DIR"/",
- gfc->gfc_working_dir);
+ SAVE_THIS (xl);
- ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ this = THIS;
+ if (!this)
+ goto error_return;
- gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL);
- if (!gfc->hist_gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ ret = gf_changelog_setup_logging (this, logfile, lvl);
+ if (ret)
+ goto error_return;
- ret = gf_changelog_open_dirs (gfc->hist_gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in history scratch dir");
- goto cleanup;
- }
+ need_order = (ordered) ? _gf_true : _gf_false;
- (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX);
+ brick = bricks;
+ while (count--) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Registering brick: %s [notify filter: %d]",
+ brick->brick_path, brick->filter);
- for (i=0; i < 256; i++) {
- gfc->hist_gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
- /* End: Changes for History API*/
+ ret = gf_changelog_register_brick (this, brick, need_order, xl);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Error registering with changelog xlator");
+ break;
+ }
- ret = gf_changelog_open_dirs (gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in scratch dir");
- goto cleanup;
+ brick++;
}
- /* passing ident as NULL means to use default ident for syslog */
- if (gf_log_init (this->ctx, log_file, NULL))
- goto cleanup;
-
- gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
- log_level);
+ if (ret != 0)
+ goto cleanup_inited_bricks;
- gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
- (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+ RESTORE_THIS();
+ return 0;
- ret = gf_changelog_notification_init (this, gfc);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ cleanup_inited_bricks:
+ gf_cleanup_connections (this);
+ error_return:
+ RESTORE_THIS();
+ return -1;
+}
- ret = gf_thread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "error creating changelog processor thread"
- " new changes won't be recorded!!!");
- goto cleanup;
- }
+/**
+ * @API
+ * gf_changelog_register()
+ *
+ * This is _NOT_ a generic register API. It's a special API to handle
+ * updates at a journal granulality. This is used by consumers wanting
+ * to process persistent journal such as geo-replication via a set of
+ * APIs. All of this is required to maintain backward compatibility.
+ * Owner specific private data is stored in ->api (in gf_private_t),
+ * which is used by APIs to access it's private data. This limits
+ * the library access to a single brick, but that's how it used to
+ * be anyway. Furthermore, this API solely _owns_ "this", therefore
+ * callers already having a notion of "this" are expected to use the
+ * newer API.
+ *
+ * Newer applications wanting to use this library need not face this
+ * limitation and reply of the much more feature rich generic register
+ * API, which is purely callback based.
+ *
+ * NOTE: @max_reconnects is not used but required for backward compat.
+ *
+ * For generic API, refer gf_changelog_register_generic().
+ */
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_level, int max_reconnects)
+{
+ struct gf_brick_spec brick = {0,};
- for (i=0; i < 256; i++) {
- gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
+ THIS = master;
- ret = 0;
- this->private = gfc;
+ brick.brick_path = brick_path;
+ brick.filter = CHANGELOG_OP_TYPE_JOURNAL;
- goto out;
+ brick.init = gf_changelog_journal_init;
+ brick.fini = gf_changelog_journal_fini;
+ brick.callback = gf_changelog_handle_journal;
+ brick.connected = gf_changelog_journal_connect;
+ brick.disconnected = gf_changelog_journal_disconnect;
- cleanup:
- if (gfc->hist_gfc) {
- gf_changelog_cleanup (gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- this->private = NULL;
- errno = errn;
+ brick.ptr = scratch_dir;
- out:
- return ret;
+ return gf_changelog_register_generic (&brick, 1, 1,
+ log_file, log_level, NULL);
}
diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c
index 8a527dd6e4b..12f51da8fa2 100644
--- a/xlators/features/changelog/lib/src/gf-history-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-history-changelog.c
@@ -14,6 +14,7 @@
#include "syscall.h"
#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
/* from the changelog translator */
#include "changelog-misc.h"
@@ -36,12 +37,12 @@
int
gf_history_changelog_done (char *file)
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char to_path[PATH_MAX] = {0,};
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char to_path[PATH_MAX] = {0,};
errno = EINVAL;
@@ -49,28 +50,28 @@ gf_history_changelog_done (char *file)
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
if (!file || !strlen (file))
goto out;
- /* make sure 'file' is inside ->gfc_working_dir */
+ /* make sure 'file' is inside ->jnl_working_dir */
buffer = realpath (file, NULL);
if (!buffer)
goto out;
- if (strncmp (hist_gfc->gfc_working_dir,
- buffer, strlen (hist_gfc->gfc_working_dir)))
+ if (strncmp (hist_jnl->jnl_working_dir,
+ buffer, strlen (hist_jnl->jnl_working_dir)))
goto out;
(void) snprintf (to_path, PATH_MAX, "%s%s",
- hist_gfc->gfc_processed_dir, basename (buffer));
+ hist_jnl->jnl_processed_dir, basename (buffer));
gf_log (this->name, GF_LOG_DEBUG,
"moving %s to processed directory", file);
ret = rename (buffer, to_path);
@@ -102,9 +103,9 @@ gf_history_changelog_done (char *file)
int
gf_history_changelog_start_fresh ()
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
this = THIS;
if (!this)
@@ -112,15 +113,15 @@ gf_history_changelog_start_fresh ()
errno = EINVAL;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
- if (gf_ftruncate (hist_gfc->gfc_fd, 0))
+ if (gf_ftruncate (hist_jnl->jnl_fd, 0))
goto out;
return 0;
@@ -147,12 +148,17 @@ gf_history_changelog_start_fresh ()
ssize_t
gf_history_changelog_next_change (char *bufptr, size_t maxlen)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char buffer[PATH_MAX] = {0,};
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ if (maxlen > PATH_MAX) {
+ errno = ENAMETOOLONG;
+ goto out;
+ }
errno = EINVAL;
@@ -160,15 +166,15 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ tracker_fd = hist_jnl->jnl_fd;
size = gf_readline (tracker_fd, buffer, maxlen);
if (size < 0) {
@@ -206,56 +212,60 @@ out:
ssize_t
gf_history_changelog_scan ()
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
- static int is_last_scan = 0;
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+ static int is_last_scan;
this = THIS;
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
+ if (JNL_IS_API_DISCONNECTED (jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
retry:
if (is_last_scan == 1)
return 0;
- if (hist_gfc->hist_done == 0)
+ if (hist_jnl->hist_done == 0)
is_last_scan = 1;
errno = EINVAL;
- if (hist_gfc->hist_done == -1)
+ if (hist_jnl->hist_done == -1)
goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ tracker_fd = hist_jnl->jnl_fd;
if (gf_ftruncate (tracker_fd, 0))
goto out;
len = offsetof (struct dirent, d_name)
- + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
+ + pathconf (hist_jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;
entryp = GF_CALLOC (1, len,
gf_changelog_mt_libgfchangelog_dirent_t);
if (!entryp)
goto out;
- rewinddir (hist_gfc->gfc_dir);
+ rewinddir (hist_jnl->jnl_dir);
while (1) {
- ret = readdir_r (hist_gfc->gfc_dir, entryp, &result);
+ ret = readdir_r (hist_jnl->jnl_dir, entryp, &result);
if (ret || !result)
break;
@@ -265,9 +275,9 @@ gf_history_changelog_scan ()
nr_entries++;
- GF_CHANGELOG_FILL_BUFFER (hist_gfc->gfc_processing_dir,
+ GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir,
buffer, off,
- strlen (hist_gfc->gfc_processing_dir));
+ strlen (hist_jnl->jnl_processing_dir));
GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
off, strlen (entryp->d_name));
GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
@@ -284,7 +294,8 @@ gf_history_changelog_scan ()
GF_FREE (entryp);
gf_log (this->name, GF_LOG_DEBUG,
- "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan);
+ "hist_done %d, is_last_scan: %d", hist_jnl->hist_done,
+ is_last_scan);
if (!result) {
if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) {
@@ -490,7 +501,7 @@ gf_changelog_consume_wrap (void* data)
/* TODO: handle short reads and EOF. */
ret = gf_changelog_consume (ccd->this,
- ccd->gfc, ccd->changelog, _gf_true);
+ ccd->jnl, ccd->changelog, _gf_true);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"could not parse changelog: %s", ccd->changelog);
@@ -515,8 +526,8 @@ void *
gf_history_consume (void * data)
{
xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
int ret = 0;
int iter = 0;
int fd = -1;
@@ -549,14 +560,14 @@ gf_history_consume (void * data)
goto out;
}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc) {
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl) {
ret = -1;
goto out;
}
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc) {
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl) {
ret = -1;
goto out;
}
@@ -568,7 +579,7 @@ gf_history_consume (void * data)
curr = &ccd[iter];
curr->this = this;
- curr->gfc = hist_gfc;
+ curr->jnl = hist_jnl;
curr->fd = fd;
curr->offset = from * (len + 1);
@@ -613,7 +624,7 @@ gf_history_consume (void * data)
}
ret = gf_changelog_publish (curr->this,
- curr->gfc, curr->changelog);
+ curr->jnl, curr->changelog);
if (ret) {
publish = _gf_false;
gf_log (this->name, GF_LOG_ERROR,
@@ -623,7 +634,7 @@ gf_history_consume (void * data)
}
/* informing "parsing done". */
- hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1;
+ hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1;
out:
if (fd != -1)
@@ -740,8 +751,8 @@ gf_history_changelog (char* changelog_dir, unsigned long start,
unsigned long from = 0;
unsigned long total_changelog = 0;
xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
gf_changelog_history_data_t *hist_data = NULL;
DIR *dirp = NULL;
struct dirent *dp = NULL;
@@ -762,14 +773,14 @@ gf_history_changelog (char* changelog_dir, unsigned long start,
goto out;
}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc) {
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl) {
ret = -1;
goto out;
}
- hist_gfc = (gf_changelog_t *) gfc->hist_gfc;
- if (!hist_gfc) {
+ hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl;
+ if (!hist_jnl) {
ret = -1;
goto out;
}
@@ -917,7 +928,7 @@ out:
return ret;
}
- hist_gfc->hist_done = 1;
+ hist_jnl->hist_done = 1;
*actual_end = ts2;
return ret;