diff options
Diffstat (limited to 'xlators/features/changelog/lib/src')
4 files changed, 192 insertions, 204 deletions
diff --git a/xlators/features/changelog/lib/src/changelog-lib-messages.h b/xlators/features/changelog/lib/src/changelog-lib-messages.h index 2061217b801..32b3497d89d 100644 --- a/xlators/features/changelog/lib/src/changelog-lib-messages.h +++ b/xlators/features/changelog/lib/src/changelog-lib-messages.h @@ -23,39 +23,26 @@ * glfs-message-id.h. */ -GLFS_MSGID(CHANGELOG_LIB, - CHANGELOG_LIB_MSG_OPEN_FAILED, - CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, - CHANGELOG_LIB_MSG_SCRATCH_DIR_ENTRIES_CREATION_ERROR, - CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, - CHANGELOG_LIB_MSG_OPENDIR_ERROR, - CHANGELOG_LIB_MSG_RENAME_FAILED, - CHANGELOG_LIB_MSG_READ_ERROR, - CHANGELOG_LIB_MSG_HTIME_ERROR, - CHANGELOG_LIB_MSG_GET_TIME_ERROR, - CHANGELOG_LIB_MSG_WRITE_FAILED, - CHANGELOG_LIB_MSG_PTHREAD_ERROR, - CHANGELOG_LIB_MSG_MMAP_FAILED, - CHANGELOG_LIB_MSG_MUNMAP_FAILED, - CHANGELOG_LIB_MSG_ASCII_ERROR, - CHANGELOG_LIB_MSG_STAT_FAILED, - CHANGELOG_LIB_MSG_GET_XATTR_FAILED, - CHANGELOG_LIB_MSG_PUBLISH_ERROR, - CHANGELOG_LIB_MSG_PARSE_ERROR, - CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, - CHANGELOG_LIB_MSG_CLEANUP_ERROR, - CHANGELOG_LIB_MSG_UNLINK_FAILED, - CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, - CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, - CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, - CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, - CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, - CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, - CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, - CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, - CHANGELOG_LIB_MSG_COPY_FROM_BUFFER_FAILED, - CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, - CHANGELOG_LIB_MSG_HIST_FAILED -); +GLFS_MSGID( + CHANGELOG_LIB, CHANGELOG_LIB_MSG_OPEN_FAILED, + CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, + CHANGELOG_LIB_MSG_SCRATCH_DIR_ENTRIES_CREATION_ERROR, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + CHANGELOG_LIB_MSG_RENAME_FAILED, CHANGELOG_LIB_MSG_READ_ERROR, + CHANGELOG_LIB_MSG_HTIME_ERROR, CHANGELOG_LIB_MSG_GET_TIME_ERROR, + CHANGELOG_LIB_MSG_WRITE_FAILED, CHANGELOG_LIB_MSG_PTHREAD_ERROR, + CHANGELOG_LIB_MSG_MMAP_FAILED, CHANGELOG_LIB_MSG_MUNMAP_FAILED, + CHANGELOG_LIB_MSG_ASCII_ERROR, CHANGELOG_LIB_MSG_STAT_FAILED, + CHANGELOG_LIB_MSG_GET_XATTR_FAILED, CHANGELOG_LIB_MSG_PUBLISH_ERROR, + CHANGELOG_LIB_MSG_PARSE_ERROR, CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, + CHANGELOG_LIB_MSG_CLEANUP_ERROR, CHANGELOG_LIB_MSG_UNLINK_FAILED, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, + CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, + CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, + CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + CHANGELOG_LIB_MSG_COPY_FROM_BUFFER_FAILED, + CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, CHANGELOG_LIB_MSG_HIST_FAILED); #endif /* !_CHANGELOG_MESSAGES_H_ */ diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index b05628ee70d..cfb26a0081e 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -23,27 +23,28 @@ #include "changelog-rpc-common.h" #include "gf-changelog-journal.h" -#define GF_CHANGELOG_TRACKER "tracker" +#define GF_CHANGELOG_TRACKER "tracker" -#define GF_CHANGELOG_CURRENT_DIR ".current" -#define GF_CHANGELOG_PROCESSED_DIR ".processed" +#define GF_CHANGELOG_CURRENT_DIR ".current" +#define GF_CHANGELOG_PROCESSED_DIR ".processed" #define GF_CHANGELOG_PROCESSING_DIR ".processing" -#define GF_CHANGELOG_HISTORY_DIR ".history" +#define GF_CHANGELOG_HISTORY_DIR ".history" #define TIMESTAMP_LENGTH 10 #ifndef MAXLINE #define MAXLINE 4096 #endif -#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) do { \ - memcpy (ascii + off, ptr, len); \ - off += len; \ - } while (0) +#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) \ + do { \ + memcpy(ascii + off, ptr, len); \ + off += len; \ + } while (0) typedef struct read_line { - int rl_cnt; - char *rl_bufptr; - char rl_buf[MAXLINE]; + int rl_cnt; + char *rl_bufptr; + char rl_buf[MAXLINE]; } read_line_t; struct gf_changelog; @@ -55,51 +56,50 @@ struct gf_event; * ->next_seq holds the next _expected_ sequence number. */ struct gf_event_list { - pthread_mutex_t lock; /* protects this structure */ - pthread_cond_t cond; + pthread_mutex_t lock; /* protects this structure */ + pthread_cond_t cond; - pthread_t invoker; + pthread_t invoker; - unsigned long next_seq; /* next sequence number expected: - zero during bootstrap */ + unsigned long next_seq; /* next sequence number expected: + zero during bootstrap */ - struct gf_changelog *entry; /* backpointer to it's brick - encapsulator (entry) */ - struct list_head events; /* list of events */ + struct gf_changelog *entry; /* backpointer to it's brick + encapsulator (entry) */ + struct list_head events; /* list of events */ }; /** * include a refcount if it's of use by additional layers */ struct gf_event { - int count; + int count; - unsigned long seq; + unsigned long seq; - struct list_head list; + struct list_head list; - struct iovec iov[0]; + struct iovec iov[0]; }; -#define GF_EVENT_CALLOC_SIZE(cnt, len) \ - (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len) +#define GF_EVENT_CALLOC_SIZE(cnt, len) \ + (sizeof(struct gf_event) + (cnt * sizeof(struct iovec)) + len) /** * assign the base address of the IO vector to the correct memory o * area and set it's addressable length. */ -#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \ - do { \ - vec->iov_base = ((char *)event) + \ - sizeof (struct gf_event) + \ - (event->count * sizeof (struct iovec)) + pos; \ - vec->iov_len = len; \ - pos += len; \ - } while (0) +#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \ + do { \ + vec->iov_base = ((char *)event) + sizeof(struct gf_event) + \ + (event->count * sizeof(struct iovec)) + pos; \ + vec->iov_len = len; \ + pos += len; \ + } while (0) typedef enum gf_changelog_conn_state { - GF_CHANGELOG_CONN_STATE_PENDING = 0, - GF_CHANGELOG_CONN_STATE_ACCEPTED, - GF_CHANGELOG_CONN_STATE_DISCONNECTED, + GF_CHANGELOG_CONN_STATE_PENDING = 0, + GF_CHANGELOG_CONN_STATE_ACCEPTED, + GF_CHANGELOG_CONN_STATE_DISCONNECTED, } gf_changelog_conn_state_t; /** @@ -107,153 +107,152 @@ typedef enum gf_changelog_conn_state { * notifications are streamed. */ typedef struct gf_changelog { - gf_lock_t statelock; - gf_changelog_conn_state_t connstate; + gf_lock_t statelock; + gf_changelog_conn_state_t connstate; - xlator_t *this; + xlator_t *this; - struct list_head list; /* list of instances */ + struct list_head list; /* list of instances */ - char brick[PATH_MAX]; /* brick path for this end-point */ + char brick[PATH_MAX]; /* brick path for this end-point */ - changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */ -#define RPC_PROBER(ent) ent->grpc.rpc -#define RPC_REBORP(ent) ent->grpc.svc -#define RPC_SOCK(ent) ent->grpc.sock + changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */ +#define RPC_PROBER(ent) ent->grpc.rpc +#define RPC_REBORP(ent) ent->grpc.svc +#define RPC_SOCK(ent) ent->grpc.sock - unsigned int notify; /* notification flag(s) */ + unsigned int notify; /* notification flag(s) */ - FINI *fini; /* destructor callback */ - CALLBACK *callback; /* event callback dispatcher */ - CONNECT *connected; /* connect callback */ - DISCONNECT *disconnected; /* disconnection callback */ + FINI *fini; /* destructor callback */ + CALLBACK *callback; /* event callback dispatcher */ + CONNECT *connected; /* connect callback */ + DISCONNECT *disconnected; /* disconnection callback */ - void *ptr; /* owner specific private data */ - xlator_t *invokerxl; /* consumers _this_, if valid, - assigned to THIS before cbk is - invoked */ + void *ptr; /* owner specific private data */ + xlator_t *invokerxl; /* consumers _this_, if valid, + assigned to THIS before cbk is + invoked */ - gf_boolean_t ordered; + gf_boolean_t ordered; - void (*queueevent) (struct gf_event_list *, struct gf_event *); - void (*pickevent) (struct gf_event_list *, struct gf_event **); + void (*queueevent)(struct gf_event_list *, struct gf_event *); + void (*pickevent)(struct gf_event_list *, struct gf_event **); - struct gf_event_list event; + struct gf_event_list event; } gf_changelog_t; static inline int -gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event) +gf_changelog_filter_check(gf_changelog_t *entry, changelog_event_t *event) { - if (event->ev_type & entry->notify) - return 1; - return 0; + if (event->ev_type & entry->notify) + return 1; + return 0; } -#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true) +#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true) /** private structure */ typedef struct gf_private { - pthread_mutex_t lock; /* protects ->connections, cleanups */ - pthread_cond_t cond; + pthread_mutex_t lock; /* protects ->connections, cleanups */ + pthread_cond_t cond; - void *api; /* pointer for API access */ + void *api; /* pointer for API access */ - pthread_t poller; /* event poller thread */ - pthread_t connectionjanitor; /* connection cleaner */ + pthread_t poller; /* event poller thread */ + pthread_t connectionjanitor; /* connection cleaner */ - struct list_head connections; /* list of connections */ - struct list_head cleanups; /* list of connection to be - cleaned up */ + struct list_head connections; /* list of connections */ + struct list_head cleanups; /* list of connection to be + cleaned up */ } gf_private_t; -#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api) +#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *)this->private)->api) /** * upcall: invoke callback with _correct_ THIS */ -#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \ - do { \ - xlator_t *old_this = NULL; \ - xlator_t *invokerxl = NULL; \ - \ - invokerxl = entry->invokerxl; \ - old_this = this; \ - \ - if (invokerxl) { \ - THIS = invokerxl; \ - } \ - \ - cbk (invokerxl, brick, args); \ - THIS = old_this; \ - \ - } while (0) - -#define SAVE_THIS(xl) \ - do { \ - old_this = xl; \ - THIS = master; \ - } while (0) - -#define RESTORE_THIS() \ - do { \ - if (old_this) \ - THIS = old_this; \ - } while (0) +#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args...) \ + do { \ + xlator_t *old_this = NULL; \ + xlator_t *invokerxl = NULL; \ + \ + invokerxl = entry->invokerxl; \ + old_this = this; \ + \ + if (invokerxl) { \ + THIS = invokerxl; \ + } \ + \ + cbk(invokerxl, brick, args); \ + THIS = old_this; \ + \ + } while (0) + +#define SAVE_THIS(xl) \ + do { \ + old_this = xl; \ + THIS = master; \ + } while (0) + +#define RESTORE_THIS() \ + do { \ + if (old_this) \ + THIS = old_this; \ + } while (0) /** APIs and the rest */ void * -gf_changelog_process (void *data); +gf_changelog_process(void *data); ssize_t -gf_changelog_read_path (int fd, char *buffer, size_t bufsize); +gf_changelog_read_path(int fd, char *buffer, size_t bufsize); void -gf_rfc3986_encode_space_newline (unsigned char *s, char *enc, char *estr); +gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr); size_t -gf_changelog_write (int fd, char *buffer, size_t len); +gf_changelog_write(int fd, char *buffer, size_t len); ssize_t -gf_readline (int fd, void *vptr, size_t maxlen); +gf_readline(int fd, void *vptr, size_t maxlen); int -gf_ftruncate (int fd, off_t length); +gf_ftruncate(int fd, off_t length); off_t -gf_lseek (int fd, off_t offset, int whence); +gf_lseek(int fd, off_t offset, int whence); int -gf_changelog_consume (xlator_t *this, - gf_changelog_journal_t *jnl, - char *from_path, gf_boolean_t no_publish); +gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl, + char *from_path, gf_boolean_t no_publish); int -gf_changelog_publish (xlator_t *this, - gf_changelog_journal_t *jnl, char *from_path); +gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl, + char *from_path); int -gf_thread_cleanup (xlator_t *this, pthread_t thread); +gf_thread_cleanup(xlator_t *this, pthread_t thread); void * -gf_changelog_callback_invoker (void *arg); +gf_changelog_callback_invoker(void *arg); int -gf_cleanup_event (xlator_t *, struct gf_event_list *); +gf_cleanup_event(xlator_t *, struct gf_event_list *); /* (un)ordered event queueing */ void -queue_ordered_event (struct gf_event_list *, struct gf_event *); +queue_ordered_event(struct gf_event_list *, struct gf_event *); void -queue_unordered_event (struct gf_event_list *, struct gf_event *); +queue_unordered_event(struct gf_event_list *, struct gf_event *); /* (un)ordered event picking */ void -pick_event_ordered (struct gf_event_list *, struct gf_event **); +pick_event_ordered(struct gf_event_list *, struct gf_event **); void -pick_event_unordered (struct gf_event_list *, struct gf_event **); +pick_event_unordered(struct gf_event_list *, struct gf_event **); /* connection janitor thread */ void * -gf_changelog_connection_janitor (void *); +gf_changelog_connection_janitor(void *); #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h index 46d50f159d9..ba5b9bf827e 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-journal.h +++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h @@ -17,91 +17,91 @@ #include "changelog.h" enum api_conn { - JNL_API_CONNECTED, - JNL_API_CONN_INPROGESS, - JNL_API_DISCONNECTED, + JNL_API_CONNECTED, + JNL_API_CONN_INPROGESS, + JNL_API_DISCONNECTED, }; typedef struct gf_changelog_entry { - char path[PATH_MAX]; + char path[PATH_MAX]; - struct list_head list; + struct list_head list; } gf_changelog_entry_t; typedef struct gf_changelog_processor { - pthread_mutex_t lock; /* protects ->entries */ - pthread_cond_t cond; /* waiter during empty list */ - gf_boolean_t waiting; + pthread_mutex_t lock; /* protects ->entries */ + pthread_cond_t cond; /* waiter during empty list */ + gf_boolean_t waiting; - pthread_t processor; /* thread-id of journal processing thread */ + pthread_t processor; /* thread-id of journal processing thread */ - struct list_head entries; + struct list_head entries; } gf_changelog_processor_t; typedef struct gf_changelog_journal { - DIR *jnl_dir; /* 'processing' directory stream */ + DIR *jnl_dir; /* 'processing' directory stream */ - int jnl_fd; /* fd to the tracker file */ + int jnl_fd; /* fd to the tracker file */ - char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */ + char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */ - gf_changelog_processor_t *jnl_proc; + gf_changelog_processor_t *jnl_proc; - char *jnl_working_dir; /* scratch directory */ + char *jnl_working_dir; /* scratch directory */ - char jnl_current_dir[PATH_MAX]; - char jnl_processed_dir[PATH_MAX]; - char jnl_processing_dir[PATH_MAX]; + char jnl_current_dir[PATH_MAX]; + char jnl_processed_dir[PATH_MAX]; + char jnl_processing_dir[PATH_MAX]; - char rfc3986_space_newline[256]; /* RFC 3986 string encoding */ + char rfc3986_space_newline[256]; /* RFC 3986 string encoding */ - struct gf_changelog_journal *hist_jnl; - int hist_done; /* holds 0 done scanning, - 1 keep scanning and -1 error */ + struct gf_changelog_journal *hist_jnl; + int hist_done; /* holds 0 done scanning, + 1 keep scanning and -1 error */ - pthread_spinlock_t lock; - int connected; - xlator_t *this; + pthread_spinlock_t lock; + int connected; + xlator_t *this; } gf_changelog_journal_t; -#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state) -#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED) +#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state) +#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED) /* History API */ typedef struct gf_changelog_history_data { - int len; + int len; - int htime_fd; + int htime_fd; - /* parallelism count */ - int n_parallel; + /* parallelism count */ + int n_parallel; - /* history from, to indexes */ - unsigned long from; - unsigned long to; - xlator_t *this; + /* history from, to indexes */ + unsigned long from; + unsigned long to; + xlator_t *this; } gf_changelog_history_data_t; typedef struct gf_changelog_consume_data { - /** set of inputs */ + /** set of inputs */ - /* fd to read from */ - int fd; + /* fd to read from */ + int fd; - /* from @offset */ - off_t offset; + /* from @offset */ + off_t offset; - xlator_t *this; + xlator_t *this; - gf_changelog_journal_t *jnl; + gf_changelog_journal_t *jnl; - /** set of outputs */ + /** set of outputs */ - /* return value */ - int retval; + /* return value */ + int retval; - /* journal processed */ - char changelog[PATH_MAX]; + /* journal processed */ + char changelog[PATH_MAX]; } gf_changelog_consume_data_t; /* event handler */ diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h index 1c982eef809..975307b99d3 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-rpc.h +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h @@ -16,11 +16,13 @@ #include "gf-changelog-helpers.h" #include "changelog-rpc-common.h" -struct rpc_clnt *gf_changelog_rpc_init (xlator_t *, gf_changelog_t *); +struct rpc_clnt * +gf_changelog_rpc_init(xlator_t *, gf_changelog_t *); -int gf_changelog_invoke_rpc (xlator_t *, gf_changelog_t *, int); +int +gf_changelog_invoke_rpc(xlator_t *, gf_changelog_t *, int); rpcsvc_t * -gf_changelog_reborp_init_rpc_listner (xlator_t *, char *, char *, void *); +gf_changelog_reborp_init_rpc_listner(xlator_t *, char *, char *, void *); #endif |