diff options
31 files changed, 3971 insertions, 1302 deletions
diff --git a/rpc/xdr/src/Makefile.am b/rpc/xdr/src/Makefile.am index a1c5525c17e..e27528d0e01 100644 --- a/rpc/xdr/src/Makefile.am +++ b/rpc/xdr/src/Makefile.am @@ -1,5 +1,6 @@  XDRSOURCES = glusterfs3-xdr.c cli1-xdr.c nlm4-xdr.c nsm-xdr.c \ -	rpc-common-xdr.c glusterd1-xdr.c acl3-xdr.c portmap-xdr.c mount3udp.c +	rpc-common-xdr.c glusterd1-xdr.c acl3-xdr.c portmap-xdr.c \ +	mount3udp.c changelog-xdr.c  XDRHEADERS = $(XDRSOURCES:.c=.h)  XDRGENFILES = $(XDRSOURCES:.c=.x) @@ -76,3 +77,9 @@ mount3udp.c: mount3udp.x mount3udp.h  mount3udp.h: mount3udp.x  	$(top_srcdir)/build-aux/xdrgen header $(xdrsrc)/`basename ${@:.h=.x}` + +changelog-xdr.c: changelog-xdr.x changelog-xdr.h +	$(top_srcdir)/build-aux/xdrgen source $(xdrsrc)/`basename ${@:.c=.x}` + +changelog-xdr.h: changelog-xdr.x +	$(top_srcdir)/build-aux/xdrgen header $(xdrsrc)/`basename ${@:.h=.x}` diff --git a/rpc/xdr/src/changelog-xdr.x b/rpc/xdr/src/changelog-xdr.x new file mode 100644 index 00000000000..ba1ebd27836 --- /dev/null +++ b/rpc/xdr/src/changelog-xdr.x @@ -0,0 +1,27 @@ +/* XDR: libgfchangelog -> changelog */ + +struct changelog_probe_req { +       unsigned int filter; +       char sock[UNIX_PATH_MAX]; +}; + +struct changelog_probe_rsp { +       int op_ret; +}; + +/* XDR: changelog -> libgfchangelog */ +struct changelog_event_req { +       /* sequence number for the buffer */ +       unsigned long seq; + +       /* time of dispatch */ +       unsigned long tv_sec; +       unsigned long tv_usec; +}; + +struct changelog_event_rsp { +       int op_ret; + +       /* ack'd buffers sequence number */ +       unsigned long seq; +}; diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c new file mode 100644 index 00000000000..8f23c81c2a0 --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c @@ -0,0 +1,84 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. 
<http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +/** + * Compile it using: + *  gcc -o getchanges-multi `pkg-config --cflags libgfchangelog` \ + *  get-changes-multi.c `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +void *brick_init (void *xl, struct gf_brick_spec *brick) +{ +        return brick; +} + +void brick_fini (void *xl, char *brick, void *data) +{ +        return; +} + +void brick_callback (void *xl, char *brick, +                    void *data, changelog_event_t *ev) +{ +        printf ("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type); +} + +void fill_brick_spec (struct gf_brick_spec *brick, char *path) +{ +        brick->brick_path = strdup (path); +        brick->filter = CHANGELOG_OP_TYPE_RELEASE; + +        brick->init         = brick_init; +        brick->fini         = brick_fini; +        brick->callback     = brick_callback; +        brick->connected    = NULL; +        brick->disconnected = NULL; +} + +int +main (int argc, char **argv) +{ +        int ret = 0; +        void *bricks = NULL; +        struct gf_brick_spec *brick = NULL; + +        bricks = calloc (2, sizeof (struct gf_brick_spec)); +        if (!bricks) +                goto error_return; + +        brick = (struct gf_brick_spec *)bricks; +        fill_brick_spec (brick, "/export/z1/zwoop"); + +        brick++; +        fill_brick_spec (brick, "/export/z2/zwoop"); + +        ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2, +                                             1, 
"/tmp/multi-changes.log", 9, +                                             NULL); +        if (ret) +                goto error_return; + +        /* let callbacks do the job */ +        select (0, NULL, NULL, NULL, NULL); + + error_return: +        return -1; +} diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c index 6d0d0357db9..0b2808c7e35 100644 --- a/xlators/features/changelog/lib/examples/c/get-changes.c +++ b/xlators/features/changelog/lib/examples/c/get-changes.c @@ -40,7 +40,7 @@ main (int argc, char ** argv)          char fbuf[PATH_MAX] = {0,};          /* get changes for brick "/home/vshankar/export/yow/yow-1" */ -        ret = gf_changelog_register ("/home/vshankar/exports/yow/yow-1", +        ret = gf_changelog_register ("/export/z1/zwoop",                                       "/tmp/scratch", "/tmp/change.log", 9, 5);          if (ret) {                  handle_error ("register failed"); diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c index 33eb8c32d4d..2e1ff3c767f 100644 --- a/xlators/features/changelog/lib/examples/c/get-history.c +++ b/xlators/features/changelog/lib/examples/c/get-history.c @@ -40,8 +40,8 @@ main (int argc, char ** argv)          char fbuf[PATH_MAX]  = {0,};          unsigned long end_ts = 0; -        ret = gf_changelog_register ("/export1/v1/b1", -                                     "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log", +        ret = gf_changelog_register ("/export/z1/zwoop", +                                     "/tmp/scratch_v1", "/tmp/changes.log",                                       9, 5);          if (ret) {                  handle_error ("register failed"); @@ -51,7 +51,8 @@ main (int argc, char ** argv)          int a, b;          printf ("give the two numbers start and end\t");          scanf ("%d%d", &a, &b); -        ret = gf_history_changelog 
("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts); +        ret = gf_history_changelog ("/export/z1/zwoop/.glusterfs/changelogs", +                                    a, b, 3, &end_ts);          if (ret == -1) {                  printf ("history failed");                  goto out; diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am index 1ae919bfb38..306306bd585 100644 --- a/xlators/features/changelog/lib/src/Makefile.am +++ b/xlators/features/changelog/lib/src/Makefile.am @@ -4,9 +4,13 @@ libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \  libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \  				-I../../../src/ -I$(top_srcdir)/libglusterfs/src \  				-I$(top_srcdir)/xlators/features/changelog/src \ +				-I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \ +				-I$(top_srcdir)/rpc/rpc-transport/socket/src \  				-DDATADIR=\"$(localstatedir)\" -libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la +libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ +			   $(top_builddir)/rpc/xdr/src/libgfxdr.la \ +			   $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la  libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) -version-info $(LIBGFCHANGELOG_LT_VERSION) @@ -15,18 +19,18 @@ lib_LTLIBRARIES = libgfchangelog.la  CONTRIB_BUILDDIR = $(top_builddir)/contrib -libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \ -		gf-changelog-helpers.c gf-history-changelog.c \ -		$(CONTRIBDIR)/uuid/clear.c \ -		$(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \ -		$(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \ -		$(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \ -		$(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \ -		$(CONTRIBDIR)/uuid/unpack.c - -noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \ -		$(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \ -		
$(CONTRIB_BUILDDIR)/uuid/uuid_types.h +libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-journal-handler.c gf-changelog-helpers.c \ +		gf-changelog-api.c gf-history-changelog.c gf-changelog-rpc.c gf-changelog-reborp.c \ +		$(top_srcdir)/xlators/features/changelog/src/changelog-rpc-common.c \ +		$(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c \ +		$(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c \ +		$(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c \ +		$(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c \ +		$(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c + +noinst_HEADERS = gf-changelog-helpers.h gf-changelog-rpc.h gf-changelog-journal.h \ +		$(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h \ +		$(CONTRIBDIR)/uuid/uuidP.h $(CONTRIB_BUILDDIR)/uuid/uuid_types.h  libgfchangelog_HEADERS = changelog.h diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/changelog.h index 5cddfb5839c..d7048ff2508 100644 --- a/xlators/features/changelog/lib/src/changelog.h +++ b/xlators/features/changelog/lib/src/changelog.h @@ -11,6 +11,73 @@  #ifndef _GF_CHANGELOG_H  #define _GF_CHANGELOG_H +struct gf_brick_spec; + +/** + * Max bit shiter for event selection + */ +#define CHANGELOG_EV_SELECTION_RANGE  4 + +#define CHANGELOG_OP_TYPE_JOURNAL (1<<0) +#define CHANGELOG_OP_TYPE_OPEN    (1<<1) +#define CHANGELOG_OP_TYPE_CREATE  (1<<2) +#define CHANGELOG_OP_TYPE_RELEASE (1<<3) +#define CHANGELOG_OP_TYPE_MAX     (1<<CHANGELOG_EV_SELECTION_RANGE) + + +struct ev_open { +        unsigned char gfid[16]; +        int32_t flags; +}; + +struct ev_creat { +        unsigned char gfid[16]; +        int32_t flags; +}; + +struct ev_release { +        unsigned char gfid[16]; +}; + +struct ev_changelog { +        char path[PATH_MAX]; +}; + +typedef struct changelog_event { +        unsigned int ev_type; +        union { +                struct ev_open open; +                struct ev_creat create; +                
struct ev_release release; +                struct ev_changelog journal; +        } u; +} changelog_event_t; + +#define CHANGELOG_EV_SIZE  (sizeof (changelog_event_t)) + +/** + * event callback, connected & disconnection defs + */ +typedef void (CALLBACK) (void *, char *, +                        void *, changelog_event_t *); +typedef void *(INIT) (void *, struct gf_brick_spec *); +typedef void (FINI) (void *, char *, void *); +typedef void (CONNECT) (void *, char *, void *); +typedef void (DISCONNECT) (void *, char *, void *); + +struct gf_brick_spec { +        char         *brick_path; +        unsigned int  filter; + +        INIT       *init; +        FINI       *fini; +        CALLBACK   *callback; +        CONNECT    *connected; +        DISCONNECT *disconnected; + +        void *ptr; +}; +  /* API set */  int @@ -28,4 +95,9 @@ gf_changelog_next_change (char *bufptr, size_t maxlen);  int  gf_changelog_done (char *file); +/* newer flexible API */ +int +gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, +                               int ordered, char *logfile, int lvl, void *xl); +  #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c new file mode 100644 index 00000000000..cea2ff01988 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-api.c @@ -0,0 +1,224 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#include "uuid.h" +#include "globals.h" +#include "glusterfs.h" + +#include "gf-changelog-helpers.h" +#include "gf-changelog-journal.h" +#include "changelog-mem-types.h" + +int +gf_changelog_done (char *file) +{ +        int                     ret    = -1; +        char                   *buffer = NULL; +        xlator_t               *this   = NULL; +        gf_changelog_journal_t *jnl    = NULL; +        char to_path[PATH_MAX]         = {0,}; + +        errno = EINVAL; + +        this = THIS; +        if (!this) +                goto out; + +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl) +                goto out; + +        if (!file || !strlen (file)) +                goto out; + +        /* make sure 'file' is inside ->jnl_working_dir */ +        buffer = realpath (file, NULL); +        if (!buffer) +                goto out; + +        if (strncmp (jnl->jnl_working_dir, +                     buffer, strlen (jnl->jnl_working_dir))) +                goto out; + +        (void) snprintf (to_path, PATH_MAX, "%s%s", +                         jnl->jnl_processed_dir, basename (buffer)); +        gf_log (this->name, GF_LOG_DEBUG, +                "moving %s to processed directory", file); +        ret = rename (buffer, to_path); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cannot move %s to %s (reason: %s)", +                        file, to_path, strerror (errno)); +                goto out; +        } + +        ret = 0; + + out: +        if (buffer) +                free (buffer); /* allocated by realpath() */ +        return ret; +} + +/** + * @API + *  for a set of changelogs, start from the beginning + */ +int +gf_changelog_start_fresh () +{ +        xlator_t *this = NULL; +        gf_changelog_journal_t *jnl = NULL; + +        this = THIS; +        if (!this) +                goto out; + +        errno = EINVAL; + +        jnl = (gf_changelog_journal_t *) 
GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl) +                goto out; + +        if (gf_ftruncate (jnl->jnl_fd, 0)) +                goto out; + +        return 0; + + out: +        return -1; +} + +/** + * @API + * return the next changelog file entry. zero means all chanelogs + * consumed. + */ +ssize_t +gf_changelog_next_change (char *bufptr, size_t maxlen) +{ +        ssize_t         size       = -1; +        int             tracker_fd = 0; +        xlator_t       *this       = NULL; +        gf_changelog_journal_t *jnl = NULL; +        char buffer[PATH_MAX]      = {0,}; + +        errno = EINVAL; + +        this = THIS; +        if (!this) +                goto out; + +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl) +                goto out; + +        tracker_fd = jnl->jnl_fd; + +        size = gf_readline (tracker_fd, buffer, maxlen); +        if (size < 0) { +                size = -1; +                goto out; +        } + +        if (size == 0) +                goto out; + +        memcpy (bufptr, buffer, size - 1); +        bufptr[size - 1] = '\0'; + +out: +        return size; +} + +/** + * @API + *  gf_changelog_scan() - scan and generate a list of change entries + * + * calling this api multiple times (without calling gf_changlog_done()) + * would result new changelogs(s) being refreshed in the tracker file. + * This call also acts as a cancellation point for the consumer. 
+ */ +ssize_t +gf_changelog_scan () +{ +        int             ret        = 0; +        int             tracker_fd = 0; +        size_t          len        = 0; +        size_t          off        = 0; +        xlator_t       *this       = NULL; +        size_t          nr_entries = 0; +        gf_changelog_journal_t *jnl = NULL; +        struct dirent  *entryp     = NULL; +        struct dirent  *result     = NULL; +        char buffer[PATH_MAX]      = {0,}; + +        this = THIS; +        if (!this) +                goto out; + +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl) +                goto out; +        if (JNL_IS_API_DISCONNECTED (jnl)) { +                errno = ENOTCONN; +                goto out; +        } + +        errno = EINVAL; + +        tracker_fd = jnl->jnl_fd; +        if (gf_ftruncate (tracker_fd, 0)) +                goto out; + +        len = offsetof(struct dirent, d_name) +                + pathconf(jnl->jnl_processing_dir, _PC_NAME_MAX) + 1; +        entryp = GF_CALLOC (1, len, +                            gf_changelog_mt_libgfchangelog_dirent_t); +        if (!entryp) +                goto out; + +        rewinddir (jnl->jnl_dir); +        while (1) { +                ret = readdir_r (jnl->jnl_dir, entryp, &result); +                if (ret || !result) +                        break; + +                if (!strcmp (basename (entryp->d_name), ".") +                     || !strcmp (basename (entryp->d_name), "..")) +                        continue; + +                nr_entries++; + +                GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir, +                                          buffer, off, +                                          strlen (jnl->jnl_processing_dir)); +                GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, +                                          off, strlen (entryp->d_name)); +                GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); + +   
             if (gf_changelog_write (tracker_fd, buffer, off) != off) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "error writing changelog filename" +                                " to tracker file"); +                        break; +                } +                off = 0; +        } + +        GF_FREE (entryp); + +        if (!result) { +                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) +                        return nr_entries; +        } + out: +        return -1; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c index f071b057d59..6bf709dc664 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c @@ -178,3 +178,35 @@ gf_ftruncate (int fd, off_t length)          return 0;  } + +int +gf_thread_cleanup (xlator_t *this, pthread_t thread) +{ +        int ret = 0; +        void *res = NULL; + +        ret = pthread_cancel (thread); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "Failed to send cancellation to thread"); +                goto error_return; +        } + +        ret = pthread_join (thread, &res); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "failed to join thread"); +                goto error_return; +        } + +        if (res != PTHREAD_CANCELED) { +                gf_log (this->name, GF_LOG_WARNING, +                        "Thread could not be cleaned up"); +                goto error_return; +        } + +        return 0; + + error_return: +        return -1; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 9b875d45dcc..17b8862a89b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ 
b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -18,6 +18,11 @@  #include <xlator.h> +#include "changelog.h" + +#include "changelog-rpc-common.h" +#include "gf-changelog-journal.h" +  #define GF_CHANGELOG_TRACKER  "tracker"  #define GF_CHANGELOG_CURRENT_DIR    ".current" @@ -41,79 +46,143 @@ typedef struct read_line {          char rl_buf[MAXLINE];  } read_line_t; +struct gf_changelog; + +/** + * Event list for ordered event notification + * + * ->next_seq holds the next _expected_ sequence number. + */ +struct gf_event_list { +        pthread_mutex_t lock;               /* protects this structure */ +        pthread_cond_t  cond; + +        pthread_t invoker; + +        unsigned long next_seq;             /* next sequence number expected: +                                               zero during bootstrap */ + +        struct gf_changelog *entry;         /* backpointer to it's brick +                                               encapsulator (entry) */ +        struct list_head events;            /* list of events (ordered) */ +}; + +/** + * include a refcount if it's of use by additional layers + */ +struct gf_event { +        int count; + +        unsigned long seq; + +        struct list_head list; + +        struct iovec iov[0]; +}; +#define GF_EVENT_CALLOC_SIZE(cnt, len)                                  \ +        (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len) + +/** + * assign the base address of the IO vector to the correct memory + * area and set it's addressable length. 
+ */ +#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos)                     \ +        do {                                                            \ +                vec->iov_base = ((char *)event) +                       \ +                        sizeof (struct gf_event) +                      \ +                        (event->count * sizeof (struct iovec)) + pos;   \ +                vec->iov_len = len;                                     \ +                pos += len;                                             \ +        } while (0) + +/** + * An instance of this structure is allocated for each brick for which + * notifications are streamed. + */  typedef struct gf_changelog {          xlator_t *this; -        /* 'processing' directory stream */ -        DIR *gfc_dir; - -        /* fd to the tracker file */ -        int gfc_fd; - -        /* connection retries */ -        int gfc_connretries; +        struct list_head list;              /* list of instances */ -        char gfc_sockpath[UNIX_PATH_MAX]; +        char brick[PATH_MAX];               /* brick path for this end-point */ -        char gfc_brickpath[PATH_MAX]; +        changelog_rpc_t grpc;               /* rpc{-clnt,svc} for this brick */ +#define RPC_PROBER(ent)  ent->grpc.rpc +#define RPC_REBORP(ent)  ent->grpc.svc +#define RPC_SOCK(ent)    ent->grpc.sock -        /* socket for receiving notifications */ -        int gfc_sockfd; +        unsigned int notify;                /* notification flag(s) */ -        char *gfc_working_dir; +        FINI       *fini;                   /* destructor callback */ +        CALLBACK   *callback;               /* event callback dispatcher */ +        CONNECT    *connected;              /* connect callback */ +        DISCONNECT *disconnected;           /* disconnection callback */ -        /* RFC 3986 string encoding */ -        char rfc3986[256]; +        void *ptr;                          /* owner specific private data */ +        xlator_t *invokerxl;   
             /* consumers _this_, if valid, +                                               assigned to THIS before cbk is +                                               invoked */ -        char gfc_current_dir[PATH_MAX]; -        char gfc_processed_dir[PATH_MAX]; -        char gfc_processing_dir[PATH_MAX]; +        gf_boolean_t ordered; -        pthread_t gfc_changelog_processor; - -        /* Holds gfc for History API */ -        struct gf_changelog *hist_gfc; - -        /* holds 0 done scanning, 1 keep scanning and -1 error */ -        int hist_done; +        struct gf_event_list event;  } gf_changelog_t; -typedef struct gf_changelog_history_data { -        int           len; - -        int           htime_fd; - -        /* parallelism count */ -        int           n_parallel; - -        /* history from, to indexes */ -        unsigned long from; -        unsigned long to; -} gf_changelog_history_data_t; - -typedef struct gf_changelog_consume_data { -        /** set of inputs */ - -        /* fd to read from */ -        int             fd; - -        /* from @offset */ -        off_t           offset; - -        xlator_t       *this; -        gf_changelog_t *gfc; - -        /** set of outputs */ +static inline int +gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event) +{ +        if (event->ev_type & entry->notify) +                return 1; +        return 0; +} + +#define GF_NEED_ORDERED_EVENTS(ent)  (ent->ordered == _gf_true) + +/** private structure */ +typedef struct gf_private { +        gf_lock_t lock;                  /* protects ->connections */ + +        void *api;                       /* pointer for API access */ + +        pthread_t poller;                /* event poller thread */ + +        struct list_head connections;    /* list of connections */ +} gf_private_t; + +#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api) + +/** + * upcall: invoke callback with _correct_ THIS + */ +#define 
GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...)             \ +        do {                                                            \ +                xlator_t *old_this = NULL;                              \ +                xlator_t *invokerxl = NULL;                             \ +                                                                        \ +                invokerxl = entry->invokerxl;                           \ +                old_this = this;                                        \ +                                                                        \ +                if (invokerxl) {                                        \ +                        THIS = invokerxl;                               \ +                }                                                       \ +                                                                        \ +                cbk (invokerxl, brick, args);                           \ +                THIS = old_this;                                        \ +                                                                        \ +        } while (0) -        /* return value */ -        int retval; +#define SAVE_THIS(xl)                           \ +        do {                                    \ +                old_this = xl;                  \ +                THIS = master;                  \ +        } while (0) -        /* journal processed */ -        char changelog[PATH_MAX]; -} gf_changelog_consume_data_t; +#define RESTORE_THIS()                          \ +        do {                                    \ +                THIS = old_this;                \ +        } while (0) -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); +/** APIs and the rest */  void *  gf_changelog_process (void *data); @@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence);  int  gf_changelog_consume (xlator_t *this, -                      gf_changelog_t *gfc, +              
        gf_changelog_journal_t *jnl,                        char *from_path, gf_boolean_t no_publish);  int -gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); +gf_changelog_publish (xlator_t *this, +                      gf_changelog_journal_t *jnl, char *from_path); +int +gf_thread_cleanup (xlator_t *this, pthread_t thread); +void * +gf_changelog_callback_invoker (void *arg);  #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c index 1a275e676fb..6ee6f9f074f 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-process.c +++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c @@ -1,5 +1,5 @@  /* -   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>     This file is part of GlusterFS.     This file is licensed to you under your choice of the GNU Lesser @@ -8,9 +8,6 @@     cases as published by the Free Software Foundation.  */ -#include <unistd.h> -#include <pthread.h> -  #include "uuid.h"  #include "globals.h"  #include "glusterfs.h" @@ -19,6 +16,9 @@  /* from the changelog translator */  #include "changelog-misc.h" +#include "changelog-mem-types.h" + +#include "gf-changelog-journal.h"  extern int byebye; @@ -98,7 +98,7 @@ conv_noop (char *ptr) { return ptr; }                  MOVER_MOVE (mover, nleft, sizeof (uuid_t));     \          }                                                       \ -#define LINE_BUFSIZE  3*PATH_MAX /* enough buffer for extra chars too */ +#define LINE_BUFSIZE  (3*PATH_MAX) /* enough buffer for extra chars too */  /**   * using mmap() makes parsing easy. 
