summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog-helpers.h
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-helpers.h')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h279
1 files changed, 194 insertions, 85 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 9b875d45dcc..9c609d33172 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -14,133 +14,242 @@
#include <unistd.h>
#include <dirent.h>
#include <limits.h>
-#include <pthread.h>
+#include <glusterfs/locking.h>
-#include <xlator.h>
+#include <glusterfs/xlator.h>
-#define GF_CHANGELOG_TRACKER "tracker"
+#include "changelog.h"
-#define GF_CHANGELOG_CURRENT_DIR ".current"
-#define GF_CHANGELOG_PROCESSED_DIR ".processed"
+#include "changelog-rpc-common.h"
+#include "gf-changelog-journal.h"
+
+#define GF_CHANGELOG_TRACKER "tracker"
+
+#define GF_CHANGELOG_CURRENT_DIR ".current"
+#define GF_CHANGELOG_PROCESSED_DIR ".processed"
#define GF_CHANGELOG_PROCESSING_DIR ".processing"
-#define GF_CHANGELOG_HISTORY_DIR ".history"
+#define GF_CHANGELOG_HISTORY_DIR ".history"
#define TIMESTAMP_LENGTH 10
#ifndef MAXLINE
#define MAXLINE 4096
#endif
-#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) do { \
- memcpy (ascii + off, ptr, len); \
- off += len; \
- } while (0)
+#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) \
+ do { \
+ memcpy(ascii + off, ptr, len); \
+ off += len; \
+ } while (0)
typedef struct read_line {
- int rl_cnt;
- char *rl_bufptr;
- char rl_buf[MAXLINE];
+ int rl_cnt;
+ char *rl_bufptr;
+ char rl_buf[MAXLINE];
} read_line_t;
+struct gf_changelog;
+struct gf_event;
+
+/**
+ * Event list for ordered event notification
+ *
+ * ->next_seq holds the next _expected_ sequence number.
+ */
+struct gf_event_list {
+ pthread_mutex_t lock; /* protects this structure */
+ pthread_cond_t cond;
+
+ pthread_t invoker;
+
+ unsigned long next_seq; /* next sequence number expected:
+ zero during bootstrap */
+
+ struct gf_changelog *entry; /* backpointer to it's brick
+ encapsulator (entry) */
+ struct list_head events; /* list of events */
+};
+
+/**
+ * include a refcount if it's of use by additional layers
+ */
+struct gf_event {
+ int count;
+
+ unsigned long seq;
+
+ struct list_head list;
+
+ struct iovec iov[0];
+};
+#define GF_EVENT_CALLOC_SIZE(cnt, len) \
+ (sizeof(struct gf_event) + (cnt * sizeof(struct iovec)) + len)
+
+/**
+ * assign the base address of the IO vector to the correct memory
+o * area and set it's addressable length.
+ */
+#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
+ do { \
+ vec->iov_base = ((char *)event) + sizeof(struct gf_event) + \
+ (event->count * sizeof(struct iovec)) + pos; \
+ vec->iov_len = len; \
+ pos += len; \
+ } while (0)
+
+typedef enum gf_changelog_conn_state {
+ GF_CHANGELOG_CONN_STATE_PENDING = 0,
+ GF_CHANGELOG_CONN_STATE_ACCEPTED,
+ GF_CHANGELOG_CONN_STATE_DISCONNECTED,
+} gf_changelog_conn_state_t;
+
+/**
+ * An instance of this structure is allocated for each brick for which
+ * notifications are streamed.
+ */
typedef struct gf_changelog {
- xlator_t *this;
-
- /* 'processing' directory stream */
- DIR *gfc_dir;
-
- /* fd to the tracker file */
- int gfc_fd;
+ gf_lock_t statelock;
+ gf_changelog_conn_state_t connstate;
- /* connection retries */
- int gfc_connretries;
+ xlator_t *this;
- char gfc_sockpath[UNIX_PATH_MAX];
+ struct list_head list; /* list of instances */
- char gfc_brickpath[PATH_MAX];
+ char brick[PATH_MAX]; /* brick path for this end-point */
- /* socket for receiving notifications */
- int gfc_sockfd;
+ changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
+#define RPC_PROBER(ent) ent->grpc.rpc
+#define RPC_REBORP(ent) ent->grpc.svc
+#define RPC_SOCK(ent) ent->grpc.sock
- char *gfc_working_dir;
+ unsigned int notify; /* notification flag(s) */
- /* RFC 3986 string encoding */
- char rfc3986[256];
+ FINI *fini; /* destructor callback */
+ CALLBACK *callback; /* event callback dispatcher */
+ CONNECT *connected; /* connect callback */
+ DISCONNECT *disconnected; /* disconnection callback */
- char gfc_current_dir[PATH_MAX];
- char gfc_processed_dir[PATH_MAX];
- char gfc_processing_dir[PATH_MAX];
+ void *ptr; /* owner specific private data */
+ xlator_t *invokerxl; /* consumers _this_, if valid,
+ assigned to THIS before cbk is
+ invoked */
- pthread_t gfc_changelog_processor;
+ gf_boolean_t ordered;
- /* Holds gfc for History API */
- struct gf_changelog *hist_gfc;
+ void (*queueevent)(struct gf_event_list *, struct gf_event *);
+ void (*pickevent)(struct gf_event_list *, struct gf_event **);
- /* holds 0 done scanning, 1 keep scanning and -1 error */
- int hist_done;
+ struct gf_event_list event;
} gf_changelog_t;
-typedef struct gf_changelog_history_data {
- int len;
-
- int htime_fd;
-
- /* parallelism count */
- int n_parallel;
-
- /* history from, to indexes */
- unsigned long from;
- unsigned long to;
-} gf_changelog_history_data_t;
-
-typedef struct gf_changelog_consume_data {
- /** set of inputs */
+static inline int
+gf_changelog_filter_check(gf_changelog_t *entry, changelog_event_t *event)
+{
+ if (event->ev_type & entry->notify)
+ return 1;
+ return 0;
+}
+
+#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)
+
+/** private structure */
+typedef struct gf_private {
+ pthread_mutex_t lock; /* protects ->connections, cleanups */
+ pthread_cond_t cond;
+
+ void *api; /* pointer for API access */
+
+ pthread_t poller; /* event poller thread */
+ pthread_t connectionjanitor; /* connection cleaner */
+
+ struct list_head connections; /* list of connections */
+ struct list_head cleanups; /* list of connection to be
+ cleaned up */
+} gf_private_t;
+
+#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *)this->private)->api)
+
+/**
+ * upcall: invoke callback with _correct_ THIS
+ */
+#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args...) \
+ do { \
+ xlator_t *old_this = NULL; \
+ xlator_t *invokerxl = NULL; \
+ \
+ invokerxl = entry->invokerxl; \
+ old_this = this; \
+ \
+ if (invokerxl) { \
+ THIS = invokerxl; \
+ } \
+ \
+ cbk(invokerxl, brick, args); \
+ THIS = old_this; \
+ \
+ } while (0)
+
+#define SAVE_THIS(xl) \
+ do { \
+ old_this = xl; \
+ THIS = master; \
+ } while (0)
+
+#define RESTORE_THIS() \
+ do { \
+ if (old_this) \
+ THIS = old_this; \
+ } while (0)
+
+/** APIs and the rest */
- /* fd to read from */
- int fd;
+void *
+gf_changelog_process(void *data);
- /* from @offset */
- off_t offset;
+void
+gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr);
- xlator_t *this;
- gf_changelog_t *gfc;
+size_t
+gf_changelog_write(int fd, char *buffer, size_t len);
- /** set of outputs */
+ssize_t
+gf_readline(int fd, void *vptr, size_t maxlen);
- /* return value */
- int retval;
+int
+gf_ftruncate(int fd, off_t length);
- /* journal processed */
- char changelog[PATH_MAX];
-} gf_changelog_consume_data_t;
+off_t
+gf_lseek(int fd, off_t offset, int whence);
int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
-
+gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *from_path, gf_boolean_t no_publish);
+int
+gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *from_path);
+int
+gf_thread_cleanup(xlator_t *this, pthread_t thread);
void *
-gf_changelog_process (void *data);
+gf_changelog_callback_invoker(void *arg);
-ssize_t
-gf_changelog_read_path (int fd, char *buffer, size_t bufsize);
+int
+gf_cleanup_event(xlator_t *, struct gf_event_list *);
+/* (un)ordered event queueing */
void
-gf_rfc3986_encode (unsigned char *s, char *enc, char *estr);
+queue_ordered_event(struct gf_event_list *, struct gf_event *);
-size_t
-gf_changelog_write (int fd, char *buffer, size_t len);
-
-ssize_t
-gf_readline (int fd, void *vptr, size_t maxlen);
+void
+queue_unordered_event(struct gf_event_list *, struct gf_event *);
-int
-gf_ftruncate (int fd, off_t length);
+/* (un)ordered event picking */
+void
+pick_event_ordered(struct gf_event_list *, struct gf_event **);
-off_t
-gf_lseek (int fd, off_t offset, int whence);
+void
+pick_event_unordered(struct gf_event_list *, struct gf_event **);
-int
-gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
- char *from_path, gf_boolean_t no_publish);
-int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path);
+/* connection janitor thread */
+void *
+gf_changelog_connection_janitor(void *);
#endif