diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-helpers.h')
| -rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-helpers.h | 194 | 
1 files changed, 134 insertions, 60 deletions
| diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 9b875d45dcc..17b8862a89b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -18,6 +18,11 @@  #include <xlator.h> +#include "changelog.h" + +#include "changelog-rpc-common.h" +#include "gf-changelog-journal.h" +  #define GF_CHANGELOG_TRACKER  "tracker"  #define GF_CHANGELOG_CURRENT_DIR    ".current" @@ -41,79 +46,143 @@ typedef struct read_line {          char rl_buf[MAXLINE];  } read_line_t; +struct gf_changelog; + +/** + * Event list for ordered event notification + * + * ->next_seq holds the next _expected_ sequence number. + */ +struct gf_event_list { +        pthread_mutex_t lock;               /* protects this structure */ +        pthread_cond_t  cond; + +        pthread_t invoker; + +        unsigned long next_seq;             /* next sequence number expected: +                                               zero during bootstrap */ + +        struct gf_changelog *entry;         /* backpointer to it's brick +                                               encapsulator (entry) */ +        struct list_head events;            /* list of events (ordered) */ +}; + +/** + * include a refcount if it's of use by additional layers + */ +struct gf_event { +        int count; + +        unsigned long seq; + +        struct list_head list; + +        struct iovec iov[0]; +}; +#define GF_EVENT_CALLOC_SIZE(cnt, len)                                  \ +        (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len) + +/** + * assign the base address of the IO vector to the correct memory + * area and set it's addressable length. + */ +#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos)                     \ +        do {                                                            \ +                vec->iov_base = ((char *)event) +                       \ +                        sizeof (struct gf_event) +                      \ +                        (event->count * sizeof (struct iovec)) + pos;   \ +                vec->iov_len = len;                                     \ +                pos += len;                                             \ +        } while (0) + +/** + * An instance of this structure is allocated for each brick for which + * notifications are streamed. + */  typedef struct gf_changelog {          xlator_t *this; -        /* 'processing' directory stream */ -        DIR *gfc_dir; - -        /* fd to the tracker file */ -        int gfc_fd; - -        /* connection retries */ -        int gfc_connretries; +        struct list_head list;              /* list of instances */ -        char gfc_sockpath[UNIX_PATH_MAX]; +        char brick[PATH_MAX];               /* brick path for this end-point */ -        char gfc_brickpath[PATH_MAX]; +        changelog_rpc_t grpc;               /* rpc{-clnt,svc} for this brick */ +#define RPC_PROBER(ent)  ent->grpc.rpc +#define RPC_REBORP(ent)  ent->grpc.svc +#define RPC_SOCK(ent)    ent->grpc.sock -        /* socket for receiving notifications */ -        int gfc_sockfd; +        unsigned int notify;                /* notification flag(s) */ -        char *gfc_working_dir; +        FINI       *fini;                   /* destructor callback */ +        CALLBACK   *callback;               /* event callback dispatcher */ +        CONNECT    *connected;              /* connect callback */ +        DISCONNECT *disconnected;           /* disconnection callback */ -        /* RFC 3986 string encoding */ -        char rfc3986[256]; +        void *ptr;                          /* owner specific private data */ +        xlator_t *invokerxl;                /* consumers _this_, if valid, +                                               assigned to THIS before cbk is +                                               invoked */ -        char gfc_current_dir[PATH_MAX]; -        char gfc_processed_dir[PATH_MAX]; -        char gfc_processing_dir[PATH_MAX]; +        gf_boolean_t ordered; -        pthread_t gfc_changelog_processor; - -        /* Holds gfc for History API */ -        struct gf_changelog *hist_gfc; - -        /* holds 0 done scanning, 1 keep scanning and -1 error */ -        int hist_done; +        struct gf_event_list event;  } gf_changelog_t; -typedef struct gf_changelog_history_data { -        int           len; - -        int           htime_fd; - -        /* parallelism count */ -        int           n_parallel; - -        /* history from, to indexes */ -        unsigned long from; -        unsigned long to; -} gf_changelog_history_data_t; - -typedef struct gf_changelog_consume_data { -        /** set of inputs */ - -        /* fd to read from */ -        int             fd; - -        /* from @offset */ -        off_t           offset; - -        xlator_t       *this; -        gf_changelog_t *gfc; - -        /** set of outputs */ +static inline int +gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event) +{ +        if (event->ev_type & entry->notify) +                return 1; +        return 0; +} + +#define GF_NEED_ORDERED_EVENTS(ent)  (ent->ordered == _gf_true) + +/** private structure */ +typedef struct gf_private { +        gf_lock_t lock;                  /* protects ->connections */ + +        void *api;                       /* pointer for API access */ + +        pthread_t poller;                /* event poller thread */ + +        struct list_head connections;    /* list of connections */ +} gf_private_t; + +#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api) + +/** + * upcall: invoke callback with _correct_ THIS + */ +#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...)             \ +        do {                                                            \ +                xlator_t *old_this = NULL;                              \ +                xlator_t *invokerxl = NULL;                             \ +                                                                        \ +                invokerxl = entry->invokerxl;                           \ +                old_this = this;                                        \ +                                                                        \ +                if (invokerxl) {                                        \ +                        THIS = invokerxl;                               \ +                }                                                       \ +                                                                        \ +                cbk (invokerxl, brick, args);                           \ +                THIS = old_this;                                        \ +                                                                        \ +        } while (0) -        /* return value */ -        int retval; +#define SAVE_THIS(xl)                           \ +        do {                                    \ +                old_this = xl;                  \ +                THIS = master;                  \ +        } while (0) -        /* journal processed */ -        char changelog[PATH_MAX]; -} gf_changelog_consume_data_t; +#define RESTORE_THIS()                          \ +        do {                                    \ +                THIS = old_this;                \ +        } while (0) -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); +/** APIs and the rest */  void *  gf_changelog_process (void *data); @@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence);  int  gf_changelog_consume (xlator_t *this, -                      gf_changelog_t *gfc, +                      gf_changelog_journal_t *jnl,                        char *from_path, gf_boolean_t no_publish);  int -gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); +gf_changelog_publish (xlator_t *this, +                      gf_changelog_journal_t *jnl, char *from_path); +int +gf_thread_cleanup (xlator_t *this, pthread_t thread); +void * +gf_changelog_callback_invoker (void *arg);  #endif | 