fgets() cannot be used here as @@ -114,7 +114,8 @@ conv_noop (char *ptr) { return ptr; }  static int  gf_changelog_parse_binary (xlator_t *this, -                           gf_changelog_t *gfc, int from_fd, int to_fd, +                           gf_changelog_journal_t *jnl, +                           int from_fd, int to_fd,                             size_t start_offset, struct stat *stbuf)  { @@ -165,7 +166,8 @@ gf_changelog_parse_binary (xlator_t *this,                          PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);                          bname_start = mover; -                        if ( (bname_end = strchr (mover, '\n')) == NULL ) { +                        bname_end = strchr (mover, '\n'); +                        if (bname_end == NULL) {                                  parse_err = 1;                                  break;                          } @@ -201,7 +203,7 @@ gf_changelog_parse_binary (xlator_t *this,                  MOVER_MOVE (mover, nleft, 1);          } -        if ( (nleft == 0) && (!parse_err)) +        if ((nleft == 0) && (!parse_err))                  ret = 0;          if (munmap (start, stbuf->st_size)) @@ -218,7 +220,8 @@ gf_changelog_parse_binary (xlator_t *this,   */  static int  gf_changelog_parse_ascii (xlator_t *this, -                          gf_changelog_t *gfc, int from_fd, int to_fd, +                          gf_changelog_journal_t *jnl, +                          int from_fd, int to_fd,                            size_t start_offset, struct stat *stbuf)  {          int           ng            = 0; @@ -281,7 +284,8 @@ gf_changelog_parse_ascii (xlator_t *this,                          VERIFY_SEPARATOR (mover, len, parse_err);                          fop = atoi (mover); -                        if ( (fopname = gf_fop_list[fop]) == NULL) { +                        fopname = gf_fop_list[fop]; +                        if (fopname == NULL) {                                  parse_err = 1;                          
        break;                          } @@ -309,7 +313,8 @@ gf_changelog_parse_ascii (xlator_t *this,                          VERIFY_SEPARATOR (mover, len, parse_err);                          fop = atoi (mover); -                        if ( (fopname = gf_fop_list[fop]) == NULL) { +                        fopname = gf_fop_list[fop]; +                        if (fopname == NULL) {                                  parse_err = 1;                                  break;                          } @@ -320,7 +325,7 @@ gf_changelog_parse_ascii (xlator_t *this,                          GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);                          ng = nr_extra_recs[fop]; -                        for (;ng > 0; ng--) { +                        for (; ng > 0; ng--) {                                  MOVER_MOVE (mover, nleft, 1);                                  len = strlen (mover);                                  VERIFY_SEPARATOR (mover, len, parse_err); @@ -346,7 +351,7 @@ gf_changelog_parse_ascii (xlator_t *this,                                  }                                  gf_rfc3986_encode ((unsigned char *) ptr, -                                                   eptr, gfc->rfc3986); +                                                   eptr, jnl->rfc3986);                                  FILL_AND_MOVE (eptr, ascii, off,                                                 mover, nleft, len);                                  free (eptr); @@ -374,7 +379,7 @@ gf_changelog_parse_ascii (xlator_t *this,          } -        if ( (nleft == 0) && (!parse_err)) +        if ((nleft == 0) && (!parse_err))                  ret = 0;          if (munmap (start, stbuf->st_size)) @@ -410,8 +415,8 @@ gf_changelog_copy (xlator_t *this, int from_fd, int to_fd)  }  static int -gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, -                     int to_fd, struct stat *stbuf, int *zerob) +gf_changelog_decode (xlator_t *this, 
gf_changelog_journal_t *jnl, +                     int from_fd, int to_fd, struct stat *stbuf, int *zerob)  {          int    ret        = -1;          int    encoding   = -1; @@ -441,12 +446,12 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,                   * this ideally should have been a part of changelog-encoders.c                   * (ie. part of the changelog translator).                   */ -                ret = gf_changelog_parse_binary (this, gfc, from_fd, +                ret = gf_changelog_parse_binary (this, jnl, from_fd,                                                   to_fd, elen, stbuf);                  break;          case CHANGELOG_ENCODE_ASCII: -                ret = gf_changelog_parse_ascii (this, gfc, from_fd, +                ret = gf_changelog_parse_ascii (this, jnl, from_fd,                                                  to_fd, elen, stbuf);                  break;          default: @@ -458,7 +463,8 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,  }  int -gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path) +gf_changelog_publish (xlator_t *this, +                      gf_changelog_journal_t *jnl, char *from_path)  {          int         ret        = 0;          char dest[PATH_MAX]    = {0,}; @@ -466,21 +472,21 @@ gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)          struct stat stbuf      = {0,};          (void) snprintf (to_path, PATH_MAX, "%s%s", -                         gfc->gfc_current_dir, basename (from_path)); +                         jnl->jnl_current_dir, basename (from_path));          /* handle zerob file that wont exist in current */          ret = stat (to_path, &stbuf); -        if (ret){ +        if (ret) {                  if (errno == ENOENT)                          ret = 0;                  goto out;          }          (void) snprintf (dest, PATH_MAX, "%s%s", -                         gfc->gfc_processing_dir, 
basename (from_path)); +                         jnl->jnl_processing_dir, basename (from_path));          ret = rename (to_path, dest); -        if (ret){ +        if (ret) {                  gf_log (this->name, GF_LOG_ERROR,                          "error moving %s to processing dir"                          " (reason: %s)", to_path, strerror (errno)); @@ -492,7 +498,7 @@ out:  int  gf_changelog_consume (xlator_t *this, -                      gf_changelog_t *gfc, +                      gf_changelog_journal_t *jnl,                        char *from_path, gf_boolean_t no_publish)  {          int         ret        = -1; @@ -520,9 +526,9 @@ gf_changelog_consume (xlator_t *this,          }          (void) snprintf (to_path, PATH_MAX, "%s%s", -                         gfc->gfc_current_dir, basename (from_path)); +                         jnl->jnl_current_dir, basename (from_path));          (void) snprintf (dest, PATH_MAX, "%s%s", -                         gfc->gfc_processing_dir, basename (from_path)); +                         jnl->jnl_processing_dir, basename (from_path));          fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR,                      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -532,7 +538,7 @@ gf_changelog_consume (xlator_t *this,                          to_path, strerror (errno));                  goto close_fd;          } else { -                ret = gf_changelog_decode (this, gfc, fd1, +                ret = gf_changelog_decode (this, jnl, fd1,                                             fd2, &stbuf, &zerob);                  close (fd2); @@ -568,88 +574,412 @@ gf_changelog_consume (xlator_t *this,          return ret;  } -static char * -gf_changelog_ext_change (xlator_t *this, -                         gf_changelog_t *gfc, char *path, size_t readlen) +void * +gf_changelog_process (void *data)  { -        int     alo = 0; -        int     ret = 0; -        size_t  len = 0; -        char   *buf = NULL; - -        buf = path; -        while (len 
< readlen) { -                if (*buf == '\0') { -                        alo = 1; -                        gf_log (this->name, GF_LOG_DEBUG, -                                "processing changelog: %s", path); -                        ret = gf_changelog_consume (this, gfc, path, _gf_false); -                } +        int ret = 0; +        xlator_t *this = NULL; +        gf_changelog_journal_t *jnl = NULL; +        gf_changelog_entry_t *entry = NULL; +        gf_changelog_processor_t *jnl_proc = NULL; -                if (ret) -                        break; +        this = THIS; -                len++; buf++; -                if (alo) { -                        alo = 0; -                        path = buf; +        jnl = data; +        jnl_proc = jnl->jnl_proc; + +        while (1) { +                pthread_mutex_lock (&jnl_proc->lock); +                { +                        while (list_empty (&jnl_proc->entries)) { +                                jnl_proc->waiting = _gf_true; +                                pthread_cond_wait +                                        (&jnl_proc->cond, &jnl_proc->lock); +                        } + +                        entry = list_first_entry (&jnl_proc->entries, +                                                  gf_changelog_entry_t, list); +                        list_del (&entry->list); +                        jnl_proc->waiting = _gf_false;                  } +                pthread_mutex_unlock (&jnl_proc->lock); + +                if (entry) { +                        ret = gf_changelog_consume (this, jnl, +                                                    entry->path, _gf_false); +                        GF_FREE (entry); +                } +        } + +        return NULL; +} + +inline void +gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc, +                            changelog_event_t *event) +{ +        size_t len = 0; +        gf_changelog_entry_t *entry = NULL; + +        entry = 
GF_CALLOC (1, sizeof (gf_changelog_entry_t), +                           gf_changelog_mt_libgfchangelog_entry_t); +        if (!entry) +                return; +        INIT_LIST_HEAD (&entry->list); + +        len = strlen (event->u.journal.path); +        (void)memcpy (entry->path, event->u.journal.path, len+1); + +        pthread_mutex_lock (&jnl_proc->lock); +        { +                list_add_tail (&entry->list, &jnl_proc->entries); +                if (jnl_proc->waiting) +                        pthread_cond_signal (&jnl_proc->cond); +        } +        pthread_mutex_unlock (&jnl_proc->lock); + +        return; +} + +void +gf_changelog_handle_journal (void *xl, char *brick, +                             void *cbkdata, changelog_event_t *event) +{ +        int                       ret      = 0; +        gf_changelog_journal_t   *jnl      = NULL; +        gf_changelog_processor_t *jnl_proc = NULL; + +        jnl      = cbkdata; +        jnl_proc = jnl->jnl_proc; + +        gf_changelog_queue_journal (jnl_proc, event); +} + +void +gf_changelog_journal_disconnect (void *xl, char *brick, void *data) +{ +        gf_changelog_journal_t *jnl = NULL; + +        jnl = data; + +        pthread_spin_lock (&jnl->lock); +        { +                JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED); +        }; +        pthread_spin_unlock (&jnl->lock); +} + +void +gf_changelog_journal_connect (void *xl, char *brick, void *data) +{ +        gf_changelog_journal_t *jnl = NULL; + +        jnl = data; + +        pthread_spin_lock (&jnl->lock); +        { +                JNL_SET_API_STATE (jnl, JNL_API_CONNECTED); +        }; +        pthread_spin_unlock (&jnl->lock); + +        return; +} + +void +gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl) +{ +        int ret = 0; +        xlator_t *this = NULL; +        gf_changelog_processor_t *jnl_proc = NULL; + +        this = THIS; +        if (!this || !jnl || !jnl->jnl_proc) +                goto error_return; + +        
jnl_proc = jnl->jnl_proc; + +        ret = gf_thread_cleanup (this, jnl_proc->processor); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to cleanup processor thread"); +                goto error_return; +        } + +        (void)pthread_mutex_destroy (&jnl_proc->lock); +        (void)pthread_cond_destroy (&jnl_proc->cond); + +        GF_FREE (jnl_proc); + + error_return: +        return; +} + +int +gf_changelog_init_processor (gf_changelog_journal_t *jnl) +{ +        int ret = -1; +        gf_changelog_processor_t *jnl_proc = NULL; + +        jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t), +                              gf_changelog_mt_libgfchangelog_t); +        if (!jnl_proc) +                goto error_return; + +        ret = pthread_mutex_init (&jnl_proc->lock, NULL); +        if (ret != 0) +                goto free_jnl_proc; +        ret = pthread_cond_init (&jnl_proc->cond, NULL); +        if (ret != 0) +                goto cleanup_mutex; + +        INIT_LIST_HEAD (&jnl_proc->entries); +        ret = pthread_create (&jnl_proc->processor, +                              NULL, gf_changelog_process, jnl); +        if (ret != 0) +                goto cleanup_cond; +        jnl_proc->waiting = _gf_false; + +        jnl->jnl_proc = jnl_proc; +        return 0; + + cleanup_cond: +        (void) pthread_cond_destroy (&jnl_proc->cond); + cleanup_mutex: +        (void) pthread_mutex_destroy (&jnl_proc->lock); + free_jnl_proc: +        GF_FREE (jnl_proc); + error_return: +        return -1; +} + +static void +gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl) +{ +        /* tracker fd */ +        if (jnl->jnl_fd != -1) +                close (jnl->jnl_fd); +        /* processing dir */ +        if (jnl->jnl_dir) +                closedir (jnl->jnl_dir); + +        if (jnl->jnl_working_dir) +                free (jnl->jnl_working_dir); /* allocated by realpath */ +} + +static int 
+gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl) +{ +        int  ret                    = -1; +        DIR *dir                    = NULL; +        int  tracker_fd             = 0; +        char tracker_path[PATH_MAX] = {0,}; + +        /* .current */ +        (void) snprintf (jnl->jnl_current_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_CURRENT_DIR"/", +                         jnl->jnl_working_dir); +        ret = recursive_rmdir (jnl->jnl_current_dir); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Failed to rmdir: %s, err: %s", +                        jnl->jnl_current_dir, strerror (errno)); +                goto out; +        } +        ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        /* .processed */ +        (void) snprintf (jnl->jnl_processed_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_PROCESSED_DIR"/", +                         jnl->jnl_working_dir); +        ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        /* .processing */ +        (void) snprintf (jnl->jnl_processing_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_PROCESSING_DIR"/", +                         jnl->jnl_working_dir); +        ret = recursive_rmdir (jnl->jnl_processing_dir); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Failed to rmdir: %s, err: %s", +                        jnl->jnl_processing_dir, strerror (errno)); +                goto out; +        } + +        ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        dir = opendir (jnl->jnl_processing_dir); +        if (!dir) { +                gf_log ("", GF_LOG_ERROR, +                        "opendir() error [reason: %s]", strerror (errno)); +                goto out;          } -        
return (ret) ? NULL : path; +        jnl->jnl_dir = dir; + +        (void) snprintf (tracker_path, PATH_MAX, +                         "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir); + +        tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, +                           S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (tracker_fd < 0) { +                closedir (jnl->jnl_dir); +                ret = -1; +                goto out; +        } + +        jnl->jnl_fd = tracker_fd; +        ret = 0; + out: +        return ret; +} + +int +gf_changelog_init_history (xlator_t *this, +                           gf_changelog_journal_t *jnl, +                           char *brick_path, char *scratch_dir) +{ +        int i   = 0; +        int ret = 0; +        char hist_scratch_dir[PATH_MAX] = {0,}; + +        jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl), +                         gf_changelog_mt_libgfchangelog_t); +        if (!jnl->hist_jnl) +                goto error_return; + +        jnl->hist_jnl->jnl_dir = NULL; +        jnl->hist_jnl->jnl_fd =  -1; + +        (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); +        (void) snprintf (hist_scratch_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_HISTORY_DIR"/", +                         jnl->jnl_working_dir); + +        ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); +        if (ret) +                goto dealloc_hist; + +        jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL); +        if (!jnl->hist_jnl->jnl_working_dir) +                goto dealloc_hist; + +        ret = gf_changelog_open_dirs (this, jnl->hist_jnl); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not create entries in history scratch dir"); +                goto dealloc_hist; +        } + +        (void) strncpy (jnl->hist_jnl->jnl_brickpath, brick_path, PATH_MAX); + +        for (i = 0; i < 256; i++) { +                
jnl->hist_jnl->rfc3986[i] = +                        (isalnum(i) || i == '~' || +                        i == '-' || i == '.' || i == '_') ? i : 0; +        } + +        return 0; + + dealloc_hist: +        GF_FREE (jnl->hist_jnl); +        jnl->hist_jnl = NULL; + error_return: +        return -1; +} + +void +gf_changelog_journal_fini (void *xl, char *brick, void *data) +{ +        int ret = 0; +        xlator_t *this = NULL; +        gf_changelog_journal_t *jnl = NULL; + +        this = xl; +        jnl = data; + +        gf_changelog_cleanup_processor (jnl); + +        gf_changelog_cleanup_fds (jnl); +        if (jnl->hist_jnl) +                gf_changelog_cleanup_fds (jnl->hist_jnl); + +        GF_FREE (jnl);  }  void * -gf_changelog_process (void *data) +gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick)  { -        ssize_t         len      = 0; -        ssize_t         offlen   = 0; -        xlator_t       *this     = NULL; -        char           *sbuf     = NULL; -        gf_changelog_t *gfc      = NULL; -        char from_path[PATH_MAX] = {0,}; - -        gfc = (gf_changelog_t *) data; -        this = gfc->this; - -        pthread_detach (pthread_self()); - -        for (;;) { -                len = gf_changelog_read_path (gfc->gfc_sockfd, -                                              from_path + offlen, -                                              PATH_MAX - offlen); -                if (len < 0) -                        continue; /* ignore it for now */ - -                if (len == 0) { /* close() from the changelog translator */ -                        gf_log (this->name, GF_LOG_INFO, "close from changelog" -                                " notification translator."); - -                        if (gfc->gfc_connretries != 1) { -                                if (!gf_changelog_notification_init(this, gfc)) -                                        continue; -                        } +        int                     i           = 0; 
+        int                     ret         = 0; +        xlator_t               *this        = NULL; +        struct stat             buf         = {0,}; +        char                   *scratch_dir = NULL; +        gf_changelog_journal_t *jnl         = NULL; + +        this = xl; +        scratch_dir = (char *) brick->ptr; + +        jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t), +                         gf_changelog_mt_libgfchangelog_t); +        if (!jnl) +                goto error_return; + +        if (stat (scratch_dir, &buf) && errno == ENOENT) { +                ret = mkdir_p (scratch_dir, 0600, _gf_true); +                if (ret) +                        goto dealloc_private; +        } -                        byebye = 1; -                        break; -                } +        jnl->jnl_working_dir = realpath (scratch_dir, NULL); +        if (!jnl->jnl_working_dir) +                goto dealloc_private; -                len += offlen; -                sbuf = gf_changelog_ext_change (this, gfc, from_path, len); -                if (!sbuf) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "could not extract changelog filename"); -                        continue; -                } +        ret = gf_changelog_open_dirs (this, jnl); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not create entries in scratch dir"); +                goto dealloc_private; +        } -                offlen = 0; -                if (sbuf != (from_path + len)) { -                        offlen = from_path + len - sbuf; -                        memmove (from_path, sbuf, offlen); -                } +        (void) strncpy (jnl->jnl_brickpath, brick->brick_path, PATH_MAX); + +        /* RFC 3986 {de,en}coding */ +        for (i = 0; i < 256; i++) { +                jnl->rfc3986[i] = +                        (isalnum(i) || i == '~' || +                        i == '-' || 
i == '.' || i == '_') ? i : 0;          } -        gf_log (this->name, GF_LOG_DEBUG, -                "byebye (%d) from processing thread...", byebye); +        ret = gf_changelog_init_history (this, jnl, +                                         brick->brick_path, scratch_dir); +        if (ret) +                goto cleanup_fds; + +        /* initialize journal processor */ +        ret = gf_changelog_init_processor (jnl); +        if (ret) +                goto cleanup_fds; + +        JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS); +        ret = pthread_spin_init (&jnl->lock, 0); +        if (ret != 0) +                goto cleanup_processor; +        return jnl; + + cleanup_processor: +        gf_changelog_cleanup_processor (jnl); + cleanup_fds: +        gf_changelog_cleanup_fds (jnl); +        if (jnl->hist_jnl) +                gf_changelog_cleanup_fds (jnl->hist_jnl); + dealloc_private: +        GF_FREE (jnl); + error_return:          return NULL;  } diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h new file mode 100644 index 00000000000..9a0f0b28956 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h @@ -0,0 +1,114 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#ifndef __GF_CHANGELOG_JOURNAL_H +#define __GF_CHANGELOG_JOURNAL_H + +#include <unistd.h> +#include <pthread.h> + +#include "changelog.h" + +enum api_conn { +        JNL_API_CONNECTED, +        JNL_API_CONN_INPROGESS, +        JNL_API_DISCONNECTED, +}; + +typedef struct gf_changelog_entry { +        char path[PATH_MAX]; + +        struct list_head list; +} gf_changelog_entry_t; + +typedef struct gf_changelog_processor { +        pthread_mutex_t  lock;     /* protects ->entries */ +        pthread_cond_t   cond;     /* waiter during empty list */ +        gf_boolean_t     waiting; + +        pthread_t processor;       /* thread-id of journal processing thread */ + +        struct list_head entries; +} gf_changelog_processor_t; + +typedef struct gf_changelog_journal { +        DIR *jnl_dir;                       /* 'processing' directory stream */ + +        int jnl_fd;                         /* fd to the tracker file */ + +        char jnl_brickpath[PATH_MAX];       /* brick path for this end-point */ + +        gf_changelog_processor_t *jnl_proc; + +        char *jnl_working_dir;              /* scratch directory */ + +        char jnl_current_dir[PATH_MAX]; +        char jnl_processed_dir[PATH_MAX]; +        char jnl_processing_dir[PATH_MAX]; + +        char rfc3986[256];                  /* RFC 3986 string encoding */ + +        struct gf_changelog_journal *hist_jnl; +        int hist_done;                      /* holds 0 done scanning, +                                               1 keep scanning and -1 error */ + +        pthread_spinlock_t lock; +        int connected; +} gf_changelog_journal_t; + +#define JNL_SET_API_STATE(jnl, state)  (jnl->connected = state) +#define JNL_IS_API_DISCONNECTED(jnl)  (jnl->connected == JNL_API_DISCONNECTED) + +/* History API */ +typedef struct gf_changelog_history_data { +        int           len; + +        int           htime_fd; + +        /* parallelism count */ +        int           n_parallel; + +        /* 
history from, to indexes */ +        unsigned long from; +        unsigned long to; +} gf_changelog_history_data_t; + +typedef struct gf_changelog_consume_data { +        /** set of inputs */ + +        /* fd to read from */ +        int             fd; + +        /* from @offset */ +        off_t           offset; + +        xlator_t       *this; + +        gf_changelog_journal_t *jnl; + +        /** set of outputs */ + +        /* return value */ +        int retval; + +        /* journal processed */ +        char changelog[PATH_MAX]; +} gf_changelog_consume_data_t; + +/* event handler */ +CALLBACK gf_changelog_handle_journal; + +/* init, connect & disconnect handler */ +INIT gf_changelog_journal_init; +FINI gf_changelog_journal_fini; +CONNECT gf_changelog_journal_connect; +DISCONNECT gf_changelog_journal_disconnect; + +#endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c new file mode 100644 index 00000000000..d7e60fb9634 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -0,0 +1,381 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +#include "gf-changelog-helpers.h" +#include "changelog-rpc-common.h" + +/** + * Reverse socket: actual data transfer handler. Connection + * initiator is PROBER, data transfer is REBORP. + */ + +struct rpcsvc_program *gf_changelog_reborp_programs[]; + +/** + * On a reverse connection, unlink the socket file. 
+ */ +int +gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, +                                   rpcsvc_event_t event, void *data) +{ +        int ret = 0; +        xlator_t       *this     = NULL; +        gf_private_t   *priv     = NULL; +        gf_changelog_t *entry    = NULL; +        char sock[UNIX_PATH_MAX] = {0,}; + +        entry = mydata; +        this = entry->this; +        priv = this->private; + +        switch (event) { +        case RPCSVC_EVENT_ACCEPT: +                ret = unlink (RPC_SOCK(entry)); +                if (ret != 0) +                        gf_log (this->name, GF_LOG_WARNING, "failed to unlink" +                                " reverse socket file %s", RPC_SOCK (entry)); +                if (entry->connected) +                        GF_CHANGELOG_INVOKE_CBK (this, entry->connected, +                                                 entry->brick, entry->ptr); +                break; +        case RPCSVC_EVENT_DISCONNECT: +                LOCK (&priv->lock); +                { +                        list_del (&entry->list); +                } +                UNLOCK (&priv->lock); + +                if (entry->disconnected) +                        GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, +                                                 entry->brick, entry->ptr); + +                GF_FREE (entry); +                break; +        default: +                break; +        } + +        return 0; +} + +rpcsvc_t * +gf_changelog_reborp_init_rpc_listner (xlator_t *this, +                                      char *path, char *sock, void *cbkdata) +{ +        CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX); +        return changelog_rpc_server_init (this, sock, cbkdata, +                                          gf_changelog_reborp_rpcsvc_notify, +                                          gf_changelog_reborp_programs); +} + +/** + * This is dirty and painful as of now untill there is event filtering in 
the + * server. The entire event buffer is scanned and interested events are picked, + * whereas we _should_ be notified with the events we were interested in + * (selected at the time of probe). As of now this is complete BS and needs + * fixture ASAP. I just made it work, it needs to be better. + * + * @FIXME: cleanup this bugger once server filters events. + */ +inline void +gf_changelog_invoke_callback (gf_changelog_t *entry, +                              struct iovec **vec, int payloadcnt) +{ +        int i = 0; +        int evsize = 0; +        xlator_t *this = NULL; +        changelog_event_t *event = NULL; + +        this = entry->this; + +        for (; i < payloadcnt; i++) { +                event = (changelog_event_t *)vec[i]->iov_base; +                evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; + +                for (; evsize > 0; evsize--, event++) { +                        if (gf_changelog_filter_check (entry, event)) { +                                GF_CHANGELOG_INVOKE_CBK (this, +                                                         entry->callback, +                                                         entry->brick, +                                                         entry->ptr, event); +                        } +                } +        } +} + +/** + * Ordered event handler is self-adaptive.. if the event sequence number + * is what's expected (->next_seq) there is no ordering list that's + * maintained. On out-of-order event notifications, event buffers are + * dynamically allocated and ordered. 
+ */ + +inline int +__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event) +{ +        return (ev->next_seq == event->seq); +} + +inline int +__can_process_event (struct gf_event_list *ev, struct gf_event **event) +{ +        *event = list_first_entry (&ev->events, struct gf_event, list); + +        if (__is_expected_sequence (ev, *event)) { +                list_del (&(*event)->list); +                ev->next_seq++; +                return 1; +        } + +        return 0; +} + +inline void +__process_event_list (struct gf_event_list *ev, struct gf_event **event) +{ +        while (list_empty (&ev->events) +               || !__can_process_event (ev, event)) +                pthread_cond_wait (&ev->cond, &ev->lock); +} + +void * +gf_changelog_callback_invoker (void *arg) +{ +        int                   ret    = 0; +        xlator_t             *this   = NULL; +        gf_changelog_t       *entry  = NULL; +        struct iovec         *vec    = NULL; +        struct gf_event      *event  = NULL; +        struct gf_event_list *ev     = NULL; + +        ev    = arg; +        entry = ev->entry; +        this  = entry->this; + +        while (1) { +                pthread_mutex_lock (&ev->lock); +                { +                        __process_event_list (ev, &event); +                } +                pthread_mutex_unlock (&ev->lock); + +                vec = (struct iovec *) &event->iov; +                gf_changelog_invoke_callback (entry, &vec, event->count); + +                GF_FREE (event); +        } + +        return NULL; +} + +static int +orderfn (struct list_head *pos1, struct list_head *pos2) +{ +        struct gf_event *event1 = NULL; +        struct gf_event *event2 = NULL; + +        event1 = list_entry (pos1, struct gf_event, list); +        event2 = list_entry (pos2, struct gf_event, list); + +        if  (event1->seq > event2->seq) +                return 1; +        return -1; +} + +int 
+gf_changelog_ordered_event_handler (rpcsvc_request_t *req, +                                    xlator_t *this, gf_changelog_t *entry) +{ +        int                   i          = 0; +        size_t                payloadlen = 0; +        ssize_t               len        = 0; +        int                   payloadcnt = 0; +        changelog_event_req   rpc_req    = {0,}; +        changelog_event_rsp   rpc_rsp    = {0,}; +        struct iovec         *vec        = NULL; +        struct gf_event      *event      = NULL; +        struct gf_event_list *ev         = NULL; + +        ev = &entry->event; + +        len = xdr_to_generic (req->msg[0], +                              &rpc_req, (xdrproc_t)xdr_changelog_event_req); +        if (len < 0) { +                gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); +                req->rpc_err = GARBAGE_ARGS; +                goto handle_xdr_error; +        } + +        if (len < req->msg[0].iov_len) { +                payloadcnt = 1; +                payloadlen = (req->msg[0].iov_len - len); +        } +        for (i = 1; i < req->count; i++) { +                payloadcnt++; +                payloadlen += req->msg[i].iov_len; +        } + +        event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen), +                           gf_changelog_mt_libgfchangelog_event_t); +        if (!event) +                goto handle_xdr_error; +        INIT_LIST_HEAD (&event->list); + +        payloadlen   = 0; +        event->seq   = rpc_req.seq; +        event->count = payloadcnt; + +        /* deep copy IO vectors */ +        vec = &event->iov[0]; +        GF_EVENT_ASSIGN_IOVEC (vec, event, +                               (req->msg[0].iov_len - len), payloadlen); +        (void) memcpy (vec->iov_base, +                       req->msg[0].iov_base + len, vec->iov_len); + +        for (i = 1; i < req->count; i++) { +                vec = &event->iov[i]; +                GF_EVENT_ASSIGN_IOVEC (vec, event, +    
                                   req->msg[i].iov_len, payloadlen); +                (void) memcpy (event->iov[i].iov_base, +                               req->msg[i].iov_base, req->msg[i].iov_len); +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %ld)", +                rpc_req.seq, entry->brick, rpc_req.tv_sec, +                rpc_req.tv_usec, payloadcnt, payloadlen); + +        /* add it to the ordered event list and wake up listner(s) */ +        pthread_mutex_lock (&ev->lock); +        { +                list_add_order (&event->list, &ev->events, orderfn); +                if (!ev->next_seq) +                        ev->next_seq = event->seq; +                if (ev->next_seq == event->seq) +                        pthread_cond_signal (&ev->cond); +        } +        pthread_mutex_unlock (&ev->lock); + +        /* ack sequence number */ +        rpc_rsp.op_ret = 0; +        rpc_rsp.seq    = rpc_req.seq; + +        goto submit_rpc; + + handle_xdr_error: +        rpc_rsp.op_ret = -1; +        rpc_rsp.seq    = 0;     /* invalid */ + submit_rpc: +        return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, +                                           (xdrproc_t)xdr_changelog_event_rsp); +} + +int +gf_changelog_unordered_event_handler (rpcsvc_request_t *req, +                                      xlator_t *this, gf_changelog_t *entry) +{ +        int                 i          = 0; +        int                 ret        = 0; +        ssize_t             len        = 0; +        int                 payloadcnt = 0; +        struct iovec vector[MAX_IOVEC] = {{0,}, }; +        changelog_event_req rpc_req    = {0,}; +        changelog_event_rsp rpc_rsp    = {0,}; + +        len = xdr_to_generic (req->msg[0], +                              &rpc_req, (xdrproc_t)xdr_changelog_event_req); +        if (len < 0) { +                gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); +   
             req->rpc_err = GARBAGE_ARGS; +                goto handle_xdr_error; +        } + +        /* prepare payload */ +        if (len < req->msg[0].iov_len) { +                payloadcnt = 1; +                vector[0].iov_base = (req->msg[0].iov_base + len); +                vector[0].iov_len  = (req->msg[0].iov_len - len); +        } + +        for (i = 1; i < req->count; i++) { +                vector[payloadcnt++] = req->msg[i]; +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "seq: %lu (time: %lu.%lu), (vec: %d)", +                rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt); + +        /* invoke callback */ +        struct iovec *vec = (struct iovec *) &vector; +        gf_changelog_invoke_callback (entry, &vec, payloadcnt); + +        /* ack sequence number */ +        rpc_rsp.op_ret = 0; +        rpc_rsp.seq = rpc_req.seq; + +        goto submit_rpc; + + handle_xdr_error: +        rpc_rsp.op_ret = -1; +        rpc_rsp.seq = 0; /* invalid */ + submit_rpc: +        return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, +                                           (xdrproc_t)xdr_changelog_event_rsp); +} + +int +gf_changelog_reborp_handle_event (rpcsvc_request_t *req) +{ +        int                  ret     = 0; +        xlator_t            *this    = NULL; +        rpcsvc_t            *svc     = NULL; +        gf_changelog_t      *entry   = NULL; + +        svc = rpcsvc_request_service (req); +        entry = svc->mydata; + +        this = THIS = entry->this; + +        ret = GF_NEED_ORDERED_EVENTS (entry) +                ? 
gf_changelog_ordered_event_handler (req, this, entry) +                : gf_changelog_unordered_event_handler (req, this, entry); + +        return ret; +} + +rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { +        [CHANGELOG_REV_PROC_EVENT] = { +                "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT, +                gf_changelog_reborp_handle_event, NULL, 0, DRC_NA +        }, +}; + +/** + * Do not use synctask as the RPC layer dereferences ->mydata as THIS. + * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t, + * and that's required to invoke the callback with the appropriate + * brick path and it's private data. + */ +struct rpcsvc_program gf_changelog_reborp_prog = { +        .progname  = "LIBGFCHANGELOG REBORP", +        .prognum   = CHANGELOG_REV_RPC_PROCNUM, +        .progver   = CHANGELOG_REV_RPC_PROCVER, +        .numactors = CHANGELOG_REV_PROC_MAX, +        .actors    = gf_changelog_reborp_actors, +        .synctask  = _gf_false, +}; + +struct rpcsvc_program *gf_changelog_reborp_programs[] = { +        &gf_changelog_reborp_prog, +        NULL, +}; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c new file mode 100644 index 00000000000..c2a4c044d23 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c @@ -0,0 +1,105 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#include "gf-changelog-rpc.h" +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +struct rpc_clnt_program gf_changelog_clnt; + +/* TODO: piggyback reconnect to called (upcall) */ +int +gf_changelog_rpc_notify (struct rpc_clnt *rpc, +                         void *mydata, rpc_clnt_event_t event, void *data) +{ +        xlator_t *this = NULL; + +        this = mydata; + +        switch (event) { +        case RPC_CLNT_CONNECT: +                rpc_clnt_set_connected (&rpc->conn); +                break; +        case RPC_CLNT_DISCONNECT: +                rpc_clnt_unset_connected (&rpc->conn); +                break; +        case RPC_CLNT_MSG: +        case RPC_CLNT_DESTROY: +                break; +        } + +        return 0; +} + +struct rpc_clnt * +gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry) +{ +        char sockfile[UNIX_PATH_MAX] = {0,}; + +        CHANGELOG_MAKE_SOCKET_PATH (entry->brick, +                                    sockfile, UNIX_PATH_MAX); +        return changelog_rpc_client_init (this, entry, +                                          sockfile, gf_changelog_rpc_notify); +} + +/** + * remote procedure calls declarations. 
+ */ + +int +gf_probe_changelog_cbk (struct rpc_req *req, +                        struct iovec *iovec, int count, void *myframe) +{ +        return 0; +} + +int +gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data) +{ +        int ret = 0; +        char *sock = NULL; +        gf_changelog_t *entry = NULL; +        changelog_probe_req req = {0,}; + +        entry = data; +        sock = RPC_SOCK (entry); + +        (void) memcpy (&req.sock, sock, strlen (sock)); +        req.filter = entry->notify; + +        /* invoke RPC */ +        return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req, +                                         frame, &gf_changelog_clnt, +                                         CHANGELOG_RPC_PROBE_FILTER, NULL, 0, +                                         NULL, this, gf_probe_changelog_cbk, +                                         (xdrproc_t) xdr_changelog_probe_req); +} + +int +gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx) +{ +        return changelog_invoke_rpc (this, RPC_PROBER (entry), +                                     &gf_changelog_clnt, procidx, entry); +} + +struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = { +        [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, +        [CHANGELOG_RPC_PROBE_FILTER] = { +                "PROBE FILTER", gf_probe_changelog_filter +        }, +}; + +struct rpc_clnt_program gf_changelog_clnt = { +        .progname  = "LIBGFCHANGELOG", +        .prognum   = CHANGELOG_RPC_PROGNUM, +        .progver   = CHANGELOG_RPC_PROGVER, +        .numproc   = CHANGELOG_RPC_PROC_MAX, +        .proctable = gf_changelog_procs, +}; diff --git a/xlators/features/changelog/src/changelog-notifier.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h index 55e728356e6..1c982eef809 100644 --- a/xlators/features/changelog/src/changelog-notifier.h +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h @@ -8,12 +8,19 @@     cases as 
published by the Free Software Foundation.  */ -#ifndef _CHANGELOG_NOTIFIER_H -#define _CHANGELOG_NOTIFIER_H +#ifndef __GF_CHANGELOG_RPC_H +#define __GF_CHANGELOG_RPC_H -#include "changelog-helpers.h" +#include "xlator.h" -void * -changelog_notifier (void *data); +#include "gf-changelog-helpers.h" +#include "changelog-rpc-common.h" + +struct rpc_clnt *gf_changelog_rpc_init (xlator_t *, gf_changelog_t *); + +int gf_changelog_invoke_rpc (xlator_t *, gf_changelog_t *, int); + +rpcsvc_t * +gf_changelog_reborp_init_rpc_listner (xlator_t *, char *, char *, void *);  #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index fb2d9037ffb..8f33eb01013 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -14,6 +14,8 @@  #include <sys/types.h>  #include <sys/socket.h>  #include <sys/un.h> +#include <sys/time.h> +#include <sys/resource.h>  #ifndef _GNU_SOURCE  #define _GNU_SOURCE @@ -24,589 +26,523 @@  #include "glusterfs.h"  #include "logging.h"  #include "defaults.h" +#include "syncop.h" +#include "gf-changelog-rpc.h"  #include "gf-changelog-helpers.h"  /* from the changelog translator */  #include "changelog-misc.h"  #include "changelog-mem-types.h" -int byebye = 0; +/** + * Global singleton xlator pointer for the library, initialized + * during library load. This should probably be hidden inside + * an initialized object which is an handle for the consumer. + * + * TODO: do away with the global.. 
+ */ +xlator_t *master = NULL; -static void -gf_changelog_cleanup (gf_changelog_t *gfc) +static inline +gf_private_t *gf_changelog_alloc_priv ()  { -        /* socket */ -        if (gfc->gfc_sockfd != -1) -                close (gfc->gfc_sockfd); -        /* tracker fd */ -        if (gfc->gfc_fd != -1) -                close (gfc->gfc_fd); -        /* processing dir */ -        if (gfc->gfc_dir) -                closedir (gfc->gfc_dir); - -        if (gfc->gfc_working_dir) -                free (gfc->gfc_working_dir); /* allocated by realpath */ -} +        int ret = 0; +        gf_private_t *priv = NULL; -void -__attribute__ ((constructor)) gf_changelog_ctor (void) -{ -        glusterfs_ctx_t *ctx = NULL; +        priv = calloc (1, sizeof (gf_private_t)); +        if (!priv) +                goto error_return; +        INIT_LIST_HEAD (&priv->connections); -        ctx = glusterfs_ctx_new (); -        if (!ctx) -                return; +        ret = LOCK_INIT (&priv->lock); +        if (ret != 0) +                goto free_priv; +        priv->api = NULL; -        if (glusterfs_globals_init (ctx)) { -                free (ctx); -                ctx = NULL; -                return; -        } +        return priv; -        THIS->ctx = ctx; -        if (xlator_mem_acct_init (THIS, gf_changelog_mt_end)) -                return; + free_priv: +        free (priv); + error_return: +        return NULL;  } -void -__attribute__ ((destructor)) gf_changelog_dtor (void) -{ -        xlator_t        *this = NULL; -        glusterfs_ctx_t *ctx  = NULL; -        gf_changelog_t  *gfc  = NULL; +#define GF_CHANGELOG_EVENT_POOL_SIZE   16384 +#define GF_CHANGELOG_EVENT_THREAD_COUNT 4 -        this = THIS; -        if (!this) -                return; - -        ctx = this->ctx; -        gfc = this->private; - -        if (gfc) { -                if (gfc->hist_gfc) { -                        gf_changelog_cleanup(gfc->hist_gfc); -                        GF_FREE (gfc->hist_gfc); -      
          } -                gf_changelog_cleanup (gfc); -                GF_FREE (gfc); +static int +gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx) +{ +        cmd_args_t    *cmd_args = NULL; +        struct rlimit  lim = {0, }; +        call_pool_t   *pool = NULL; +        int            ret         = -1; + +        ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); +        if (ret != 0) { +                return ret;          } -        if (ctx) { -                pthread_mutex_destroy (&ctx->lock); -                free (ctx); -                ctx = NULL; -        } -} +        ctx->process_uuid = generate_glusterfs_ctx_id (); +        if (!ctx->process_uuid) +                return -1; +        ctx->page_size  = 128 * GF_UNIT_KB; -static int -gf_changelog_open_dirs (gf_changelog_t *gfc) -{ -        int  ret                    = -1; -        DIR *dir                    = NULL; -        int  tracker_fd             = 0; -        char tracker_path[PATH_MAX] = {0,}; -        xlator_t *this              = NULL; +        ctx->iobuf_pool = iobuf_pool_new (); +        if (!ctx->iobuf_pool) +                return -1; -        this = THIS; -        GF_ASSERT (this); +        ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE, +                                          GF_CHANGELOG_EVENT_THREAD_COUNT); +        if (!ctx->event_pool) +                return -1; -        (void) snprintf (gfc->gfc_current_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_CURRENT_DIR"/", -                         gfc->gfc_working_dir); +        pool = GF_CALLOC (1, sizeof (call_pool_t), +                          gf_changelog_mt_libgfchangelog_call_pool_t); +        if (!pool) +                return -1; -        ret = recursive_rmdir (gfc->gfc_current_dir); -        if (ret) { -                gf_log (this->name, GF_LOG_ERROR, -                        "Failed to rmdir: %s, err: %s", -                        gfc->gfc_current_dir, strerror (errno)); -         
       goto out; -        } +        /* frame_mem_pool size 112 * 64 */ +        pool->frame_mem_pool = mem_pool_new (call_frame_t, 32); +        if (!pool->frame_mem_pool) +                return -1; -        ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false); -        if (ret) -                goto out; +        /* stack_mem_pool size 256 * 128 */ +        pool->stack_mem_pool = mem_pool_new (call_stack_t, 16); -        (void) snprintf (gfc->gfc_processed_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_PROCESSED_DIR"/", -                         gfc->gfc_working_dir); +        if (!pool->stack_mem_pool) +                return -1; -        ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false); -        if (ret) -                goto out; +        ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16); +        if (!ctx->stub_mem_pool) +                return -1; -        (void) snprintf (gfc->gfc_processing_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_PROCESSING_DIR"/", -                         gfc->gfc_working_dir); +        ctx->dict_pool = mem_pool_new (dict_t, 32); +        if (!ctx->dict_pool) +                return -1; -        ret = recursive_rmdir (gfc->gfc_processing_dir); -        if (ret) { -                gf_log (this->name, GF_LOG_ERROR, -                        "Failed to rmdir: %s, err: %s", -                        gfc->gfc_processing_dir, strerror (errno)); -                goto out; -        } +        ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512); +        if (!ctx->dict_pair_pool) +                return -1; -        ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false); -        if (ret) -                goto out; +        ctx->dict_data_pool = mem_pool_new (data_t, 512); +        if (!ctx->dict_data_pool) +                return -1; -        dir = opendir (gfc->gfc_processing_dir); -        if (!dir) { -                gf_log ("", GF_LOG_ERROR, -                        "opendir() error [reason: %s]", 
strerror (errno)); -                goto out; -        } +        INIT_LIST_HEAD (&pool->all_frames); +        LOCK_INIT (&pool->lock); +        ctx->pool = pool; -        gfc->gfc_dir = dir; +        pthread_mutex_init (&(ctx->lock), NULL); -        (void) snprintf (tracker_path, PATH_MAX, -                         "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir); +        cmd_args = &ctx->cmd_args; -        tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, -                           S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); -        if (tracker_fd < 0) { -                closedir (gfc->gfc_dir); -                ret = -1; -                goto out; -        } +        INIT_LIST_HEAD (&cmd_args->xlator_options); + +        lim.rlim_cur = RLIM_INFINITY; +        lim.rlim_max = RLIM_INFINITY; +        setrlimit (RLIMIT_CORE, &lim); -        gfc->gfc_fd = tracker_fd; -        ret = 0; - out: -        return ret; +        return 0;  } -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc) +/* TODO: cleanup ctx defaults */ +static void +gf_changelog_cleanup_this (xlator_t *this)  { -        int                ret    = 0; -        int                len    = 0; -        int                tries  = 0; -        int                sockfd = 0; -        struct sockaddr_un remote; - -        this = gfc->this; +        glusterfs_ctx_t *ctx = NULL; -        if (gfc->gfc_sockfd != -1) { -                gf_log (this->name, GF_LOG_INFO, -                        "Reconnecting..."); -                close (gfc->gfc_sockfd); -        } +        if (!this) +                return; -        sockfd = socket (AF_UNIX, SOCK_STREAM, 0); -        if (sockfd < 0) { -                ret = -1; -                goto out; -        } +        ctx = this->ctx; +        syncenv_destroy (ctx->env); +        free (ctx); -        CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath, -                                    gfc->gfc_sockpath, UNIX_PATH_MAX); -        gf_log 
(this->name, GF_LOG_INFO, -                "connecting to changelog socket: %s (brick: %s)", -                gfc->gfc_sockpath, gfc->gfc_brickpath); +        if (this->private) +                free (this->private); -        remote.sun_family = AF_UNIX; -        strcpy (remote.sun_path, gfc->gfc_sockpath); +        this->private = NULL; +        this->ctx = NULL; +} -        len = strlen (remote.sun_path) + sizeof (remote.sun_family); +static int +gf_changelog_init_this () +{ +        glusterfs_ctx_t *ctx = NULL; -        while (tries < gfc->gfc_connretries) { -                gf_log (this->name, GF_LOG_WARNING, -                        "connection attempt %d/%d...", -                        tries + 1, gfc->gfc_connretries); +        ctx = glusterfs_ctx_new (); +        if (!ctx) +                goto error_return; -                /* initiate a connect */ -                if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) { -                        gfc->gfc_sockfd = sockfd; -                        break; -                } +        if (glusterfs_globals_init (ctx)) +                goto free_ctx; -                tries++; -                sleep (2); -        } +        THIS->ctx = ctx; +        if (gf_changelog_ctx_defaults_init (ctx)) +                goto free_ctx; -        if (tries == gfc->gfc_connretries) { -                gf_log (this->name, GF_LOG_ERROR, -                        "could not connect to changelog socket!" 
-                        " bailing out..."); -                close (sockfd); -                ret = -1; -        } else -                gf_log (this->name, GF_LOG_INFO, -                        "connection successful"); +        ctx->env = syncenv_new (0, 0, 0); +        if (!ctx->env) +                goto free_ctx; +        return 0; - out: -        return ret; + free_ctx: +        free (ctx); +        THIS->ctx = NULL; + error_return: +        return -1;  } -int -gf_changelog_done (char *file) +static int +gf_changelog_init_master ()  { -        int             ret    = -1; -        char           *buffer = NULL; -        xlator_t       *this   = NULL; -        gf_changelog_t *gfc    = NULL; -        char to_path[PATH_MAX] = {0,}; - -        errno = EINVAL; - -        this = THIS; -        if (!this) -                goto out; - -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) -                goto out; +        int              ret  = 0; +        gf_private_t    *priv = NULL; +        glusterfs_ctx_t *ctx  = NULL; -        if (!file || !strlen (file)) -                goto out; +        ret = gf_changelog_init_this (); +        if (ret != 0) +                goto error_return; +        master = THIS; + +        priv = gf_changelog_alloc_priv (); +        if (!priv) +                goto cleanup_master; +        master->private = priv; + +        /* poller thread */ +        ret = pthread_create (&priv->poller, +                              NULL, changelog_rpc_poller, master); +        if (ret != 0) { +                gf_log (master->name, GF_LOG_ERROR, +                        "failed to spawn poller thread"); +                goto cleanup_master; +        } -        /* make sure 'file' is inside ->gfc_working_dir */ -        buffer = realpath (file, NULL); -        if (!buffer) -                goto out; +        return 0; -        if (strncmp (gfc->gfc_working_dir, -                     buffer, strlen (gfc->gfc_working_dir))) -           
     goto out; + cleanup_master: +        master->private = NULL; +        gf_changelog_cleanup_this (master); + error_return: +        return -1; +} -        (void) snprintf (to_path, PATH_MAX, "%s%s", -                         gfc->gfc_processed_dir, basename (buffer)); -        gf_log (this->name, GF_LOG_DEBUG, -                "moving %s to processed directory", file); -        ret = rename (buffer, to_path); -        if (ret) { -                gf_log (this->name, GF_LOG_ERROR, -                        "cannot move %s to %s (reason: %s)", -                        file, to_path, strerror (errno)); -                goto out; -        } +/* ctor/dtor */ -        ret = 0; +void +__attribute__ ((constructor)) gf_changelog_ctor (void) +{ +        (void) gf_changelog_init_master (); +} - out: -        if (buffer) -                free (buffer); /* allocated by realpath() */ -        return ret; +void +__attribute__ ((destructor)) gf_changelog_dtor (void) +{ +        gf_changelog_cleanup_this (master);  } -/** - * @API - *  for a set of changelogs, start from the beginning - */ +/* TODO: cleanup clnt/svc on failure */  int -gf_changelog_start_fresh () +gf_changelog_setup_rpc (xlator_t *this, +                        gf_changelog_t *entry, int proc)  { -        xlator_t *this = NULL; -        gf_changelog_t *gfc = NULL; - -        this = THIS; -        if (!this) -                goto out; +        int              ret = 0; +        rpcsvc_t        *svc = NULL; +        struct rpc_clnt *rpc = NULL; -        errno = EINVAL; +        /** +         * Initialize a connect back socket. A probe() RPC call to the server +         * triggers a reverse connect. 
+         */ +        svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick, +                                                    RPC_SOCK (entry), entry); +        if (!svc) +                goto error_return; +        RPC_REBORP (entry) = svc; + +        /* Initialize an RPC client */ +        rpc = gf_changelog_rpc_init (this, entry); +        if (!rpc) +                goto error_return; +        RPC_PROBER (entry) = rpc; -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) -                goto out; +        /** +         * @FIXME +         * till we have connection state machine, let's delay the RPC call +         * for now.. +         */ +        sleep (2); -        if (gf_ftruncate (gfc->gfc_fd, 0)) -                goto out; +        /** +         * Probe changelog translator for reverse connection. After a successful +         * call, there's less use of the client and can be disconnected, but +         * let's leave the connection active for any future RPC calls. +         */ +        ret = gf_changelog_invoke_rpc (this, entry, proc); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not initiate probe RPC, bailing out!!!"); +                goto error_return; +        }          return 0; - out: + error_return:          return -1;  } -/** - * @API - * return the next changelog file entry. zero means all chanelogs - * consumed. 
- */ -ssize_t -gf_changelog_next_change (char *bufptr, size_t maxlen) +static void +gf_cleanup_event (gf_changelog_t *entry)  { -        ssize_t         size       = -1; -        int             tracker_fd = 0; -        xlator_t       *this       = NULL; -        gf_changelog_t *gfc        = NULL; -        char buffer[PATH_MAX]      = {0,}; - -        if (maxlen > PATH_MAX) { -                errno = ENAMETOOLONG; -                goto out; -        } +        xlator_t             *this = NULL; +        struct gf_event_list *ev   = NULL; -        errno = EINVAL; +        this = entry->this; +        ev = &entry->event; -        this = THIS; -        if (!this) -                goto out; +        (void) gf_thread_cleanup (this, ev->invoker); -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) -                goto out; +        (void) pthread_mutex_destroy (&ev->lock); +        (void) pthread_cond_destroy (&ev->cond); -        tracker_fd = gfc->gfc_fd; +        ev->entry = NULL; +} -        size = gf_readline (tracker_fd, buffer, maxlen); -        if (size < 0) { -                size = -1; -                goto out; +static int +gf_init_event (gf_changelog_t *entry) +{ +        int ret = 0; +        struct gf_event_list *ev = NULL; + +        ev = &entry->event; +        ev->entry = entry; + +        ret = pthread_mutex_init (&ev->lock, NULL); +        if (ret != 0) +                goto error_return; +        ret = pthread_cond_init (&ev->cond, NULL); +        if (ret != 0) +                goto cleanup_mutex; +        INIT_LIST_HEAD (&ev->events); + +        ev->next_seq = 0;  /* bootstrap sequencing */ + +        if (entry->ordered) { +                ret = pthread_create (&ev->invoker, NULL, +                                      gf_changelog_callback_invoker, ev); +                if (ret != 0) +                        goto cleanup_cond;          } -        if (size == 0) -                goto out; - -        memcpy (bufptr, buffer, size - 1); 
-        bufptr[size - 1] = '\0'; +        return 0; -out: -        return size; + cleanup_cond: +        (void) pthread_cond_destroy (&ev->cond); + cleanup_mutex: +        (void) pthread_mutex_destroy (&ev->lock); + error_return: +        return -1;  }  /** - * @API - *  gf_changelog_scan() - scan and generate a list of change entries - * - * calling this api multiple times (without calling gf_changlog_done()) - * would result new changelogs(s) being refreshed in the tracker file. - * This call also acts as a cancellation point for the consumer. + * TODO: + *  - cleanup invoker thread (if ordered mode) + *  - cleanup event list + *  - destroy rpc{-clnt, svc}   */ -ssize_t -gf_changelog_scan () +int +gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry)  { -        int             ret        = 0; -        int             tracker_fd = 0; -        size_t          len        = 0; -        size_t          off        = 0; -        xlator_t       *this       = NULL; -        size_t          nr_entries = 0; -        gf_changelog_t *gfc        = NULL; -        struct dirent  *entryp     = NULL; -        struct dirent  *result     = NULL; -        char buffer[PATH_MAX]      = {0,}; - -        this = THIS; -        if (!this) -                goto out; +        return 0; +} -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) -                goto out; +int +gf_cleanup_connections (xlator_t *this) +{ +        return 0; +} -        /** -         * do we need to protect 'byebye' with locks? worst, the -         * consumer would get notified during next scan(). 
-         */ -        if (byebye) { -                errno = ECONNREFUSED; -                goto out; -        } +static int +gf_setup_brick_connection (xlator_t *this, +                           struct gf_brick_spec *brick, +                           gf_boolean_t ordered, void *xl) +{ +        int ret = 0; +        gf_private_t *priv = NULL; +        gf_changelog_t *entry = NULL; -        errno = EINVAL; +        priv = this->private; -        tracker_fd = gfc->gfc_fd; +        if (!brick->callback || !brick->init || !brick->fini) +                goto error_return; -        if (gf_ftruncate (tracker_fd, 0)) -                goto out; +        entry = GF_CALLOC (1, sizeof (*entry), +                           gf_changelog_mt_libgfchangelog_t); +        if (!entry) +                goto error_return; +        INIT_LIST_HEAD (&entry->list); -        len = offsetof(struct dirent, d_name) -                + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; -        entryp = GF_CALLOC (1, len, -                            gf_changelog_mt_libgfchangelog_dirent_t); -        if (!entryp) -                goto out; +        entry->notify = brick->filter; +        (void) strncpy (entry->brick, brick->brick_path, PATH_MAX); -        rewinddir (gfc->gfc_dir); -        while (1) { -                ret = readdir_r (gfc->gfc_dir, entryp, &result); -                if (ret || !result) -                        break; +        entry->this = this; +        entry->invokerxl = xl; -                if ( !strcmp (basename (entryp->d_name), ".") -                     || !strcmp (basename (entryp->d_name), "..") ) -                        continue; +        entry->ordered = ordered; +        if (ordered) { +                ret = gf_init_event (entry); +                if (ret) +                        goto free_entry; +        } -                nr_entries++; +        entry->fini         = brick->fini; +        entry->callback     = brick->callback; +        entry->connected    = 
brick->connected; +        entry->disconnected = brick->disconnected; -                GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir, -                                          buffer, off, -                                          strlen (gfc->gfc_processing_dir)); -                GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, -                                          off, strlen (entryp->d_name)); -                GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); +        entry->ptr = brick->init (this, brick); +        if (!entry->ptr) +                goto cleanup_event; +        priv->api = entry->ptr;  /* pointer to API, if required */ -                if (gf_changelog_write (tracker_fd, buffer, off) != off) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "error writing changelog filename" -                                " to tracker file"); -                        break; -                } -                off = 0; +        LOCK (&priv->lock); +        { +                list_add_tail (&entry->list, &priv->connections);          } +        UNLOCK (&priv->lock); -        GF_FREE (entryp); +        ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); +        if (ret) +                goto cleanup_event; +        return 0; -        if (!result) { -                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) -                        return nr_entries; -        } - out: + cleanup_event: +        if (ordered) +                gf_cleanup_event (entry); + free_entry: +        list_del (&entry->list); /* FIXME: kludge for now */ +        GF_FREE (entry); + error_return:          return -1;  } -/** - * @API - *  gf_changelog_register() - register a client for updates. 
- */  int -gf_changelog_register (char *brick_path, char *scratch_dir, -                       char *log_file, int log_level, int max_reconnects) +gf_changelog_register_brick (xlator_t *this, +                             struct gf_brick_spec *brick, +                             gf_boolean_t ordered, void *xl)  { -        int             i                          = 0; -        int             ret                        = -1; -        int             errn                       = 0; -        xlator_t       *this                       = NULL; -        gf_changelog_t *gfc                        = NULL; -        char            hist_scratch_dir[PATH_MAX] = {0,}; -        struct stat     buf                        = {0,}; - -        this = THIS; -        if (!this->ctx) -                goto out; - -        errno = ENOMEM; - -        gfc = GF_CALLOC (1, sizeof (*gfc), -                         gf_changelog_mt_libgfchangelog_t); -        if (!gfc) -                goto out; - -        gfc->this = this; - -        gfc->gfc_dir = NULL; -        gfc->gfc_fd = gfc->gfc_sockfd = -1; - -        if (stat (scratch_dir, &buf) && errno == ENOENT) { -                ret = mkdir_p (scratch_dir, 0600, _gf_true); -                if (ret) { -                        errn = errno; -                        goto cleanup; -                } -        } - -        gfc->gfc_working_dir = realpath (scratch_dir, NULL); -        if (!gfc->gfc_working_dir) { -                errn = errno; -                goto cleanup; -        } +        return gf_setup_brick_connection (this, brick, ordered, xl); +} -        /* Begin: Changes for History API */ -        gfc->hist_gfc = NULL; +static int +gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel) +{ +        /* passing ident as NULL means to use default ident for syslog */ +        if (gf_log_init (this->ctx, logfile, NULL)) +                return -1; -        gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc), -                         
gf_changelog_mt_libgfchangelog_t); -        if (!gfc->hist_gfc) -                goto cleanup; +        gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO : +                             loglevel); +        return 0; +} -        gfc->hist_gfc->gfc_dir = NULL; -        gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1; -        gfc->hist_gfc->this = NULL; +int +gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, +                               int ordered, char *logfile, int lvl, void *xl) +{ +        int                   ret        = 0; +        xlator_t             *this       = NULL; +        xlator_t             *old_this   = NULL; +        struct gf_brick_spec *brick      = NULL; +        gf_boolean_t          need_order = _gf_false; -        (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); -        (void) snprintf (hist_scratch_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_HISTORY_DIR"/", -                         gfc->gfc_working_dir); +        SAVE_THIS (xl); -        ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); -        if (ret) { -                errn = errno; -                goto cleanup; -        } +        this = THIS; +        if (!this) +                goto error_return; -        gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL); -        if (!gfc->hist_gfc->gfc_working_dir) { -                errn = errno; -                goto cleanup; -        } +        ret = gf_changelog_setup_logging (this, logfile, lvl); +        if (ret) +                goto error_return; -        ret = gf_changelog_open_dirs (gfc->hist_gfc); -        if (ret) { -                errn = errno; -                gf_log (this->name, GF_LOG_ERROR, -                        "could not create entries in history scratch dir"); -                goto cleanup; -        } +        need_order = (ordered) ? 
_gf_true : _gf_false; -        (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX); +        brick = bricks; +        while (count--) { +                gf_log (this->name, GF_LOG_INFO, +                        "Registering brick: %s [notify filter: %d]", +                        brick->brick_path, brick->filter); -        for (i=0; i < 256; i++) { -                gfc->hist_gfc->rfc3986[i] = -                        (isalnum(i) || i == '~' || -                        i == '-' || i == '.' || i == '_') ? i : 0; -        } -        /* End: Changes for History API*/ +                ret = gf_changelog_register_brick (this, brick, need_order, xl); +                if (ret != 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Error registering with changelog xlator"); +                        break; +                } -        ret = gf_changelog_open_dirs (gfc); -        if (ret) { -                errn = errno; -                gf_log (this->name, GF_LOG_ERROR, -                        "could not create entries in scratch dir"); -                goto cleanup; +                brick++;          } -        /* passing ident as NULL means to use default ident for syslog */ -        if (gf_log_init (this->ctx, log_file, NULL)) -                goto cleanup; - -        gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO : -                             log_level); +        if (ret != 0) +                goto cleanup_inited_bricks; -        gfc->gfc_connretries = (max_reconnects <= 0) ? 
1 : max_reconnects; -        (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX); +        RESTORE_THIS(); +        return 0; -        ret = gf_changelog_notification_init (this, gfc); -        if (ret) { -                errn = errno; -                goto cleanup; -        } + cleanup_inited_bricks: +        gf_cleanup_connections (this); + error_return: +        RESTORE_THIS(); +        return -1; +} -        ret = gf_thread_create (&gfc->gfc_changelog_processor, -				NULL, gf_changelog_process, gfc); -        if (ret) { -                errn = errno; -                gf_log (this->name, GF_LOG_ERROR, -                        "error creating changelog processor thread" -                        " new changes won't be recorded!!!"); -                goto cleanup; -        } +/** + * @API + *  gf_changelog_register() + * + * This is _NOT_ a generic register API. It's a special API to handle + * updates at a journal granulality. This is used by consumers wanting + * to process persistent journal such as geo-replication via a set of + * APIs. All of this is required to maintain backward compatibility. + * Owner specific private data is stored in ->api (in gf_private_t), + * which is used by APIs to access it's private data. This limits + * the library access to a single brick, but that's how it used to + * be anyway. Furthermore, this API solely _owns_ "this", therefore + * callers already having a notion of "this" are expected to use the + * newer API. + * + * Newer applications wanting to use this library need not face this + * limitation and reply of the much more feature rich generic register + * API, which is purely callback based. + * + * NOTE: @max_reconnects is not used but required for backward compat. + * + * For generic API, refer gf_changelog_register_generic(). 
+ */ +int +gf_changelog_register (char *brick_path, char *scratch_dir, +                       char *log_file, int log_level, int max_reconnects) +{ +        struct gf_brick_spec brick = {0,}; -        for (i=0; i < 256; i++) { -                gfc->rfc3986[i] = -                        (isalnum(i) || i == '~' || -                        i == '-' || i == '.' || i == '_') ? i : 0; -        } +        THIS = master; -        ret = 0; -        this->private = gfc; +        brick.brick_path = brick_path; +        brick.filter     = CHANGELOG_OP_TYPE_JOURNAL; -        goto out; +        brick.init         = gf_changelog_journal_init; +        brick.fini         = gf_changelog_journal_fini; +        brick.callback     = gf_changelog_handle_journal; +        brick.connected    = gf_changelog_journal_connect; +        brick.disconnected = gf_changelog_journal_disconnect; - cleanup: -        if (gfc->hist_gfc) { -                gf_changelog_cleanup (gfc->hist_gfc); -                GF_FREE (gfc->hist_gfc); -        } -        gf_changelog_cleanup (gfc); -        GF_FREE (gfc); -        this->private = NULL; -        errno = errn; +        brick.ptr = scratch_dir; - out: -        return ret; +        return gf_changelog_register_generic (&brick, 1, 1, +                                              log_file, log_level, NULL);  } diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index 8a527dd6e4b..12f51da8fa2 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -14,6 +14,7 @@  #include "syscall.h"  #include "gf-changelog-helpers.h" +#include "gf-changelog-journal.h"  /* from the changelog translator */  #include "changelog-misc.h" @@ -36,12 +37,12 @@  int  gf_history_changelog_done (char *file)  { -        int                     ret                     = -1; -        char                    *buffer               
  = NULL; -        xlator_t                *this                   = NULL; -        gf_changelog_t          *gfc                    = NULL; -        gf_changelog_t          *hist_gfc               = NULL; -        char                    to_path[PATH_MAX]       = {0,}; +        int                     ret               = -1; +        char                   *buffer            = NULL; +        xlator_t               *this              = NULL; +        gf_changelog_journal_t *jnl               = NULL; +        gf_changelog_journal_t *hist_jnl          = NULL; +        char                    to_path[PATH_MAX] = {0,};          errno = EINVAL; @@ -49,28 +50,28 @@ gf_history_changelog_done (char *file)          if (!this)                  goto out; -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl)                  goto out; -        hist_gfc = gfc->hist_gfc; -        if (!hist_gfc) +        hist_jnl = jnl->hist_jnl; +        if (!hist_jnl)                  goto out;          if (!file || !strlen (file))                  goto out; -        /* make sure 'file' is inside ->gfc_working_dir */ +        /* make sure 'file' is inside ->jnl_working_dir */          buffer = realpath (file, NULL);          if (!buffer)                  goto out; -        if (strncmp (hist_gfc->gfc_working_dir, -                     buffer, strlen (hist_gfc->gfc_working_dir))) +        if (strncmp (hist_jnl->jnl_working_dir, +                     buffer, strlen (hist_jnl->jnl_working_dir)))                  goto out;          (void) snprintf (to_path, PATH_MAX, "%s%s", -                         hist_gfc->gfc_processed_dir, basename (buffer)); +                         hist_jnl->jnl_processed_dir, basename (buffer));          gf_log (this->name, GF_LOG_DEBUG,                  "moving %s to processed directory", file);          ret = rename (buffer, to_path); @@ -102,9 +103,9 @@ 
