diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-helpers.h')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-helpers.h | 194 |
1 files changed, 134 insertions, 60 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 9b875d45dcc..17b8862a89b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -18,6 +18,11 @@ #include <xlator.h> +#include "changelog.h" + +#include "changelog-rpc-common.h" +#include "gf-changelog-journal.h" + #define GF_CHANGELOG_TRACKER "tracker" #define GF_CHANGELOG_CURRENT_DIR ".current" @@ -41,79 +46,143 @@ typedef struct read_line { char rl_buf[MAXLINE]; } read_line_t; +struct gf_changelog; + +/** + * Event list for ordered event notification + * + * ->next_seq holds the next _expected_ sequence number. + */ +struct gf_event_list { + pthread_mutex_t lock; /* protects this structure */ + pthread_cond_t cond; + + pthread_t invoker; + + unsigned long next_seq; /* next sequence number expected: + zero during bootstrap */ + + struct gf_changelog *entry; /* backpointer to it's brick + encapsulator (entry) */ + struct list_head events; /* list of events (ordered) */ +}; + +/** + * include a refcount if it's of use by additional layers + */ +struct gf_event { + int count; + + unsigned long seq; + + struct list_head list; + + struct iovec iov[0]; +}; +#define GF_EVENT_CALLOC_SIZE(cnt, len) \ + (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len) + +/** + * assign the base address of the IO vector to the correct memory + * area and set it's addressable length. + */ +#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \ + do { \ + vec->iov_base = ((char *)event) + \ + sizeof (struct gf_event) + \ + (event->count * sizeof (struct iovec)) + pos; \ + vec->iov_len = len; \ + pos += len; \ + } while (0) + +/** + * An instance of this structure is allocated for each brick for which + * notifications are streamed. + */ typedef struct gf_changelog { xlator_t *this; - /* 'processing' directory stream */ - DIR *gfc_dir; - - /* fd to the tracker file */ - int gfc_fd; - - /* connection retries */ - int gfc_connretries; + struct list_head list; /* list of instances */ - char gfc_sockpath[UNIX_PATH_MAX]; + char brick[PATH_MAX]; /* brick path for this end-point */ - char gfc_brickpath[PATH_MAX]; + changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */ +#define RPC_PROBER(ent) ent->grpc.rpc +#define RPC_REBORP(ent) ent->grpc.svc +#define RPC_SOCK(ent) ent->grpc.sock - /* socket for receiving notifications */ - int gfc_sockfd; + unsigned int notify; /* notification flag(s) */ - char *gfc_working_dir; + FINI *fini; /* destructor callback */ + CALLBACK *callback; /* event callback dispatcher */ + CONNECT *connected; /* connect callback */ + DISCONNECT *disconnected; /* disconnection callback */ - /* RFC 3986 string encoding */ - char rfc3986[256]; + void *ptr; /* owner specific private data */ + xlator_t *invokerxl; /* consumers _this_, if valid, + assigned to THIS before cbk is + invoked */ - char gfc_current_dir[PATH_MAX]; - char gfc_processed_dir[PATH_MAX]; - char gfc_processing_dir[PATH_MAX]; + gf_boolean_t ordered; - pthread_t gfc_changelog_processor; - - /* Holds gfc for History API */ - struct gf_changelog *hist_gfc; - - /* holds 0 done scanning, 1 keep scanning and -1 error */ - int hist_done; + struct gf_event_list event; } gf_changelog_t; -typedef struct gf_changelog_history_data { - int len; - - int htime_fd; - - /* parallelism count */ - int n_parallel; - - /* history from, to indexes */ - unsigned long from; - unsigned long to; -} gf_changelog_history_data_t; - -typedef struct gf_changelog_consume_data { - /** set of inputs */ - - /* fd to read from */ - int fd; - - /* from @offset */ - off_t offset; - - xlator_t *this; - gf_changelog_t *gfc; - - /** set of outputs */ +static inline int +gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event) +{ + if (event->ev_type & entry->notify) + return 1; + return 0; +} + +#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true) + +/** private structure */ +typedef struct gf_private { + gf_lock_t lock; /* protects ->connections */ + + void *api; /* pointer for API access */ + + pthread_t poller; /* event poller thread */ + + struct list_head connections; /* list of connections */ +} gf_private_t; + +#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api) + +/** + * upcall: invoke callback with _correct_ THIS + */ +#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \ + do { \ + xlator_t *old_this = NULL; \ + xlator_t *invokerxl = NULL; \ + \ + invokerxl = entry->invokerxl; \ + old_this = this; \ + \ + if (invokerxl) { \ + THIS = invokerxl; \ + } \ + \ + cbk (invokerxl, brick, args); \ + THIS = old_this; \ + \ + } while (0) - /* return value */ - int retval; +#define SAVE_THIS(xl) \ + do { \ + old_this = xl; \ + THIS = master; \ + } while (0) - /* journal processed */ - char changelog[PATH_MAX]; -} gf_changelog_consume_data_t; +#define RESTORE_THIS() \ + do { \ + THIS = old_this; \ + } while (0) -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); +/** APIs and the rest */ void * gf_changelog_process (void *data); @@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence); int gf_changelog_consume (xlator_t *this, - gf_changelog_t *gfc, + gf_changelog_journal_t *jnl, char *from_path, gf_boolean_t no_publish); int -gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); +gf_changelog_publish (xlator_t *this, + gf_changelog_journal_t *jnl, char *from_path); +int +gf_thread_cleanup (xlator_t *this, pthread_t thread); +void * +gf_changelog_callback_invoker (void *arg); #endif |