diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.h')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 107 |
1 files changed, 70 insertions, 37 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 03a795369d1..33a99ee4eed 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -15,10 +15,16 @@ #include "timer.h" #include "pthread.h" #include "iobuf.h" +#include "rot-buffs.h" #include "changelog-misc.h" #include "call-stub.h" +#include "rpcsvc.h" +#include "changelog-ev-handle.h" + +#include "changelog.h" + /** * the changelog entry */ @@ -120,29 +126,6 @@ typedef struct changelog_fsync { xlator_t *this; } changelog_fsync_t; -# define CHANGELOG_MAX_CLIENTS 5 -typedef struct changelog_notify { - /* reader end of the pipe */ - int rfd; - - /* notifier thread */ - pthread_t notify_th; - - /* unique socket path */ - char sockpath[UNIX_PATH_MAX]; - - int socket_fd; - - /** - * simple array of accept()'ed fds. Not scalable at all - * for large number of clients, but it's okay as we have - * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS). - */ - int client_fd[CHANGELOG_MAX_CLIENTS]; - - xlator_t *this; -} changelog_notify_t; - /* Draining during changelog rollover (for geo-rep snapshot dependency): * -------------------------------------------------------------------- * The introduction of draining of in-transit fops during changelog rollover @@ -162,14 +145,14 @@ typedef struct changelog_notify { typedef enum chlog_fop_color { FOP_COLOR_BLACK, FOP_COLOR_WHITE -}chlog_fop_color_t; +} chlog_fop_color_t; /* Barrier notify variable */ typedef struct barrier_notify { pthread_mutex_t bnotify_mutex; pthread_cond_t bnotify_cond; gf_boolean_t bnotify; -}barrier_notify_t; +} barrier_notify_t; /* Two separate mutex and conditional variable set is used * to drain white and black fops. */ @@ -185,15 +168,26 @@ typedef struct drain_mgmt { unsigned long white_fop_cnt; gf_boolean_t drain_wait_black; gf_boolean_t drain_wait_white; -}drain_mgmt_t; +} drain_mgmt_t; /* External barrier as a result of snap on/off indicating flag*/ typedef struct barrier_flags { gf_lock_t lock; gf_boolean_t barrier_ext; -}barrier_flags_t; +} barrier_flags_t; +/* Event selection */ +typedef struct changelog_ev_selector { + gf_lock_t reflock; + /** + * Array of references for each selection bit. + */ + unsigned int ref[CHANGELOG_EV_SELECTION_RANGE]; +} changelog_ev_selector_t; + + +/* changelog's private structure */ struct changelog_priv { gf_boolean_t active; @@ -223,9 +217,6 @@ struct changelog_priv { /* lock to synchronize CSNAP updation */ gf_lock_t c_snap_lock; - /* writen end of the pipe */ - int wfd; - /* rollover time */ int32_t rollover_time; @@ -247,9 +238,6 @@ struct changelog_priv { /* context of fsync thread */ changelog_fsync_t cf; - /* context of the notifier thread */ - changelog_notify_t cn; - /* operation mode */ changelog_mode_t op_mode; @@ -262,7 +250,9 @@ struct changelog_priv { /* encoder */ struct changelog_encoder *ce; - /* snapshot dependency changes */ + /** + * snapshot dependency changes + */ /* Draining of fops*/ drain_mgmt_t dm; @@ -289,6 +279,30 @@ struct changelog_priv { gf_timer_t *timer; struct timespec timeout; + /** + * buffers, RPC, event selection, notifications and other + * beasts. + */ + + /* epoll pthread */ + pthread_t poller; + + /* rotational buffer */ + rbuf_t *rbuf; + + /* changelog RPC server */ + rpcsvc_t *rpc; + + /* event selection */ + changelog_ev_selector_t ev_selection; + + /* client handling (reverse connection) */ + pthread_t connector; + + int nr_dispatchers; + pthread_t *ev_dispatcher; + + changelog_clnt_t connections; }; struct changelog_local { @@ -367,7 +381,7 @@ typedef struct { * helpers routines */ -void +int changelog_thread_cleanup (xlator_t *this, pthread_t thr_id); void * @@ -386,7 +400,7 @@ changelog_start_next_change (xlator_t *this, changelog_priv_t *priv, unsigned long ts, gf_boolean_t finale); int -changelog_open (xlator_t *this, changelog_priv_t *priv); +changelog_open_journal (xlator_t *this, changelog_priv_t *priv); int changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last); int @@ -449,6 +463,7 @@ changelog_snap_handle_ascii_change (xlator_t *this, changelog_log_data_t *cld); int changelog_snap_write_change (changelog_priv_t *priv, char *buffer, size_t len); + /* Changelog barrier routines */ void __chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub); void __chlog_barrier_disable (xlator_t *this, struct list_head *queue); @@ -460,6 +475,24 @@ int32_t changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this, loc_t *loc, changelog_local_t **local); +/* event selection routines */ +inline void changelog_select_event (xlator_t *, + changelog_ev_selector_t *, unsigned int); +inline void changelog_deselect_event (xlator_t *, + changelog_ev_selector_t *, unsigned int); +inline int changelog_init_event_selection (xlator_t *, + changelog_ev_selector_t *); +inline int changelog_cleanup_event_selection (xlator_t *, + changelog_ev_selector_t *); +inline int changelog_ev_selected (xlator_t *, + changelog_ev_selector_t *, unsigned int); +inline void +changelog_dispatch_event (xlator_t *, changelog_priv_t *, changelog_event_t *); + +changelog_inode_ctx_t * +__changelog_inode_ctx_get (xlator_t *, inode_t *, unsigned long **, + unsigned long *, changelog_log_type); + /* macros */ #define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \ @@ -471,10 +504,10 @@ changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this, frame->local = NULL; \ } \ STACK_UNWIND_STRICT (fop, frame, params); \ - changelog_local_cleanup (__xl, __local); \ if (__local && __local->prev_entry) \ changelog_local_cleanup (__xl, \ __local->prev_entry); \ + changelog_local_cleanup (__xl, __local); \ } while (0) #define CHANGELOG_IOBUF_REF(iobuf) do { \ |