gf_history_changelog_done (char *file)  int  gf_history_changelog_start_fresh ()  { -        xlator_t                *this                   = NULL; -        gf_changelog_t          *gfc                    = NULL; -        gf_changelog_t          *hist_gfc               = NULL; +        xlator_t               *this     = NULL; +        gf_changelog_journal_t *jnl      = NULL; +        gf_changelog_journal_t *hist_jnl = NULL;          this = THIS;          if (!this) @@ -112,15 +113,15 @@ gf_history_changelog_start_fresh ()          errno = EINVAL; -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl)                  goto out; -        hist_gfc = gfc->hist_gfc; -        if (!hist_gfc) +        hist_jnl = jnl->hist_jnl; +        if (!hist_jnl)                  goto out; -        if (gf_ftruncate (hist_gfc->gfc_fd, 0)) +        if (gf_ftruncate (hist_jnl->jnl_fd, 0))                  goto out;          return 0; @@ -147,12 +148,17 @@ gf_history_changelog_start_fresh ()  ssize_t  gf_history_changelog_next_change (char *bufptr, size_t maxlen)  { -        ssize_t                 size                    = -1; -        int                     tracker_fd              = 0; -        xlator_t                *this                   = NULL; -        gf_changelog_t          *gfc                    = NULL; -        gf_changelog_t          *hist_gfc               = NULL; -        char                    buffer[PATH_MAX]        = {0,}; +        ssize_t                 size             = -1; +        int                     tracker_fd       = 0; +        xlator_t               *this             = NULL; +        gf_changelog_journal_t *jnl              = NULL; +        gf_changelog_journal_t *hist_jnl         = NULL; +        char                    buffer[PATH_MAX] = {0,}; + +        if (maxlen > PATH_MAX) { +                errno = ENAMETOOLONG; +                goto out; +   
     }          errno = EINVAL; @@ -160,15 +166,15 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)          if (!this)                  goto out; -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl)                  goto out; -        hist_gfc = gfc->hist_gfc; -        if (!hist_gfc) +        hist_jnl = jnl->hist_jnl; +        if (!hist_jnl)                  goto out; -        tracker_fd = hist_gfc->gfc_fd; +        tracker_fd = hist_jnl->jnl_fd;          size = gf_readline (tracker_fd, buffer, maxlen);          if (size < 0) { @@ -206,56 +212,60 @@ out:  ssize_t  gf_history_changelog_scan ()  { -        int             ret        = 0; -        int             tracker_fd = 0; -        size_t          len        = 0; -        size_t          off        = 0; -        xlator_t       *this       = NULL; -        size_t          nr_entries = 0; -        gf_changelog_t *gfc        = NULL; -        gf_changelog_t *hist_gfc   = NULL; -        struct dirent  *entryp     = NULL; -        struct dirent  *result     = NULL; -        char buffer[PATH_MAX]      = {0,}; -        static int    is_last_scan = 0; +        int                     ret          = 0; +        int                     tracker_fd   = 0; +        size_t                  len          = 0; +        size_t                  off          = 0; +        xlator_t               *this         = NULL; +        size_t                  nr_entries   = 0; +        gf_changelog_journal_t *jnl          = NULL; +        gf_changelog_journal_t *hist_jnl     = NULL; +        struct dirent          *entryp       = NULL; +        struct dirent          *result       = NULL; +        char buffer[PATH_MAX]                = {0,}; +        static int              is_last_scan;          this = THIS;          if (!this)                  goto out; -        gfc = (gf_changelog_t *) this->private; -        if 
(!gfc) +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl)                  goto out; +        if (JNL_IS_API_DISCONNECTED (jnl)) { +                errno = ENOTCONN; +                goto out; +        } -        hist_gfc = gfc->hist_gfc; -        if (!hist_gfc) +        hist_jnl = jnl->hist_jnl; +        if (!hist_jnl)                  goto out;   retry:          if (is_last_scan == 1)                  return 0; -        if (hist_gfc->hist_done == 0) +        if (hist_jnl->hist_done == 0)                  is_last_scan = 1;          errno = EINVAL; -        if (hist_gfc->hist_done == -1) +        if (hist_jnl->hist_done == -1)                  goto out; -        tracker_fd = hist_gfc->gfc_fd; +        tracker_fd = hist_jnl->jnl_fd;          if (gf_ftruncate (tracker_fd, 0))                  goto out;          len = offsetof (struct dirent, d_name) -                + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; +                + pathconf (hist_jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;          entryp = GF_CALLOC (1, len,                              gf_changelog_mt_libgfchangelog_dirent_t);          if (!entryp)                  goto out; -        rewinddir (hist_gfc->gfc_dir); +        rewinddir (hist_jnl->jnl_dir);          while (1) { -                ret = readdir_r (hist_gfc->gfc_dir, entryp, &result); +                ret = readdir_r (hist_jnl->jnl_dir, entryp, &result);                  if (ret || !result)                          break; @@ -265,9 +275,9 @@ gf_history_changelog_scan ()                  nr_entries++; -                GF_CHANGELOG_FILL_BUFFER (hist_gfc->gfc_processing_dir, +                GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir,                                            buffer, off, -                                          strlen (hist_gfc->gfc_processing_dir)); +                                          strlen (hist_jnl->jnl_processing_dir));                  
GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,                                            off, strlen (entryp->d_name));                  GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); @@ -284,7 +294,8 @@ gf_history_changelog_scan ()          GF_FREE (entryp);          gf_log (this->name, GF_LOG_DEBUG, -                "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan); +                "hist_done %d, is_last_scan: %d", hist_jnl->hist_done, +                is_last_scan);          if (!result) {                  if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { @@ -490,7 +501,7 @@ gf_changelog_consume_wrap (void* data)          /* TODO: handle short reads and EOF. */          ret = gf_changelog_consume (ccd->this, -                                    ccd->gfc, ccd->changelog, _gf_true); +                                    ccd->jnl, ccd->changelog, _gf_true);          if (ret) {                  gf_log (this->name, GF_LOG_ERROR,                          "could not parse changelog: %s", ccd->changelog); @@ -515,8 +526,8 @@ void *  gf_history_consume (void * data)  {          xlator_t                    *this              = NULL; -        gf_changelog_t              *gfc               = NULL; -        gf_changelog_t              *hist_gfc          = NULL; +        gf_changelog_journal_t              *jnl               = NULL; +        gf_changelog_journal_t              *hist_jnl          = NULL;          int                          ret               = 0;          int                          iter              = 0;          int                          fd                = -1; @@ -549,14 +560,14 @@ gf_history_consume (void * data)                  goto out;          } -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) { +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl) {                  ret = -1;                  goto out;          } -        hist_gfc = gfc->hist_gfc; -        if 
(!hist_gfc) { +        hist_jnl = jnl->hist_jnl; +        if (!hist_jnl) {                  ret = -1;                  goto out;          } @@ -568,7 +579,7 @@ gf_history_consume (void * data)                          curr = &ccd[iter];                          curr->this   = this; -                        curr->gfc    = hist_gfc; +                        curr->jnl    = hist_jnl;                          curr->fd     = fd;                          curr->offset = from * (len + 1); @@ -613,7 +624,7 @@ gf_history_consume (void * data)                          }                          ret = gf_changelog_publish (curr->this, -                                                    curr->gfc, curr->changelog); +                                                    curr->jnl, curr->changelog);                          if (ret) {                                  publish = _gf_false;                                  gf_log (this->name, GF_LOG_ERROR, @@ -623,7 +634,7 @@ gf_history_consume (void * data)          }         /* informing "parsing done". */ -        hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1; +        hist_jnl->hist_done = (publish == _gf_true) ? 
0 : -1;  out:          if (fd != -1) @@ -740,8 +751,8 @@ gf_history_changelog (char* changelog_dir, unsigned long start,          unsigned long                   from                    = 0;          unsigned long                   total_changelog         = 0;          xlator_t                        *this                   = NULL; -        gf_changelog_t                  *gfc                    = NULL; -        gf_changelog_t                  *hist_gfc               = NULL; +        gf_changelog_journal_t                  *jnl                    = NULL; +        gf_changelog_journal_t                  *hist_jnl               = NULL;          gf_changelog_history_data_t     *hist_data              = NULL;          DIR                             *dirp                   = NULL;          struct dirent                   *dp                     = NULL; @@ -762,14 +773,14 @@ gf_history_changelog (char* changelog_dir, unsigned long start,                  goto out;          } -        gfc = (gf_changelog_t *) this->private; -        if (!gfc) { +        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); +        if (!jnl) {                  ret = -1;                  goto out;          } -        hist_gfc = (gf_changelog_t *) gfc->hist_gfc; -        if (!hist_gfc) { +        hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl; +        if (!hist_jnl) {                  ret = -1;                  goto out;          } @@ -917,7 +928,7 @@ out:                  return ret;          } -        hist_gfc->hist_done = 1; +        hist_jnl->hist_done = 1;          *actual_end = ts2;          return ret; diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am index 18c41e7d7d1..8712b9d059f 100644 --- a/xlators/features/changelog/src/Makefile.am +++ b/xlators/features/changelog/src/Makefile.am @@ -3,16 +3,24 @@ xlator_LTLIBRARIES = changelog.la  xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features  
noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \ -	changelog-misc.h changelog-encoders.h changelog-notifier.h +		 changelog-rpc-common.h changelog-misc.h changelog-encoders.h \ +		 changelog-rpc-common.h changelog-rpc.h changelog-ev-handle.h  changelog_la_LDFLAGS = -module -avoid-version  changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \ -	changelog-encoders.c changelog-notifier.c changelog-barrier.c -changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la +		changelog-encoders.c changelog-rpc.c changelog-barrier.c \ +		changelog-rpc-common.c changelog-ev-handle.c +changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ +		      $(top_builddir)/rpc/xdr/src/libgfxdr.la \ +		      $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la -AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \ -	-D_GNU_SOURCE -D$(GF_HOST_OS) -DDATADIR=\"$(localstatedir)\" +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ +	      -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \ +	      -I$(top_srcdir)/rpc/rpc-transport/socket/src \ +	      -I$(top_srcdir)/xlators/features/changelog/lib/src/ \ +	      -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS) \ +	      -DDATADIR=\"$(localstatedir)\"  AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c index 08626ee2f22..ea9db4061ca 100644 --- a/xlators/features/changelog/src/changelog-encoders.c +++ b/xlators/features/changelog/src/changelog-encoders.c @@ -191,7 +191,7 @@ cb_encoder[] = {  };  void -changelog_encode_change( changelog_priv_t * priv) +changelog_encode_change(changelog_priv_t *priv)  {          priv->ce = &cb_encoder[priv->encode_mode];  } diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c new file mode 100644 index 
00000000000..ca7443cfd22 --- /dev/null +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -0,0 +1,382 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "changelog-ev-handle.h" +#include "changelog-rpc-common.h" +#include "changelog-helpers.h" + +struct rpc_clnt_program changelog_ev_program; + +#define NR_IOVEC  (MAX_IOVEC - 3) +struct ev_rpc_vec { +        int count; +        struct iovec vector[NR_IOVEC]; + +        /* sequence number */ +        unsigned long seq; +}; + +struct ev_rpc { +        rbuf_list_t     *rlist; +        struct rpc_clnt *rpc; +        struct ev_rpc_vec vec; +}; + +/** + * As of now this just does the minimal (retval logging). Going further + * un-acknowledges sequence numbers can be retransmitted and other + * intelligence can be built into the server. + */ +int +changelog_event_dispatch_cbk (struct rpc_req *req, +                              struct iovec *iov, int count, void *myframe) +{ +        return 0; +} + +/* dispatcher RPC */ +inline int +changelog_dispatch_vec (call_frame_t *frame, xlator_t *this, +                        struct rpc_clnt *rpc, struct ev_rpc_vec *vec) +{ +         struct timeval      tv  = {0,}; +         changelog_event_req req = {0,}; + +         (void) gettimeofday (&tv, NULL); + +         /** +          * Event dispatch RPC header contains a sequence number for each +          * dispatch. This allows the reciever to order the request before +          * processing. 
+          */ +         req.seq     = vec->seq; +         req.tv_sec  = tv.tv_sec; +         req.tv_usec = tv.tv_usec; + +         return changelog_rpc_sumbit_req (rpc, (void *)&req, +                                          frame, &changelog_ev_program, +                                          CHANGELOG_REV_PROC_EVENT, +                                          vec->vector, vec->count, NULL, +                                          this, changelog_event_dispatch_cbk, +                                          (xdrproc_t) xdr_changelog_event_req); + } + + int + changelog_event_dispatch_rpc (call_frame_t *frame, xlator_t *this, void *data) + { +         int                idx      = 0; +         int                count    = 0; +         int                ret      = 0; +         unsigned long      range    = 0; +         unsigned long      sequence = 0; +         rbuf_iovec_t      *rvec     = NULL; +         struct ev_rpc     *erpc     = NULL; +         struct rlist_iter  riter    = {{0,},}; + +         /* dispatch NR_IOVEC IO vectors at a time. 
*/ + +         erpc = data; +         RLIST_GET_SEQ (erpc->rlist, sequence, range); + +         rlist_iter_init (&riter, erpc->rlist); + +         rvec_for_each_entry (rvec, &riter) { +                 idx = count % NR_IOVEC; +                 if (++count == NR_IOVEC) { +                         erpc->vec.vector[idx] = rvec->iov; +                         erpc->vec.seq = sequence++; +                         erpc->vec.count = NR_IOVEC; + +                         ret = changelog_dispatch_vec (frame, this, +                                                       erpc->rpc, &erpc->vec); +                         if (ret) +                                 break; +                         count = 0; +                         continue; +                 } + +                 erpc->vec.vector[idx] = rvec->iov; +         } + +         if (ret) +                 goto error_return; + +         idx = count % NR_IOVEC; +         if (idx) { +                 erpc->vec.seq = sequence; +                 erpc->vec.count = idx; + +                 ret = changelog_dispatch_vec (frame, this, +                                               erpc->rpc, &erpc->vec); +         } + + error_return: +         return ret; +} + +int +changelog_rpc_notify (struct rpc_clnt *rpc, +                      void *mydata, rpc_clnt_event_t event, void *data) +{ +        xlator_t                *this      = NULL; +        changelog_rpc_clnt_t    *crpc      = NULL; +        changelog_clnt_t        *c_clnt    = NULL; +        changelog_priv_t        *priv      = NULL; +        changelog_ev_selector_t *selection = NULL; + +        crpc = mydata; +        this = crpc->this; +        c_clnt = crpc->c_clnt; + +        priv = this->private; + +        switch (event) { +        case RPC_CLNT_CONNECT: +                rpc_clnt_set_connected (&rpc->conn); +                selection = &priv->ev_selection; + +                LOCK (&c_clnt->wait_lock); +                { +                        LOCK 
(&c_clnt->active_lock); +                        { +                                changelog_select_event (this, selection, +                                                        crpc->filter); +                                list_move_tail (&crpc->list, &c_clnt->active); +                        } +                        UNLOCK (&c_clnt->active_lock); +                } +                UNLOCK (&c_clnt->wait_lock); + +                break; +        case RPC_CLNT_DISCONNECT: +                rpc_clnt_disable (crpc->rpc); +                selection = &priv->ev_selection; + +                LOCK (&crpc->lock); +                { +                        changelog_deselect_event (this, selection, +                                                  crpc->filter); +                        changelog_set_disconnect_flag (crpc, _gf_true); +                } +                UNLOCK (&crpc->lock); + +                break; +        case RPC_CLNT_MSG: +        case RPC_CLNT_DESTROY: +                break; +        } + +        return 0; +} + +void * +changelog_ev_connector (void *data) +{ +        xlator_t             *this   = NULL; +        changelog_clnt_t     *c_clnt = NULL; +        changelog_rpc_clnt_t *crpc   = NULL; + +        c_clnt = data; +        this = c_clnt->this; + +        while (1) { +                pthread_mutex_lock (&c_clnt->pending_lock); +                { +                        while (list_empty (&c_clnt->pending)) +                                pthread_cond_wait (&c_clnt->pending_cond, +                                                   &c_clnt->pending_lock); +                        crpc = list_first_entry (&c_clnt->pending, +                                                 changelog_rpc_clnt_t, list); +                        crpc->rpc = +                                changelog_rpc_client_init (this, crpc, +                                                           crpc->sock, +                                                           
changelog_rpc_notify); +                        if (!crpc->rpc) { +                                gf_log (this->name, GF_LOG_ERROR, "failed to " +                                        "connect back.. <%s>", crpc->sock); +                                crpc->cleanup (crpc); +                                goto mutex_unlock; +                        } + +                        LOCK (&c_clnt->wait_lock); +                        { +                                list_move_tail (&crpc->list, &c_clnt->waitq); +                        } +                        UNLOCK (&c_clnt->wait_lock); +                } +        mutex_unlock: +                pthread_mutex_unlock (&c_clnt->pending_lock); +        } + +        return NULL; +} + +void +changelog_ev_cleanup_connections (xlator_t *this, changelog_clnt_t *c_clnt) +{ +        int ret = 0; +        changelog_rpc_clnt_t *crpc = NULL; + +        /* cleanup active connections */ +        LOCK (&c_clnt->active_lock); +        { +                list_for_each_entry (crpc, &c_clnt->active, list) { +                        rpc_clnt_disable (crpc->rpc); +                } +        } +        UNLOCK (&c_clnt->active_lock); +} + +/** + * TODO: granularize lock + * + * If we have multiple threads dispatching events, doing it this way is + * a performance bottleneck. 
+ */ + +static inline changelog_rpc_clnt_t * +get_client (changelog_clnt_t *c_clnt, struct list_head **next) +{ +        changelog_rpc_clnt_t *crpc = NULL; + +        LOCK (&c_clnt->active_lock); +        { +                if (*next == &c_clnt->active) +                        goto unblock; +                crpc = list_entry (*next, changelog_rpc_clnt_t, list); +                changelog_rpc_clnt_ref (crpc); +                *next = (*next)->next; +        } + unblock: +        UNLOCK (&c_clnt->active_lock); + +        return crpc; +} + +static inline void +put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc) +{ +        LOCK (&c_clnt->active_lock); +        { +                changelog_rpc_clnt_unref (crpc); +        } +        UNLOCK (&c_clnt->active_lock); +} + +void +_dispatcher (rbuf_list_t *rlist, void *arg) +{ +        int                   ret    = 0; +        xlator_t             *this   = NULL; +        changelog_clnt_t     *c_clnt = NULL; +        changelog_rpc_clnt_t *crpc   = NULL; +        changelog_rpc_clnt_t *tmp    = NULL; +        struct ev_rpc         erpc   = {0,}; +        struct list_head     *next   = NULL; + +        c_clnt = arg; +        this = c_clnt->this; + +        erpc.rlist = rlist; +        next = c_clnt->active.next; + +        while (1) { +                crpc = get_client (c_clnt, &next); +                if (!crpc) +                        break; +                erpc.rpc = crpc->rpc; +                ret = changelog_invoke_rpc (this, crpc->rpc, +                                            &changelog_ev_program, +                                            CHANGELOG_REV_PROC_EVENT, &erpc); +                put_client (c_clnt, crpc); +        } +} + +/** this is called under rotbuff's lock */ +void +sequencer (rbuf_list_t *rlist, void *mydata) +{ +        unsigned long     range  = 0; +        changelog_clnt_t *c_clnt = 0; + +        c_clnt = mydata; + +        range = (RLIST_ENTRY_COUNT (rlist)) / NR_IOVEC; +       
 if ((RLIST_ENTRY_COUNT (rlist)) % NR_IOVEC) +                range++; +        RLIST_STORE_SEQ (rlist, c_clnt->sequence, range); + +        c_clnt->sequence += range; +} + +void * +changelog_ev_dispatch (void *data) +{ +        int                   ret    = 0; +        void                 *opaque = NULL; +        xlator_t             *this   = NULL; +        changelog_clnt_t     *c_clnt = NULL; +        struct timeval        tv     = {0,}; + +        c_clnt = data; +        this = c_clnt->this; + +        while (1) { +                /* TODO: change this to be pthread cond based.. later */ +                tv.tv_sec = 1; +                tv.tv_usec = 0; +                select (0, NULL, NULL, NULL, &tv); + +                ret = rbuf_get_buffer (c_clnt->rbuf, +                                       &opaque, sequencer, c_clnt); +                if (ret != RBUF_CONSUMABLE) { +                        if (ret != RBUF_EMPTY) +                                gf_log (this->name, GF_LOG_WARNING, +                                        "Failed to get buffer for RPC dispatch " +                                        "[rbuf retval: %d]", ret); +                        continue; +                } + +                ret = rbuf_wait_for_completion (c_clnt->rbuf, +                                                opaque, _dispatcher, c_clnt); +                if (ret) +                        gf_log (this->name, GF_LOG_WARNING, +                                "failed to put buffer after consumption"); +        } + +        return NULL; +} + +void +changelog_ev_queue_connection (changelog_clnt_t *c_clnt, +                               changelog_rpc_clnt_t *crpc) +{ +        pthread_mutex_lock (&c_clnt->pending_lock); +        { +                list_add_tail (&crpc->list, &c_clnt->pending); +                pthread_cond_signal (&c_clnt->pending_cond); +        } +        pthread_mutex_unlock (&c_clnt->pending_lock); +} + +struct rpc_clnt_procedure 
changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = { +        [CHANGELOG_REV_PROC_NULL]  = {"NULL", NULL}, +        [CHANGELOG_REV_PROC_EVENT] = { +                "EVENT DISPATCH", changelog_event_dispatch_rpc +        }, +}; + +struct rpc_clnt_program changelog_ev_program = { +        .progname  = "CHANGELOG EVENT DISPATCHER", +        .prognum   = CHANGELOG_REV_RPC_PROCNUM, +        .progver   = CHANGELOG_REV_RPC_PROCVER, +        .numproc   = CHANGELOG_REV_PROC_MAX, +        .proctable = changelog_ev_procs, +}; diff --git a/xlators/features/changelog/src/changelog-ev-handle.h b/xlators/features/changelog/src/changelog-ev-handle.h new file mode 100644 index 00000000000..eef0492a9ee --- /dev/null +++ b/xlators/features/changelog/src/changelog-ev-handle.h @@ -0,0 +1,140 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#ifndef __CHANGELOG_EV_HANDLE_H +#define __CHANGELOG_EV_HANDLE_H + +#include "list.h" +#include "xlator.h" +#include "rpc-clnt.h" + +#include "rot-buffs.h" + +struct changelog_clnt; + +typedef struct changelog_rpc_clnt { +        xlator_t *this; + +        gf_lock_t lock; + +        unsigned long ref; +        gf_boolean_t  disconnected; + +        unsigned int filter; +        char sock[UNIX_PATH_MAX]; + +        struct changelog_clnt *c_clnt;   /* back pointer to list holder */ + +        struct rpc_clnt *rpc;            /* RPC client endpoint */ + +        struct list_head list;           /* ->pending, ->waitq, ->active */ + +        void (*cleanup) +        (struct changelog_rpc_clnt *);   /* cleanup handler */ +} changelog_rpc_clnt_t; + +static inline void +changelog_rpc_clnt_ref (changelog_rpc_clnt_t *crpc) +{ +        LOCK (&crpc->lock); +        { +                ++crpc->ref; +        } +        UNLOCK (&crpc->lock); +} + +static inline void +changelog_set_disconnect_flag (changelog_rpc_clnt_t *crpc, gf_boolean_t flag) +{ +        crpc->disconnected = flag; +} + +static inline int +changelog_rpc_clnt_is_disconnected (changelog_rpc_clnt_t *crpc) +{ +        return (crpc->disconnected == _gf_true); +} + +static inline void +changelog_rpc_clnt_unref (changelog_rpc_clnt_t *crpc) +{ +        gf_boolean_t gone = _gf_false; + +        LOCK (&crpc->lock); +        { +                if (!(--crpc->ref) +                    && changelog_rpc_clnt_is_disconnected (crpc)) { +                        list_del (&crpc->list); +                        gone = _gf_true; +                } +        } +        UNLOCK (&crpc->lock); + +        if (gone) +                crpc->cleanup (crpc); +} + +/** + * This structure holds pending and active clients. On a probe RPC, + * an instance of the above structure (@changelog_rpc_clnt) is placed + * in ->pending and gets moved to ->active on a successful connect. 
+ * + * locking rules: + * + * Manipulating ->pending + * ->pending_lock + *    ->pending + * + * Manipulating ->active + * ->active_lock + *    ->active + * + * Moving object from ->pending to ->active + * ->pending_lock + *   ->active_lock + * + * Objects are _never_ moved from ->active to ->pending, i.e., during + * disconnection, the object is destroyed. Well, we could have tried + * to reconnect, but that's pure waste.. let the other end reconnect. + */ + +typedef struct changelog_clnt { +        xlator_t *this; + +        /* pending connections */ +        pthread_mutex_t pending_lock; +        pthread_cond_t pending_cond; +        struct list_head pending; + +        /* current active connections */ +        gf_lock_t active_lock; +        struct list_head active; + +        gf_lock_t wait_lock; +        struct list_head waitq; + +        /* consumer part of rot-buffs */ +        rbuf_t *rbuf; +        unsigned long sequence; +} changelog_clnt_t; + +void *changelog_ev_connector (void *); + +void *changelog_ev_dispatch (void *); + +/* APIs */ +void +changelog_ev_queue_connection (changelog_clnt_t *, changelog_rpc_clnt_t *); + +void +changelog_ev_cleanup_connections (xlator_t *, changelog_clnt_t *); + +#endif + diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 3af2938190d..5c755d76d69 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,7 @@  #include "changelog-mem-types.h"  #include "changelog-encoders.h" +#include "changelog-rpc-common.h"  #include <pthread.h>  static inline void @@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex)              pthread_mutex_unlock(p_mutex);  } -void +int  changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)  {          int   ret    = 0; @@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)          /* send a cancel request to the 
thread */          ret = pthread_cancel (thr_id); -        if (ret) { +        if (ret != 0) {                  gf_log (this->name, GF_LOG_ERROR,                          "could not cancel thread (reason: %s)",                          strerror (errno)); @@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)          }          ret = pthread_join (thr_id, &retval); -        if (ret || (retval != PTHREAD_CANCELED)) { +        if ((ret != 0) || (retval != PTHREAD_CANCELED)) {                  gf_log (this->name, GF_LOG_ERROR,                          "cancel request not adhered as expected"                          " (reason: %s)", strerror (errno));          }   out: -        return; +        return ret;  }  inline void * @@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local)          return cld->cld_iobuf->ptr;  } +static inline int +changelog_selector_index (unsigned int selector) +{ +        return (ffs (selector) - 1); +} + +inline int +changelog_ev_selected (xlator_t *this, +                       changelog_ev_selector_t *selection, +                       unsigned int selector) +{ +        int idx = 0; + +        idx = changelog_selector_index (selector); +        gf_log (this->name, GF_LOG_DEBUG, +                "selector ref count for %d (idx: %d): %d", +                selector, idx, selection->ref[idx]); +        /* this can be lockless */ +        return (idx < CHANGELOG_EV_SELECTION_RANGE +                 && (selection->ref[idx] > 0)); +} + +inline void +changelog_select_event (xlator_t *this, +                        changelog_ev_selector_t *selection, +                        unsigned int selector) +{ +        int idx = 0; + +        LOCK (&selection->reflock); +        { +                while (selector) { +                        idx = changelog_selector_index (selector); +                        if (idx < CHANGELOG_EV_SELECTION_RANGE) { +                                selection->ref[idx]++; +           
                     gf_log (this->name, GF_LOG_DEBUG, +                                        "selecting event %d", idx); +                        } +                        selector &= ~(1 << idx); +                } +        } +        UNLOCK (&selection->reflock); +} + +inline void +changelog_deselect_event (xlator_t *this, +                          changelog_ev_selector_t *selection, +                          unsigned int selector) +{ +        int idx = 0; + +        LOCK (&selection->reflock); +        { +                while (selector) { +                        idx = changelog_selector_index (selector); +                        if (idx < CHANGELOG_EV_SELECTION_RANGE) { +                                selection->ref[idx]--; +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "de-selecting event %d", idx); +                        } +                        selector &= ~(1 << idx); +                } +        } +        UNLOCK (&selection->reflock); +} + +inline int +changelog_init_event_selection (xlator_t *this, +                                changelog_ev_selector_t *selection) +{ +        int ret = 0; +        int j = CHANGELOG_EV_SELECTION_RANGE; + +        ret = LOCK_INIT (&selection->reflock); +        if (ret != 0) +                return -1; + +        LOCK (&selection->reflock); +        { +                while (j--) { +                        selection->ref[j] = 0; +                } +        } +        UNLOCK (&selection->reflock); + +        return 0; +} + +inline int +changelog_cleanup_event_selection (xlator_t *this, +                                   changelog_ev_selector_t *selection) +{ +        int ret = 0; +        int j = CHANGELOG_EV_SELECTION_RANGE; + +        LOCK (&selection->reflock); +        { +                while (j--) { +                        if (selection->ref[j] > 0) +                                gf_log (this->name, GF_LOG_WARNING, +                          
              "changelog event selection cleaning up " +                                        " on active references"); +                } +        } +        UNLOCK (&selection->reflock); + +        return LOCK_DESTROY (&selection->reflock); +} + +static inline void +changelog_perform_dispatch (xlator_t *this, +                            changelog_priv_t *priv, void *mem, size_t size) +{ +        char *buf    = NULL; +        void *opaque = NULL; + +        buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque); +        if (!buf) { +                gf_log_callingfn (this->name, +                                  GF_LOG_WARNING, "failed to dispatch event"); +                return; +        } + +        memcpy (buf, mem, size); +        rbuf_write_complete (opaque); +} + +inline void +changelog_dispatch_event (xlator_t *this, +                          changelog_priv_t *priv, changelog_event_t *ev) +{ +        changelog_ev_selector_t *selection = NULL; + +        selection = &priv->ev_selection; +        if (changelog_ev_selected (this, selection, ev->ev_type)) { +                changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE); +        } +} +  inline void  changelog_set_usable_record_and_length (changelog_local_t *local,                                          size_t len, int xr) @@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this,  {          int   ret            = -1;          int   notify         = 0; -        char *bname          = NULL;          char ofile[PATH_MAX] = {0,};          char nfile[PATH_MAX] = {0,}; +        changelog_event_t ev = {0,};          if (priv->changelog_fd != -1) {                  ret = fsync (priv->changelog_fd); @@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this,          }          if (notify) { -                bname = basename (nfile); -                gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); -                ret = changelog_write (priv->wfd, bname, strlen 
(bname) + 1); -                if (ret) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "Failed to send file name to notify thread" -                                " (reason: %s)", strerror (errno)); -                } else { -                        /* If this is explicit rollover initiated by snapshot, -                         * wakeup reconfigure thread waiting for changelog to -                         * rollover -                         */ -                        if (priv->explicit_rollover) { -                                priv->explicit_rollover = _gf_false; -                                ret = pthread_mutex_lock ( -                                                   &priv->bn.bnotify_mutex); -                                CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); -                                { -                                         priv->bn.bnotify = _gf_false; -                                         ret = pthread_cond_signal ( -                                                        &priv->bn.bnotify_cond); -                                         CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, -                                                                           out); -                                         gf_log (this->name, GF_LOG_INFO, -                                                 "Changelog published: %s and" -                                                 " signalled bnotify", bname); -                                } -                                ret = pthread_mutex_unlock ( -                                                       &priv->bn.bnotify_mutex); +                ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL; +                memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1); +                changelog_dispatch_event (this, priv, &ev); + +                /* If this is explicit rollover initiated by snapshot, +                 * wakeup reconfigure thread waiting for 
changelog to +                 * rollover +                 */ +                if (priv->explicit_rollover) { +                        priv->explicit_rollover = _gf_false; + +                        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                        { +                                priv->bn.bnotify = _gf_false; +                                ret = pthread_cond_signal +                                        (&priv->bn.bnotify_cond);                                  CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); +                                gf_log (this->name, GF_LOG_INFO, +                                        "Changelog published: %s signalled" +                                        " bnotify", nfile);                          } +                        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); +                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);                  }          } -   out:          return ret;  } @@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this,  }  int -changelog_open (xlator_t *this, -                changelog_priv_t *priv) +changelog_open_journal (xlator_t *this, +                        changelog_priv_t *priv)  {          int fd                        = 0;          int ret                       = -1; @@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this,          ret = changelog_rollover_changelog (this, priv, ts);          if (!ret && !finale) -                ret = changelog_open (this, priv); +                ret = changelog_open_journal (this, priv);          return ret;  } @@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this,   * one shot routine to get the address and the value of a inode version   * for a particular type.   
*/ -static changelog_inode_ctx_t * +changelog_inode_ctx_t *  __changelog_inode_ctx_get (xlator_t *this,                             inode_t *inode, unsigned long **iver,                             unsigned long *version, changelog_log_type type) diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 03a795369d1..33a99ee4eed 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -15,10 +15,16 @@  #include "timer.h"  #include "pthread.h"  #include "iobuf.h" +#include "rot-buffs.h"  #include "changelog-misc.h"  #include "call-stub.h" +#include "rpcsvc.h" +#include "changelog-ev-handle.h" + +#include "changelog.h" +  /**   * the changelog entry   */ @@ -120,29 +126,6 @@ typedef struct changelog_fsync {          xlator_t *this;  } changelog_fsync_t; -# define CHANGELOG_MAX_CLIENTS  5 -typedef struct changelog_notify { -        /* reader end of the pipe */ -        int rfd; - -        /* notifier thread */ -        pthread_t notify_th; - -        /* unique socket path */ -        char sockpath[UNIX_PATH_MAX]; - -        int socket_fd; - -        /** -         * simple array of accept()'ed fds. Not scalable at all -         * for large number of clients, but it's okay as we have -         * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS). 
-         */ -        int client_fd[CHANGELOG_MAX_CLIENTS]; - -        xlator_t *this; -} changelog_notify_t; -  /* Draining during changelog rollover (for geo-rep snapshot dependency):   * --------------------------------------------------------------------   * The introduction of draining of in-transit fops during changelog rollover @@ -162,14 +145,14 @@ typedef struct changelog_notify {  typedef enum chlog_fop_color {           FOP_COLOR_BLACK,           FOP_COLOR_WHITE -}chlog_fop_color_t; +} chlog_fop_color_t;  /* Barrier notify variable */  typedef struct barrier_notify {           pthread_mutex_t        bnotify_mutex;           pthread_cond_t         bnotify_cond;           gf_boolean_t           bnotify; -}barrier_notify_t; +} barrier_notify_t;  /* Two separate mutex and conditional variable set is used   * to drain white and black fops. */ @@ -185,15 +168,26 @@ typedef struct drain_mgmt {           unsigned long          white_fop_cnt;           gf_boolean_t           drain_wait_black;           gf_boolean_t           drain_wait_white; -}drain_mgmt_t; +} drain_mgmt_t;  /* External barrier as a result of snap on/off indicating flag*/  typedef struct barrier_flags {          gf_lock_t lock;          gf_boolean_t barrier_ext; -}barrier_flags_t; +} barrier_flags_t; +/* Event selection */ +typedef struct changelog_ev_selector { +        gf_lock_t reflock; +        /** +         * Array of references for each selection bit. 
+         */ +        unsigned int ref[CHANGELOG_EV_SELECTION_RANGE]; +} changelog_ev_selector_t; + + +/* changelog's private structure */  struct changelog_priv {          gf_boolean_t active; @@ -223,9 +217,6 @@ struct changelog_priv {          /*  lock to synchronize CSNAP updation */          gf_lock_t c_snap_lock; -        /* writen end of the pipe */ -        int wfd; -          /* rollover time */          int32_t rollover_time; @@ -247,9 +238,6 @@ struct changelog_priv {          /* context of fsync thread */          changelog_fsync_t cf; -        /* context of the notifier thread */ -        changelog_notify_t cn; -          /* operation mode */          changelog_mode_t op_mode; @@ -262,7 +250,9 @@ struct changelog_priv {          /* encoder */          struct changelog_encoder *ce; -        /* snapshot dependency changes */ +        /** +         * snapshot dependency changes +         */          /* Draining of fops*/          drain_mgmt_t dm; @@ -289,6 +279,30 @@ struct changelog_priv {          gf_timer_t       *timer;          struct timespec   timeout; +        /** +         * buffers, RPC, event selection, notifications and other +         * beasts. 
+         */ + +        /* epoll pthread */ +        pthread_t poller; + +        /* rotational buffer */ +        rbuf_t *rbuf; + +        /* changelog RPC server */ +        rpcsvc_t *rpc; + +        /* event selection */ +        changelog_ev_selector_t ev_selection; + +        /* client handling (reverse connection) */ +        pthread_t connector; + +        int nr_dispatchers; +        pthread_t *ev_dispatcher; + +        changelog_clnt_t connections;  };  struct changelog_local { @@ -367,7 +381,7 @@ typedef struct {   * helpers routines   */ -void +int  changelog_thread_cleanup (xlator_t *this, pthread_t thr_id);  void * @@ -386,7 +400,7 @@ changelog_start_next_change (xlator_t *this,                               changelog_priv_t *priv,                               unsigned long ts, gf_boolean_t finale);  int -changelog_open (xlator_t *this, changelog_priv_t *priv); +changelog_open_journal (xlator_t *this, changelog_priv_t *priv);  int  changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last);  int @@ -449,6 +463,7 @@ changelog_snap_handle_ascii_change (xlator_t *this,                  changelog_log_data_t *cld);  int  changelog_snap_write_change (changelog_priv_t *priv, char *buffer, size_t len); +  /* Changelog barrier routines */  void __chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub);  void __chlog_barrier_disable (xlator_t *this, struct list_head *queue); @@ -460,6 +475,24 @@ int32_t  changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,                            loc_t *loc, changelog_local_t **local); +/* event selection routines */ +inline void changelog_select_event (xlator_t *, +                                    changelog_ev_selector_t *, unsigned int); +inline void changelog_deselect_event (xlator_t *, +                                      changelog_ev_selector_t *, unsigned int); +inline int changelog_init_event_selection (xlator_t *, +                                           changelog_ev_selector_t 
*); +inline int changelog_cleanup_event_selection (xlator_t *, +                                              changelog_ev_selector_t *); +inline int changelog_ev_selected (xlator_t *, +                                  changelog_ev_selector_t *, unsigned int); +inline void +changelog_dispatch_event (xlator_t *, changelog_priv_t *, changelog_event_t *); + +changelog_inode_ctx_t * +__changelog_inode_ctx_get (xlator_t *, inode_t *, unsigned long **, +                           unsigned long *, changelog_log_type); +  /* macros */  #define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do {             \ @@ -471,10 +504,10 @@ changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,                          frame->local = NULL;                            \                  }                                                       \                  STACK_UNWIND_STRICT (fop, frame, params);               \ -                changelog_local_cleanup (__xl, __local);                \                  if (__local && __local->prev_entry)                     \                          changelog_local_cleanup (__xl,                  \                                                   __local->prev_entry);  \ +                changelog_local_cleanup (__xl, __local);                \          } while (0)  #define CHANGELOG_IOBUF_REF(iobuf) do {         \ diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h index e1fa319a715..1618f722f6c 100644 --- a/xlators/features/changelog/src/changelog-mem-types.h +++ b/xlators/features/changelog/src/changelog-mem-types.h @@ -14,16 +14,21 @@  #include "mem-types.h"  enum gf_changelog_mem_types { -        gf_changelog_mt_priv_t                  = gf_common_mt_end + 1, -        gf_changelog_mt_str_t                   = gf_common_mt_end + 2, -        gf_changelog_mt_batch_t                 = gf_common_mt_end + 3, -        gf_changelog_mt_rt_t                    = 
gf_common_mt_end + 4, -        gf_changelog_mt_inode_ctx_t             = gf_common_mt_end + 5, -        gf_changelog_mt_libgfchangelog_t        = gf_common_mt_end + 6, -        gf_changelog_mt_libgfchangelog_rl_t     = gf_common_mt_end + 7, -        gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8, -        gf_changelog_mt_changelog_buffer_t      = gf_common_mt_end + 9, -        gf_changelog_mt_history_data_t          = gf_common_mt_end + 10, +        gf_changelog_mt_priv_t                     = gf_common_mt_end + 1, +        gf_changelog_mt_str_t                      = gf_common_mt_end + 2, +        gf_changelog_mt_batch_t                    = gf_common_mt_end + 3, +        gf_changelog_mt_rt_t                       = gf_common_mt_end + 4, +        gf_changelog_mt_inode_ctx_t                = gf_common_mt_end + 5, +        gf_changelog_mt_rpc_clnt_t                 = gf_common_mt_end + 6, +        gf_changelog_mt_libgfchangelog_t           = gf_common_mt_end + 7, +        gf_changelog_mt_libgfchangelog_entry_t     = gf_common_mt_end + 8, +        gf_changelog_mt_libgfchangelog_rl_t        = gf_common_mt_end + 9, +        gf_changelog_mt_libgfchangelog_dirent_t    = gf_common_mt_end + 10, +        gf_changelog_mt_changelog_buffer_t         = gf_common_mt_end + 11, +        gf_changelog_mt_history_data_t             = gf_common_mt_end + 12, +        gf_changelog_mt_libgfchangelog_call_pool_t = gf_common_mt_end + 13, +        gf_changelog_mt_libgfchangelog_event_t     = gf_common_mt_end + 14, +        gf_changelog_mt_ev_dispatcher_t            = gf_common_mt_end + 15,          gf_changelog_mt_end  }; diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h index 58b10961463..b45302ad099 100644 --- a/xlators/features/changelog/src/changelog-misc.h +++ b/xlators/features/changelog/src/changelog-misc.h @@ -25,6 +25,7 @@  #define CHANGELOG_VERSION_MINOR  1  #define CHANGELOG_UNIX_SOCK  
DEFAULT_VAR_RUN_DIRECTORY"/changelog-%s.sock" +#define CHANGELOG_TMP_UNIX_SOCK  DEFAULT_VAR_RUN_DIRECTORY"/.%s%lu.sock"  /**   * header starts with the version and the format of the changelog. @@ -42,6 +43,19 @@                                   CHANGELOG_UNIX_SOCK, md5_sum);         \          } while (0) +#define CHANGELOG_MAKE_TMP_SOCKET_PATH(brick_path, sockpath, len) do {  \ +                unsigned long pid = 0;                                  \ +                char md5_sum[MD5_DIGEST_LENGTH*2+1] = {0,};             \ +                pid = (unsigned long) getpid ();                        \ +                md5_wrapper((unsigned char *) brick_path,               \ +                            strlen(brick_path),                         \ +                            md5_sum);                                   \ +                (void) snprintf (sockpath,                              \ +                                 len, CHANGELOG_TMP_UNIX_SOCK,          \ +                                 md5_sum, pid);                         \ +        } while (0) + +  /**   * ... used by libgfchangelog.   */ diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c deleted file mode 100644 index 5f3d063a8ad..00000000000 --- a/xlators/features/changelog/src/changelog-notifier.c +++ /dev/null @@ -1,314 +0,0 @@ -/* -   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> -   This file is part of GlusterFS. - -   This file is licensed to you under your choice of the GNU Lesser -   General Public License, version 3 or any later version (LGPLv3 or -   later), or the GNU General Public License, version 2 (GPLv2), in all -   cases as published by the Free Software Foundation. 
-*/ - -#include "changelog-notifier.h" - -#include <pthread.h> - -inline static void -changelog_notify_clear_fd (changelog_notify_t *cn, int i) -{ -        cn->client_fd[i] = -1; -} - -inline static void -changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd) -{ -        cn->client_fd[i] = fd; -} - -static int -changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd) -{ -        int i   = 0; -        int ret = 0; - -        for (; i < CHANGELOG_MAX_CLIENTS; i++) { -                if (cn->client_fd[i] == -1) -                        break; -        } - -        if (i == CHANGELOG_MAX_CLIENTS) { -                /** -                 * this case should not be hit as listen() would limit -                 * the number of completely established connections. -                 */ -                gf_log (this->name, GF_LOG_WARNING, -                        "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS); -                ret = -1; -        } -        else -                changelog_notify_save_fd (cn, i, fd); - -        return ret; -} - -static void -changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd) -{ -        int i = 0; - -        FD_ZERO (rset); - -        FD_SET (cn->socket_fd, rset); -        *maxfd = cn->socket_fd; - -        FD_SET (cn->rfd, rset); -        *maxfd = max (*maxfd, cn->rfd); - -        for (; i < CHANGELOG_MAX_CLIENTS; i++) { -                if (cn->client_fd[i] != -1) { -                        FD_SET (cn->client_fd[i], rset); -                        *maxfd = max (*maxfd, cn->client_fd[i]); -                } -        } - -        *maxfd = *maxfd + 1; -} - -static int -changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len) -{ -        int i = 0; -        int ret = 0; - -        for (; i < CHANGELOG_MAX_CLIENTS; i++) { -                if (cn->client_fd[i] == -1) -                        continue; - -                if (changelog_write (cn->client_fd[i], -              
                       path, len)) { -                        ret = -1; - -                        close (cn->client_fd[i]); -                        changelog_notify_clear_fd (cn, i); -                } -        } - -        return ret; -} - -static void -changelog_notifier_init (changelog_notify_t *cn) -{ -        int i = 0; - -        cn->socket_fd = -1; - -        for (; i < CHANGELOG_MAX_CLIENTS; i++) { -                changelog_notify_clear_fd (cn, i); -        } -} - -static void -changelog_close_client_conn (changelog_notify_t *cn) -{ -        int i = 0; - -        for (; i < CHANGELOG_MAX_CLIENTS; i++) { -                if (cn->client_fd[i] == -1) -                        continue; - -                close (cn->client_fd[i]); -                changelog_notify_clear_fd (cn, i); -        } -} - -static void -changelog_notifier_cleanup (void *arg) -{ -        changelog_notify_t *cn = NULL; - -        cn = (changelog_notify_t *) arg; - -        changelog_close_client_conn (cn); - -        if (cn->socket_fd != -1) -                close (cn->socket_fd); - -        if (cn->rfd) -                close (cn->rfd); - -        if (unlink (cn->sockpath)) -                gf_log ("", GF_LOG_WARNING, -                        "could not unlink changelog socket file" -                        " %s (reason: %s", cn->sockpath, strerror (errno)); -} - -void * -changelog_notifier (void *data) -{ -        int                 i         = 0; -        int                 fd        = 0; -        int                 max_fd    = 0; -        int                 len       = 0; -        ssize_t             readlen   = 0; -        xlator_t           *this      = NULL; -        changelog_priv_t   *priv      = NULL; -        changelog_notify_t *cn        = NULL; -        struct sockaddr_un  local     = {0,}; -        char path[PATH_MAX]           = {0,}; -        char abspath[PATH_MAX]        = {0,}; - -        char buffer; -        fd_set rset; - -        priv = (changelog_priv_t *) 
data; - -        cn = &priv->cn; -        this = cn->this; - -        pthread_cleanup_push (changelog_notifier_cleanup, cn); - -        changelog_notifier_init (cn); - -        cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0); -        if (cn->socket_fd < 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "changelog socket error (reason: %s)", -                        strerror (errno)); -                goto out; -        } - -        CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, -                                    cn->sockpath, UNIX_PATH_MAX); -        if (unlink (cn->sockpath) < 0) { -                if (errno != ENOENT) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "Could not unlink changelog socket file (%s)" -                                " (reason: %s)", -                                CHANGELOG_UNIX_SOCK, strerror (errno)); -                        goto cleanup; -                } -        } - -        local.sun_family = AF_UNIX; -        strcpy (local.sun_path, cn->sockpath); - -        len = strlen (local.sun_path) + sizeof (local.sun_family); - -        /* bind to the unix domain socket */ -        if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "Could not bind to changelog socket (reason: %s)", -                        strerror (errno)); -                goto cleanup; -        } - -        /* listen for incoming connections */ -        if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "listen() error on changelog socket (reason: %s)", -                        strerror (errno)); -                goto cleanup; -        } - -        /** -         * simple select() on all to-be-read file descriptors. 
This method -         * though old school works pretty well when you have a handfull of -         * fd's to be watched (clients). -         * -         * Future TODO: move this to epoll based notification facility if -         *              number of clients increase. -         */ -        for (;;) { -                changelog_notify_fill_rset (cn, &rset, &max_fd); - -                if (select (max_fd, &rset, NULL, NULL, NULL) < 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "select() returned -1 (reason: %s)", -                                strerror (errno)); -                        sleep (2); -                        continue; -                } - -                if (FD_ISSET (cn->socket_fd, &rset)) { -                        fd = accept (cn->socket_fd, NULL, NULL); -                        if (fd < 0) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "accept error on changelog socket" -                                        " (reason: %s)", strerror (errno)); -                        } else if (changelog_notify_insert_fd (this, cn, fd)) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "hit max client limit"); -                        } -                } - -                if (FD_ISSET (cn->rfd, &rset)) { -                        /** -                         * read changelog filename and notify all connected -                         * clients. 
-                         */ -                        readlen = 0; -                        while (readlen < PATH_MAX) { -                                len = read (cn->rfd, &path[readlen++], 1); -                                if (len == -1) { -                                        break; -                                } - -                                if (len == 0) { -                                        gf_log (this->name, GF_LOG_ERROR, -                                                "rollover thread sent EOF" -                                                " on pipe - possibly a crash."); -                                        /* be blunt and close all connections */ -                                        pthread_exit(NULL); -                                } - -                                if (path[readlen - 1] == '\0') -                                        break; -                        } - -                        /* should we close all client connections here too? 
*/ -                        if (len < 0 || readlen == PATH_MAX) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "Could not get pathname from rollover" -                                        " thread or pathname too long"); -                                goto process_rest; -                        } - -                        (void) snprintf (abspath, PATH_MAX, -                                         "%s/%s", priv->changelog_dir, path); -                        if (changelog_notify_client (cn, abspath, -                                                     strlen (abspath) + 1)) -                                gf_log (this->name, GF_LOG_ERROR, -                                        "could not notify some clients with new" -                                        " changelogs"); -                } - -        process_rest: -                for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) { -                        if ( (fd = cn->client_fd[i]) == -1 ) -                                continue; - -                        if (FD_ISSET (fd, &rset)) { -                                /** -                                 * the only data we accept from the client is a -                                 * disconnect. Anything else is treated as bogus -                                 * and is silently discarded (also warned!!!). 
-                                 */ -                                if ( (readlen = read (fd, &buffer, 1)) <= 0 ) { -                                        close (fd); -                                        changelog_notify_clear_fd (cn, i); -                                } else { -                                        /* silently discard data and log */ -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "misbehaving changelog client"); -                                } -                        } -                } - -        } - - cleanup:; -        pthread_cleanup_pop (1); - - out: -        return NULL; -} diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c new file mode 100644 index 00000000000..76db6696ae8 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -0,0 +1,334 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "changelog-rpc-common.h" + +/** +***************************************************** +                  Client Interface +***************************************************** +*/ + +/** + * Initialize and return an RPC client object for a given unix + * domain socket. 
+ */ + +void * +changelog_rpc_poller (void *arg) +{ +        xlator_t *this = arg; + +        (void) event_dispatch (this->ctx->event_pool); +        return NULL; +} + +struct rpc_clnt * +changelog_rpc_client_init (xlator_t *this, void *cbkdata, +                           char *sockfile, rpc_clnt_notify_t fn) +{ +        int              ret         = 0; +        struct rpc_clnt *rpc         = NULL; +        dict_t          *options     = NULL; + +        if (!cbkdata) +                cbkdata = this; + +        options = dict_new (); +        if (!options) +                goto error_return; + +        ret = rpc_transport_unix_options_build (&options, sockfile, 0); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to build rpc options"); +                goto dealloc_dict; +        } + +        rpc = rpc_clnt_new (options, this->ctx, this->name, 16); +        if (!rpc) +                goto dealloc_dict; + +        ret = rpc_clnt_register_notify (rpc, fn, cbkdata); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "failed to register notify"); +                goto dealloc_rpc_clnt; +        } + +        ret = rpc_clnt_start (rpc); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "failed to start rpc"); +                goto dealloc_rpc_clnt; +        } + +        return rpc; + + dealloc_rpc_clnt: +        rpc_clnt_unref (rpc); + dealloc_dict: +        dict_unref (options); + error_return: +        return NULL; +} + +/** + * Generic RPC client routine to dispatch a request to an + * RPC server. 
+ */ +int +changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req, +                          call_frame_t *frame, rpc_clnt_prog_t *prog, +                          int procnum, struct iovec *payload, int payloadcnt, +                          struct iobref *iobref, xlator_t *this, +                          fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) +{ +        int           ret        = 0; +        int           count      = 0; +        struct iovec  iov        = {0, }; +        struct iobuf *iobuf      = NULL; +        char          new_iobref = 0; +        ssize_t       xdr_size   = 0; + +        GF_ASSERT (this); + +        if (req) { +                xdr_size = xdr_sizeof (xdrproc, req); + +                iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size); +                if (!iobuf) { +                        goto out; +                }; + +                if (!iobref) { +                        iobref = iobref_new (); +                        if (!iobref) { +                                goto out; +                        } + +                        new_iobref = 1; +                } + +                iobref_add (iobref, iobuf); + +                iov.iov_base = iobuf->ptr; +                iov.iov_len  = iobuf_size (iobuf); + +                /* Create the xdr payload */ +                ret = xdr_serialize_generic (iov, req, xdrproc); +                if (ret == -1) { +                        goto out; +                } + +                iov.iov_len = ret; +                count = 1; +        } + +        ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count, +                               payload, payloadcnt, iobref, frame, NULL, +                               0, NULL, 0, NULL); + + out: +        if (new_iobref) +                iobref_unref (iobref); +        if (iobuf) +                iobuf_unref (iobuf); +        return ret; +} + +/** + * Entry point to perform a remote procedure call + */ +int +changelog_invoke_rpc (xlator_t 
*this, struct rpc_clnt *rpc, +                      rpc_clnt_prog_t *prog, int procidx, void *arg) +{ +        int                   ret   = 0; +        call_frame_t         *frame = NULL; +        rpc_clnt_procedure_t *proc  = NULL; + +        if (!this || !prog) +                goto error_return; + +        frame = create_frame (this, this->ctx->pool); +        if (!frame) { +                gf_log (this->name, GF_LOG_ERROR, "failed to create frame"); +                goto error_return; +        } + +        proc = &prog->proctable[procidx]; +        if (proc->fn) +                ret = proc->fn (frame, this, arg); + +        STACK_DESTROY (frame->root); +        return ret; + + error_return: +        return -1; +} + +/** +***************************************************** +                  Server Interface +***************************************************** +*/ + +struct iobuf * +__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg, +                                 struct iovec *outmsg, xdrproc_t xdrproc) +{ +        struct iobuf *iob      = NULL; +        ssize_t       retlen   = 0; +        ssize_t       rsp_size = 0; + +        rsp_size = xdr_sizeof (xdrproc, arg); +        iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size); +        if (!iob) +                goto error_return; + +        iobuf_to_iovec (iob, outmsg); + +        retlen = xdr_serialize_generic (*outmsg, arg, xdrproc); +        if (retlen == -1) +                goto unref_iob; + +        outmsg->iov_len = retlen; +        return iob; + + unref_iob: +        iobuf_unref (iob); + error_return: +        return NULL; +} + +int +changelog_rpc_sumbit_reply (rpcsvc_request_t *req, +                            void *arg, struct iovec *payload, int payloadcount, +                            struct iobref *iobref, xdrproc_t xdrproc) +{ +        int           ret        = -1; +        struct iobuf *iob        = NULL; +        struct iovec  iov        = {0,}; +        char       
   new_iobref = 0; + +        if (!req) +                goto return_ret; + +        if (!iobref) { +                iobref = iobref_new (); +                if (!iobref) +                        goto return_ret; +                new_iobref = 1; +        } + +        iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc); +        if (!iob) +                gf_log ("", GF_LOG_ERROR, "failed to serialize reply"); +        else +                iobref_add (iobref, iob); + +        ret = rpcsvc_submit_generic (req, &iov, +                                     1, payload, payloadcount, iobref); + +        if (new_iobref) +                iobref_unref (iobref); +        if (iob) +                iobuf_unref (iob); + return_ret: +        return ret; +} + +void +changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile, +                              rpcsvc_notify_t fn, struct rpcsvc_program **progs) +{ +        rpcsvc_listener_t      *listener = NULL; +        rpcsvc_listener_t      *next    = NULL; +        struct rpcsvc_program *prog    = NULL; + +        while (*progs) { +                prog = *progs; +                (void) rpcsvc_program_unregister (rpc, prog); +        } + +        list_for_each_entry_safe (listener, next, &rpc->listeners, list) { +                rpcsvc_listener_destroy (listener); +        } + +        (void) rpcsvc_unregister_notify (rpc, fn, this); +        unlink (sockfile); + +        GF_FREE (rpc); +} + +rpcsvc_t * +changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata, +                           rpcsvc_notify_t fn, struct rpcsvc_program **progs) +{ +        int                    j       = 0; +        int                    ret     = 0; +        rpcsvc_t              *rpc     = NULL; +        dict_t                *options = NULL; +        struct rpcsvc_program *prog    = NULL; + +        if (!cbkdata) +                cbkdata = this; + +        options = dict_new (); +        if (!options) +   
             goto error_return; + +        ret = rpcsvc_transport_unix_options_build (&options, sockfile); +        if (ret) +                goto dealloc_dict; + +        rpc = rpcsvc_init (this, this->ctx, options, 8); +        if (rpc == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "failed to init rpc"); +                goto dealloc_dict; +        } + +        ret = rpcsvc_register_notify (rpc, fn, cbkdata); +        if (ret) { +                gf_log (this->name, +                        GF_LOG_ERROR, "failed to register notify function"); +                goto dealloc_rpc; +        } + +        ret = rpcsvc_create_listeners (rpc, options, this->name); +        if (ret != 1) { +                gf_log (this->name, +                        GF_LOG_DEBUG, "failed to create listeners"); +                goto dealloc_rpc; +        } + +        while (*progs) { +                prog = *progs; +                ret = rpcsvc_program_register (rpc, prog); +                if (ret) { +                        gf_log (this->name, +                                GF_LOG_ERROR, "cannot register program " +                                "(name: %s, prognum: %d, pogver: %d)", +                                prog->progname, prog->prognum, prog->progver); +                        goto dealloc_rpc; +                } + +                progs++; +        } + +        dict_unref (options); +        return rpc; + + dealloc_rpc: +        GF_FREE (rpc); + dealloc_dict: +        dict_unref (options); + error_return: +        return NULL; +} diff --git a/xlators/features/changelog/src/changelog-rpc-common.h b/xlators/features/changelog/src/changelog-rpc-common.h new file mode 100644 index 00000000000..95c850c9400 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc-common.h @@ -0,0 +1,84 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. 
+ +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef __CHANGELOG_RPC_COMMON_H +#define __CHANGELOG_RPC_COMMON_H + +#include "rpcsvc.h" +#include "rpc-clnt.h" +#include "event.h" +#include "call-stub.h" + +#include "changelog-xdr.h" +#include "xdr-generic.h" + +#include "changelog.h" + +/** + * Let's keep this non-configurable for now. + */ +#define NR_ROTT_BUFFS  4 +#define NR_DISPATCHERS (NR_ROTT_BUFFS - 1) + +enum changelog_rpc_procnum { +        CHANGELOG_RPC_PROC_NULL    = 0, +        CHANGELOG_RPC_PROBE_FILTER = 1, +        CHANGELOG_RPC_PROC_MAX     = 2, +}; + +#define CHANGELOG_RPC_PROGNUM   1885957735 +#define CHANGELOG_RPC_PROGVER   1 + +/** + * reverse connection: data xfer path + */ +enum changelog_reverse_rpc_procnum { +        CHANGELOG_REV_PROC_NULL  = 0, +        CHANGELOG_REV_PROC_EVENT = 1, +        CHANGELOG_REV_PROC_MAX   = 2, +}; + +#define CHANGELOG_REV_RPC_PROCNUM   1886350951 +#define CHANGELOG_REV_RPC_PROCVER   1 + +typedef struct changelog_rpc { +        rpcsvc_t        *svc; +        struct rpc_clnt *rpc; +        char             sock[UNIX_PATH_MAX];  /* tied to server */ +} changelog_rpc_t; + +/* event poller */ +void *changelog_rpc_poller (void *); + +/* CLIENT API */ +struct rpc_clnt * +changelog_rpc_client_init (xlator_t *, void *, char *, rpc_clnt_notify_t); + +int +changelog_rpc_sumbit_req (struct rpc_clnt *, void *, call_frame_t *, +                          rpc_clnt_prog_t *, int , struct iovec *, int, +                          struct iobref *, xlator_t *, fop_cbk_fn_t, xdrproc_t); + +int +changelog_invoke_rpc (xlator_t *, struct rpc_clnt *, +                      rpc_clnt_prog_t *, int , void *); + +/* SERVER API */ +int +changelog_rpc_sumbit_reply (rpcsvc_request_t *, void *, +                        
    struct iovec *, int, struct iobref *, xdrproc_t); +rpcsvc_t * +changelog_rpc_server_init (xlator_t *, char *, void*, +                           rpcsvc_notify_t, struct rpcsvc_program **); +void +changelog_rpc_server_destroy (xlator_t *, rpcsvc_t *, char *, +                              rpcsvc_notify_t, struct rpcsvc_program **); + +#endif diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c new file mode 100644 index 00000000000..04326456d31 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -0,0 +1,300 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "changelog-rpc.h" +#include "changelog-mem-types.h" +#include "changelog-ev-handle.h" + +struct rpcsvc_program *changelog_programs[]; + +static void +changelog_cleanup_dispatchers (xlator_t *this, +                               changelog_priv_t *priv, int count) +{ +        for (; count >= 0; count--) { +                (void) changelog_thread_cleanup +                        (this, priv->ev_dispatcher[count]); +        } +} + +static int +changelog_cleanup_rpc_threads (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; +        changelog_clnt_t *conn = NULL; + +        conn = &priv->connections; +        if (!conn) +                return 0; + +        /** terminate RPC thread(s) */ +        ret = changelog_thread_cleanup (this, priv->connector); +        if (ret != 0) +                goto error_return; +        /** terminate dispatcher thread(s) */ +        changelog_cleanup_dispatchers (this, priv, priv->nr_dispatchers); + +        /* TODO: what about pending and waiting connections? 
*/ +        changelog_ev_cleanup_connections (this, conn); + +        /* destroy locks */ +        ret = pthread_mutex_destroy (&conn->pending_lock); +        if (ret != 0) +                goto error_return; +        ret = pthread_cond_destroy (&conn->pending_cond); +        if (ret != 0) +                goto error_return; +        ret = LOCK_DESTROY (&conn->active_lock); +        if (ret != 0) +                goto error_return; +        ret = LOCK_DESTROY (&conn->wait_lock); +        if (ret != 0) +                goto error_return; +        return 0; + + error_return: +        return -1; +} + +static int +changelog_init_rpc_threads (xlator_t *this, changelog_priv_t *priv, +                            rbuf_t *rbuf, int nr_dispatchers) +{ +        int               j    = 0; +        int               ret  = 0; +        changelog_clnt_t *conn = NULL; + +        conn = &priv->connections; + +        conn->this = this; +        conn->rbuf = rbuf; +        conn->sequence = 1; /* start with sequence number one */ + +        INIT_LIST_HEAD (&conn->pending); +        INIT_LIST_HEAD (&conn->active); +        INIT_LIST_HEAD (&conn->waitq); + +        ret = pthread_mutex_init (&conn->pending_lock, NULL); +        if (ret) +                goto error_return; +        ret = pthread_cond_init (&conn->pending_cond, NULL); +        if (ret) +                goto cleanup_pending_lock; + +        ret = LOCK_INIT (&conn->active_lock); +        if (ret) +                goto cleanup_pending_cond; +        ret = LOCK_INIT (&conn->wait_lock); +        if (ret) +                goto cleanup_active_lock; + +        /* spawn reverse connection thread */ +        ret = pthread_create (&priv->connector, +                              NULL, changelog_ev_connector, conn); +        if (ret != 0) +                goto cleanup_wait_lock; + +        /* spawn dispatcher thread(s) */ +        priv->ev_dispatcher = GF_CALLOC (nr_dispatchers, sizeof(pthread_t), +                                   
      gf_changelog_mt_ev_dispatcher_t); +        if (!priv->ev_dispatcher) +                goto cleanup_connector; + +        /* spawn dispatcher threads */ +        for (; j < nr_dispatchers; j++) { +                ret = pthread_create (&priv->ev_dispatcher[j], +                                      NULL, changelog_ev_dispatch, conn); +                if (ret != 0) { +                        changelog_cleanup_dispatchers (this, priv, --j); +                        break; +                } +        } + +        if (ret != 0) +                goto cleanup_connector; + +        priv->nr_dispatchers = nr_dispatchers; +        return 0; + + cleanup_connector: +        (void) pthread_cancel (priv->connector); + cleanup_wait_lock: +        (void) LOCK_DESTROY (&conn->wait_lock); + cleanup_active_lock: +        (void) LOCK_DESTROY (&conn->active_lock); + cleanup_pending_cond: +        (void) pthread_cond_destroy (&conn->pending_cond); + cleanup_pending_lock: +        (void) pthread_mutex_destroy (&conn->pending_lock); + error_return: +        return -1; +} + +int +changelog_rpcsvc_notify (rpcsvc_t *rpc, +                         void *xl, rpcsvc_event_t event, void *data) +{ +        return 0; +} + +void +changelog_destroy_rpc_listner (xlator_t *this, changelog_priv_t *priv) +{ +        char sockfile[UNIX_PATH_MAX] = {0,}; + +        /* sockfile path could have been saved to avoid this */ +        CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, +                                    sockfile, UNIX_PATH_MAX); +        changelog_rpc_server_destroy (this, +                                      priv->rpc, sockfile, +                                      changelog_rpcsvc_notify, +                                      changelog_programs); +        (void) changelog_cleanup_rpc_threads (this, priv); +} + +rpcsvc_t * +changelog_init_rpc_listner (xlator_t *this, changelog_priv_t *priv, +                            rbuf_t *rbuf, int nr_dispatchers) +{ +        int ret = 0; +     
   char sockfile[UNIX_PATH_MAX] = {0,}; + +        ret = changelog_init_rpc_threads (this, priv, rbuf, nr_dispatchers); +        if (ret) +                return NULL; + +        CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, +                                    sockfile, UNIX_PATH_MAX); +        return changelog_rpc_server_init (this, sockfile, NULL, +                                          changelog_rpcsvc_notify, +                                          changelog_programs); +} + +void +changelog_rpc_clnt_cleanup (changelog_rpc_clnt_t *crpc) +{ +        if (!crpc) +                return; +        crpc->c_clnt = NULL; +        (void) LOCK_DESTROY (&crpc->lock); +        GF_FREE (crpc); +} + +inline changelog_rpc_clnt_t * +changelog_rpc_clnt_init (xlator_t *this, +                         changelog_probe_req *rpc_req, changelog_clnt_t *c_clnt) +{ +        int ret = 0; +        changelog_rpc_clnt_t *crpc = NULL; + +        crpc = GF_CALLOC (1, sizeof (*crpc), gf_changelog_mt_rpc_clnt_t); +        if (!crpc) +                goto error_return; +        INIT_LIST_HEAD (&crpc->list); + +        crpc->ref = 0; +        changelog_set_disconnect_flag (crpc, _gf_false); + +        crpc->filter = rpc_req->filter; +        (void) memcpy (crpc->sock, rpc_req->sock, strlen (rpc_req->sock)); + +        crpc->this = this; +        crpc->c_clnt = c_clnt; +        crpc->cleanup = changelog_rpc_clnt_cleanup; + +        ret = LOCK_INIT (&crpc->lock); +        if (ret != 0) +                goto dealloc_crpc; +        return crpc; + + dealloc_crpc: +        GF_FREE (crpc); + error_return: +        return NULL; +} + +/** + * Actor declarations + */ + +/** + * @probe_handler + * A probe RPC call spawns a connect back to the caller. Caller also + * passes an hint which acts as a filter for selecting updates. 
+ */ + +int +changelog_handle_probe (rpcsvc_request_t *req) +{ +        int                   ret     = 0; +        xlator_t             *this    = NULL; +        rpcsvc_t             *svc     = NULL; +        changelog_priv_t     *priv    = NULL; +        changelog_clnt_t     *c_clnt  = NULL; +        changelog_rpc_clnt_t *crpc    = NULL; + +        changelog_probe_req   rpc_req = {0,}; +        changelog_probe_rsp   rpc_rsp = {0,}; + +        ret = xdr_to_generic (req->msg[0], +                              &rpc_req, (xdrproc_t)xdr_changelog_probe_req); +        if (ret < 0) { +                gf_log ("", GF_LOG_ERROR, "xdr decoding error"); +                req->rpc_err = GARBAGE_ARGS; +                goto handle_xdr_error; +        } + +        /* ->xl hidden in rpcsvc */ +        svc    = rpcsvc_request_service (req); +        this   = svc->mydata; +        priv   = this->private; +        c_clnt = &priv->connections; + +        crpc = changelog_rpc_clnt_init (this, &rpc_req, c_clnt); +        if (!crpc) +                goto handle_xdr_error; + +        changelog_ev_queue_connection (c_clnt, crpc); +        rpc_rsp.op_ret = 0; + +        goto submit_rpc; + + handle_xdr_error: +        rpc_rsp.op_ret = -1; + submit_rpc: +        (void) changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, +                                           (xdrproc_t)xdr_changelog_probe_rsp); +        return 0; +} + +/** + * RPC declarations + */ + +rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = { +        [CHANGELOG_RPC_PROBE_FILTER] = { +                "CHANGELOG PROBE FILTER", CHANGELOG_RPC_PROBE_FILTER, +                changelog_handle_probe, NULL, 0, DRC_NA +        }, +}; + +struct rpcsvc_program changelog_svc_prog = { +        .progname  = CHANGELOG_RPC_PROGNAME, +        .prognum   = CHANGELOG_RPC_PROGNUM, +        .progver   = CHANGELOG_RPC_PROGVER, +        .numactors = CHANGELOG_RPC_PROC_MAX, +        .actors    = changelog_svc_actors, +        
.synctask  = _gf_true, +}; + +struct rpcsvc_program *changelog_programs[] = { +        &changelog_svc_prog, +        NULL, +}; diff --git a/xlators/features/changelog/src/changelog-rpc.h b/xlators/features/changelog/src/changelog-rpc.h new file mode 100644 index 00000000000..0df96684b6c --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc.h @@ -0,0 +1,29 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef __CHANGELOG_RPC_H +#define __CHANGELOG_RPC_H + +#include "xlator.h" +#include "changelog-helpers.h" + +/* one time */ +#include "socket.h" +#include "changelog-rpc-common.h" + +#define CHANGELOG_RPC_PROGNAME  "GlusterFS Changelog" + +rpcsvc_t * +changelog_init_rpc_listner (xlator_t *, changelog_priv_t *, rbuf_t *, int); + +void +changelog_destroy_rpc_listner (xlator_t *, changelog_priv_t *); + +#endif diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 4263a462ad7..e7d8522ae8c 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -19,14 +19,13 @@  #include "iobuf.h"  #include "changelog-rt.h" -#include "changelog-helpers.h"  #include "changelog-encoders.h"  #include "changelog-mem-types.h"  #include <pthread.h> -#include "changelog-notifier.h" +#include "changelog-rpc.h"  static struct changelog_bootstrap  cb_bootstrap[] = { @@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame,                        struct iatt *preparent,                        struct iatt *postparent, dict_t *xdata)  { +        int32_t ret = 0;          changelog_priv_t  *priv  = NULL;          changelog_local_t *local = NULL; +       
 changelog_event_t  ev    = {0,};          priv  = this->private;          local = frame->local;          CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); +        /* fill the event structure.. similar to open() */ +        ev.ev_type = CHANGELOG_OP_TYPE_CREATE; +        uuid_copy (ev.u.create.gfid, buf->ia_gfid); +        ev.u.create.flags = fd->flags; +        changelog_dispatch_event (this, priv, &ev); + +        if (changelog_ev_selected +                   (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) { +                ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1); +                if (ret) +                        gf_log (this->name, GF_LOG_WARNING, +                                "could not set fd context (for release cbk)"); +        } +          changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);   unwind: @@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame,  /* }}} */ +/* open, release and other beasts */ + +/* {{{ */ + + + +int +changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int op_ret, int op_errno, fd_t *fd, dict_t *xdata) +{ +        int                ret    = 0; +        void              *opaque = NULL; +        char              *buf    = NULL; +        ssize_t            buflen = 0; +        changelog_priv_t  *priv   = NULL; +        changelog_event_t  ev     = {0,}; +        gf_boolean_t logopen = _gf_false; + +        priv = this->private; +        if (frame->local) { +                frame->local = NULL; +                logopen = _gf_true; +        } + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind); + +        /* fill the event structure */ +        ev.ev_type = CHANGELOG_OP_TYPE_OPEN; +        uuid_copy (ev.u.open.gfid, fd->inode->gfid); +        ev.u.open.flags = fd->flags; +        changelog_dispatch_event (this, priv, &ev); + +        if (changelog_ev_selected +                   (this, &priv->ev_selection, 
CHANGELOG_OP_TYPE_RELEASE)) { +                ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1); +                if (ret) +                        gf_log (this->name, GF_LOG_WARNING, +                                "could not set fd context (for release cbk)"); +        } + + unwind: +        CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata); +        return 0; +} + +int +changelog_open (call_frame_t *frame, xlator_t *this, +                loc_t *loc, int flags, fd_t *fd, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); + +        frame->local = (void *)0x1; /* do not dereference in ->cbk */ + + wind: +        STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata); +        return 0; +} + +/* }}} */ + +/* {{{ */ + +int32_t +changelog_release (xlator_t *this, fd_t *fd) +{ +        changelog_event_t ev = {0,}; +        changelog_priv_t *priv = NULL; + +        priv = this->private; + +        ev.ev_type = CHANGELOG_OP_TYPE_RELEASE; +        uuid_copy (ev.u.release.gfid, fd->inode->gfid); +        changelog_dispatch_event (this, priv, &ev); + +        (void) fd_ctx_del (fd, this, NULL); + +        return 0; +} + + +/* }}} */ +  /**   * The   *   - @init () @@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)          int ret = 0;          if (priv->cr.rollover_th) { -                changelog_thread_cleanup (this, priv->cr.rollover_th); +                (void) changelog_thread_cleanup (this, priv->cr.rollover_th);                  priv->cr.rollover_th = 0;                  ret = close (priv->cr_wfd);                  if (ret) @@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)          }          if (priv->cf.fsync_th) { -                changelog_thread_cleanup (this, 
priv->cf.fsync_th); +                (void) changelog_thread_cleanup (this, priv->cf.fsync_th);                  priv->cf.fsync_th = 0;          }  } @@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)          return ret;  } -/* cleanup the notifier thread */ -static int -changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv) -{ -        int ret = 0; - -        if (priv->cn.notify_th) { -                changelog_thread_cleanup (this, priv->cn.notify_th); -                priv->cn.notify_th = 0; - -                ret = close (priv->wfd); -                if (ret) -                        gf_log (this->name, GF_LOG_ERROR, -                                "error closing writer end of notifier pipe" -                                " (reason: %s)", strerror (errno)); -        } - -        return ret; -} - -/* spawn the notifier thread - nop if already running */ -static int -changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) -{ -        int ret        = 0; -        int flags      = 0; -        int pipe_fd[2] = {0, 0}; - -        if (priv->cn.notify_th) -                goto out; /* notifier thread already running */ - -        ret = pipe (pipe_fd); -        if (ret == -1) { -                gf_log (this->name, GF_LOG_ERROR, -                        "Cannot create pipe (reason: %s)", strerror (errno)); -                goto out; -        } - -        /* writer is non-blocking */ -        flags = fcntl (pipe_fd[1], F_GETFL); -        flags |= O_NONBLOCK; - -        ret = fcntl (pipe_fd[1], F_SETFL, flags); -        if (ret) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to set O_NONBLOCK flag"); -                goto out; -        } - -        priv->wfd = pipe_fd[1]; - -        priv->cn.this = this; -        priv->cn.rfd  = pipe_fd[0]; - -        ret = gf_thread_create (&priv->cn.notify_th, -				NULL, changelog_notifier, priv); - - out: -        return ret; -} 
-  int  notify (xlator_t *this, int event, void *data, ...)  { @@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)          if (!priv->active)                  return ret; -        /* spawn the notifier thread */ -        ret = changelog_spawn_notifier (this, priv); -        if (ret) -                goto out; -          /**           * start with a fresh changelog file every time. this is done           * in case there was an encoding change. so... things are kept @@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)          return ret;  } -/* Init all pthread condition variables and locks in changelog*/ +/** + * Init barrier related condition variables and locks + */  static int -changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) +changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv)  {          gf_boolean_t    bn_mutex_init         = _gf_false;          gf_boolean_t    bn_cond_init          = _gf_false; @@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)          return ret;  } -/* Destroy all pthread condition variables and locks in changelog */ +/* Destroy barrier related condition variables and locks */  static inline void -changelog_pthread_destroy (changelog_priv_t *priv) +changelog_barrier_pthread_destroy (changelog_priv_t *priv)  {          pthread_mutex_destroy (&priv->bn.bnotify_mutex);          pthread_cond_destroy (&priv->bn.bnotify_cond); @@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options)                                  }                                  htime_open(this, priv, tv.tv_sec);                          } -                        ret = changelog_spawn_notifier (this, priv); -                        if (!ret) -                                ret = changelog_spawn_helper_threads (this, -                                                                      priv); -                } else -                     
   ret = changelog_cleanup_notifier (this, priv); +                        ret = changelog_spawn_helper_threads (this, priv); +                }          }   out:          if (ret) { -                ret = changelog_cleanup_notifier (this, priv); +                /* TODO */          } else {                  gf_log (this->name, GF_LOG_DEBUG,                          "changelog reconfigured"); @@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options)          return ret;  } -int32_t -init (xlator_t *this) +static void +changelog_freeup_options (xlator_t *this, changelog_priv_t *priv)  { -        int                     ret                     = -1; -        char                    *tmp                    = NULL; -        changelog_priv_t        *priv                   = NULL; -        gf_boolean_t            cond_lock_init          = _gf_false; -        char                    htime_dir[PATH_MAX]     = {0,}; -        char                    csnap_dir[PATH_MAX]     = {0,}; -        uint32_t                timeout                 = 0; - -        GF_VALIDATE_OR_GOTO ("changelog", this, out); - -        if (!this->children || this->children->next) { -                gf_log (this->name, GF_LOG_ERROR, -                        "translator needs a single subvolume"); -                goto out; -        } - -        if (!this->parents) { -                gf_log (this->name, GF_LOG_ERROR, -                        "dangling volume. 
please check volfile"); -                goto out; -        } - -        priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); -        if (!priv) -                goto out; +        int ret = 0; -        this->local_pool = mem_pool_new (changelog_local_t, 64); -        if (!this->local_pool) { +        ret = priv->cb->dtor (this, &priv->cd); +        if (ret)                  gf_log (this->name, GF_LOG_ERROR, -                        "failed to create local memory pool"); -                goto out; -        } - -        LOCK_INIT (&priv->lock); -        LOCK_INIT (&priv->c_snap_lock); +                        "could not cleanup bootstrapper"); +        GF_FREE (priv->changelog_brick); +        GF_FREE (priv->changelog_dir); +} -        GF_OPTION_INIT ("changelog-brick", tmp, str, out); -        if (!tmp) { -                gf_log (this->name, GF_LOG_ERROR, -                        "\"changelog-brick\" option is not set"); -                goto out; -        } +static int +changelog_init_options (xlator_t *this, changelog_priv_t *priv) +{ +        int       ret            = 0; +        char     *tmp            = NULL; +        uint32_t  timeout        = 0; +        char htime_dir[PATH_MAX] = {0,}; +        char csnap_dir[PATH_MAX] = {0,}; +        GF_OPTION_INIT ("changelog-brick", tmp, str, error_return);          priv->changelog_brick = gf_strdup (tmp);          if (!priv->changelog_brick) -                goto out; -        tmp = NULL; +                goto error_return; -        GF_OPTION_INIT ("changelog-dir", tmp, str, out); -        if (!tmp) { -                gf_log (this->name, GF_LOG_ERROR, -                        "\"changelog-dir\" option is not set"); -                goto out; -        } +        tmp = NULL; +        GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1);          priv->changelog_dir = gf_strdup (tmp);          if (!priv->changelog_dir) -                goto out; +                goto dealloc_1; +          tmp = NULL;          
/** @@ -2375,35 +2381,38 @@ init (xlator_t *this)          ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);          if (ret) -                goto out; +                goto dealloc_2; -        CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir); +        CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir);          ret = mkdir_p (htime_dir, 0600, _gf_true);          if (ret) -                goto out; +                goto dealloc_2; -        CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir); +        CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir);          ret = mkdir_p (csnap_dir, 0600, _gf_true);          if (ret) -                goto out; +                goto dealloc_2; -        GF_OPTION_INIT ("changelog", priv->active, bool, out); +        GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2); -        GF_OPTION_INIT ("op-mode", tmp, str, out); +        GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2);          changelog_assign_opmode (priv, tmp);          tmp = NULL; -        GF_OPTION_INIT ("encoding", tmp, str, out); +        GF_OPTION_INIT ("encoding", tmp, str, dealloc_2);          changelog_assign_encoding (priv, tmp); +        changelog_encode_change (priv); -        GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out); +        GF_OPTION_INIT ("rollover-time", +                        priv->rollover_time, int32, dealloc_2); -        GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); -        GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out); -        priv->timeout.tv_sec = timeout; +        GF_OPTION_INIT ("fsync-interval", +                        priv->fsync_interval, int32, dealloc_2); -        changelog_encode_change(priv); +        GF_OPTION_INIT ("changelog-barrier-timeout", +                        timeout, time, dealloc_2); +        changelog_assign_barrier_timeout (priv, timeout);          GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);      
    priv->cb = &cb_bootstrap[priv->op_mode]; @@ -2411,10 +2420,111 @@ init (xlator_t *this)          /* ... now bootstrap the logger */          ret = priv->cb->ctor (this, &priv->cd);          if (ret) -                goto out; +                goto dealloc_2;          priv->changelog_fd = -1; +        return 0; + + dealloc_2: +        GF_FREE (priv->changelog_dir); + dealloc_1: +        GF_FREE (priv->changelog_brick); + error_return: +        return -1; +} + +static void +changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv) +{ +        /* terminate rpc server */ +        changelog_destroy_rpc_listner (this, priv); + +        /* cleanup rot buffs */ +        rbuf_dtor (priv->rbuf); + +        /* cleanup poller thread */ +        (void) changelog_thread_cleanup (this, priv->poller); +} + +static int +changelog_init_rpc (xlator_t *this, changelog_priv_t *priv) +{ +        int        ret      = 0; +        rpcsvc_t  *rpc      = NULL; +        changelog_ev_selector_t *selection = NULL; + +        selection = &priv->ev_selection; + +        /* initialize event selection */ +        changelog_init_event_selection (this, selection); + +        ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to spawn poller thread"); +                goto error_return; +        } + +        priv->rbuf = rbuf_init (NR_ROTT_BUFFS); +        if (!priv->rbuf) +                goto cleanup_thread; + +        rpc = changelog_init_rpc_listner (this, priv, +                                          priv->rbuf, NR_DISPATCHERS); +        if (!rpc) +                goto cleanup_rbuf; +        priv->rpc = rpc; + +        return 0; + + cleanup_rbuf: +        rbuf_dtor (priv->rbuf); + cleanup_thread: +        (void) changelog_thread_cleanup (this, priv->poller); + error_return: +        return -1; +} + +int32_t +init (xlator_t *this) +{ +        int            
   ret  = -1; +        char             *tmp  = NULL; +        changelog_priv_t *priv = NULL; + +        GF_VALIDATE_OR_GOTO ("changelog", this, error_return); + +        if (!this->children || this->children->next) { +                gf_log (this->name, GF_LOG_ERROR, +                        "translator needs a single subvolume"); +                goto error_return; +        } + +        if (!this->parents) { +                gf_log (this->name, GF_LOG_ERROR, +                        "dangling volume. please check volfile"); +                goto error_return; +        } + +        priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); +        if (!priv) +                goto error_return; + +        this->local_pool = mem_pool_new (changelog_local_t, 64); +        if (!this->local_pool) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to create local memory pool"); +                goto cleanup_priv; +        } + +        LOCK_INIT (&priv->lock); +        LOCK_INIT (&priv->c_snap_lock); + +        ret = changelog_init_options (this, priv); +        if (ret) +                goto cleanup_mempool; +          /* snap dependency changes */          priv->dm.black_fop_cnt = 0;          priv->dm.white_fop_cnt = 0; @@ -2422,67 +2532,68 @@ init (xlator_t *this)          priv->dm.drain_wait_white = _gf_false;          priv->current_color = FOP_COLOR_BLACK;          priv->explicit_rollover = _gf_false; +          /* Mutex is not needed as threads are not spawned yet */          priv->bn.bnotify = _gf_false; -        ret = changelog_pthread_init (this, priv); +        ret = changelog_barrier_pthread_init (this, priv);          if (ret) -                goto out; - +                goto cleanup_options;          LOCK_INIT (&priv->bflags.lock); -        cond_lock_init = _gf_true;          priv->bflags.barrier_ext = _gf_false;          /* Changelog barrier init */          INIT_LIST_HEAD (&priv->queue);          
priv->barrier_enabled = _gf_false; -        ret = changelog_init (this, priv); +        /* RPC ball rolling.. */ +        ret = changelog_init_rpc (this, priv);          if (ret) -                goto out; +                goto cleanup_barrier; +        ret = changelog_init (this, priv); +        if (ret) +                goto cleanup_rpc;          gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded"); - out: -        if (ret) { -                if (this && this->local_pool) -                        mem_pool_destroy (this->local_pool); -                if (priv) { -                        if (priv->cb) { -                                ret = priv->cb->dtor (this, &priv->cd); -                                if (ret) -                                        gf_log (this->name, GF_LOG_ERROR, -                                        "error in cleanup during init()"); -                        } -                        GF_FREE (priv->changelog_brick); -                        GF_FREE (priv->changelog_dir); -                        if (cond_lock_init) -                                changelog_pthread_destroy (priv); -                        GF_FREE (priv); -                } -                this->private = NULL; -        } else -                this->private = priv; +        this->private = priv; +        return 0; -        return ret; + cleanup_rpc: +        changelog_cleanup_rpc (this, priv); + cleanup_barrier: +        changelog_barrier_pthread_destroy (priv); + cleanup_options: +        changelog_freeup_options (this, priv); + cleanup_mempool: +        mem_pool_destroy (this->local_pool); + cleanup_priv: +        GF_FREE (priv); + error_return: +        this->private = NULL; +        return -1;  }  void  fini (xlator_t *this)  { -        int               ret  = -1;          changelog_priv_t *priv = NULL;          priv = this->private;          if (priv) { -                ret = priv->cb->dtor (this, &priv->cd); -                if (ret) -              
          gf_log (this->name, GF_LOG_ERROR, -                                "error in fini"); +                /* terminate RPC server/threads */ +                changelog_cleanup_rpc (this, priv); + +                /* cleanup barrier related objects */ +                changelog_barrier_pthread_destroy (priv); + +                /* cleanup allocated options */ +                changelog_freeup_options (this, priv); + +                /* deallocate mempool */                  mem_pool_destroy (this->local_pool); -                GF_FREE (priv->changelog_brick); -                GF_FREE (priv->changelog_dir); -                changelog_pthread_destroy (priv); + +                /* finally, deallocate private variable */                  GF_FREE (priv);          } @@ -2492,6 +2603,7 @@ fini (xlator_t *this)  }  struct xlator_fops fops = { +        .open         = changelog_open,          .mknod        = changelog_mknod,          .mkdir        = changelog_mkdir,          .create       = changelog_create, @@ -2513,6 +2625,7 @@ struct xlator_fops fops = {  struct xlator_cbks cbks = {          .forget = changelog_forget, +        .release = changelog_release,  };  struct volume_options options[] = {  | 
