diff options
35 files changed, 5053 insertions, 16 deletions
diff --git a/Makefile.am b/Makefile.am index 970ffdf7..85054dca 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,7 @@  EXTRA_DIST = autogen.sh \  	COPYING-GPLV2 COPYING-LGPLV3 \  	INSTALL README AUTHORS THANKS NEWS \ -	glusterfs.spec glusterfs-api.pc.in \ +	glusterfs.spec glusterfs-api.pc.in libgfchangelog.pc.in \  	error-codes.json gf-error-codes.h.template \  	gen-headers.py @@ -9,7 +9,7 @@ SUBDIRS = argp-standalone libglusterfs rpc api xlators glusterfsd \  	$(FUSERMOUNT_SUBDIR) doc extras cli  pkgconfigdir = @pkgconfigdir@ -pkgconfig_DATA = glusterfs-api.pc +pkgconfig_DATA = glusterfs-api.pc libgfchangelog.pc  CLEANFILES = diff --git a/configure.ac b/configure.ac index 94e08781..c9a816d7 100644 --- a/configure.ac +++ b/configure.ac @@ -95,6 +95,10 @@ AC_CONFIG_FILES([Makefile                  xlators/protocol/server/Makefile                  xlators/protocol/server/src/Makefile                  xlators/features/Makefile +		xlators/features/changelog/Makefile +		xlators/features/changelog/src/Makefile +		xlators/features/changelog/lib/Makefile +		xlators/features/changelog/lib/src/Makefile                  xlators/features/glupy/Makefile                  xlators/features/glupy/src/Makefile                  xlators/features/locks/Makefile @@ -151,6 +155,7 @@ AC_CONFIG_FILES([Makefile                  contrib/fuse-util/Makefile                  contrib/uuid/uuid_types.h                  glusterfs-api.pc +                libgfchangelog.pc                  api/Makefile                  api/src/Makefile                  glusterfs.spec]) diff --git a/glusterfs.spec.in b/glusterfs.spec.in index 96918fac..27fb5197 100644 --- a/glusterfs.spec.in +++ b/glusterfs.spec.in @@ -622,6 +622,7 @@ fi  %files api-devel  %{_libdir}/pkgconfig/glusterfs-api.pc +%{_libdir}/pkgconfig/libgfchangelog.pc  %{_libdir}/libgfapi.so  %{_includedir}/glusterfs/api/* diff --git a/libgfchangelog.pc.in b/libgfchangelog.pc.in new file mode 100644 index 00000000..869e587a --- /dev/null +++ 
b/libgfchangelog.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + + +Name: libgfchangelog +Description: GlusterFS Changelog Consumer Library +Version: @VERSION@ +Libs: -Wl,-R${libdir} -L${libdir} -lgfchangelog -lglusterfs +Cflags: -I${includedir}/glusterfs/gfchangelog -D_FILE_OFFSET_BITS=64 -D__USE_FILE_OFFSET64 diff --git a/libglusterfs/src/common-utils.c b/libglusterfs/src/common-utils.c index 034e3da2..6eb886b3 100644 --- a/libglusterfs/src/common-utils.c +++ b/libglusterfs/src/common-utils.c @@ -59,6 +59,16 @@ struct dnscache6 {          struct addrinfo *next;  }; +void +md5_wrapper(const unsigned char *data, size_t len, char *md5) +{ +        unsigned short i = 0; +        unsigned short lim = MD5_DIGEST_LENGTH*2+1; +        unsigned char scratch[MD5_DIGEST_LENGTH] = {0,}; +        MD5(data, len, scratch); +        for (; i < MD5_DIGEST_LENGTH; i++) +                snprintf(md5 + i * 2, lim-i*2, "%02x", scratch[i]); +}  /* works similar to mkdir(1) -p.   
*/ diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h index b0d9c18b..bf41444d 100644 --- a/libglusterfs/src/common-utils.h +++ b/libglusterfs/src/common-utils.h @@ -23,6 +23,7 @@  #include <string.h>  #include <assert.h>  #include <pthread.h> +#include <openssl/md5.h>  #ifndef GF_BSD_HOST_OS  #include <alloca.h>  #endif @@ -584,5 +585,6 @@ gf_boolean_t gf_ports_reserved (char *blocked_port, gf_boolean_t *ports);  int gf_get_hostname_from_ip (char *client_ip, char **hostname);  gf_boolean_t gf_is_local_addr (char *hostname);  gf_boolean_t gf_is_same_address (char *host1, char *host2); +void md5_wrapper(const unsigned char *data, size_t len, char *md5);  #endif /* _COMMON_UTILS_H */ diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 6c09e89b..dbb2b58d 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -156,6 +156,8 @@  #define GF_REMOVE_BRICK_TID_KEY  "remove-brick-id"  #define GF_REPLACE_BRICK_TID_KEY "replace-brick-id" +#define UUID_CANONICAL_FORM_LEN  36 +  /* NOTE: add members ONLY at the end (just before _MAXVALUE) */  typedef enum {          GF_FOP_NULL = 0, diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am index 6a73301d..5edffaba 100644 --- a/xlators/features/Makefile.am +++ b/xlators/features/Makefile.am @@ -1,4 +1,4 @@  SUBDIRS = locks quota read-only mac-compat quiesce marker index \ -	  protect $(GLUPY_SUBDIR) # trash path-converter # filter +	  protect changelog $(GLUPY_SUBDIR) # trash path-converter # filter  CLEANFILES = diff --git a/xlators/features/changelog/Makefile.am b/xlators/features/changelog/Makefile.am new file mode 100644 index 00000000..153bb685 --- /dev/null +++ b/xlators/features/changelog/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src lib + +CLEANFILES = diff --git a/xlators/features/changelog/lib/Makefile.am b/xlators/features/changelog/lib/Makefile.am new file mode 100644 index 00000000..a985f42a --- /dev/null +++ 
b/xlators/features/changelog/lib/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c new file mode 100644 index 00000000..14562585 --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-changes.c @@ -0,0 +1,87 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +/** + * get set of new changes every 10 seconds (just print the file names) + * + * Compile it using: + *  gcc -o getchanges `pkg-config --cflags libgfchangelog` get-changes.c \ + *  `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +#define handle_error(fn)                                \ +        printf ("%s (reason: %s)\n", fn, strerror (errno)) + +int +main (int argc, char ** argv) +{ +        int     i           = 0; +        int     ret         = 0; +        ssize_t nr_changes  = 0; +        ssize_t changes     = 0; +        char fbuf[PATH_MAX] = {0,}; + +        /* get changes for brick "/home/vshankar/export/yow/yow-1" */ +        ret = gf_changelog_register ("/home/vshankar/export/yow/yow-1", +                                     "/tmp/scratch", "/tmp/change.log", 9, 5); +        if (ret) { +                handle_error ("register failed"); +                goto out; +        } + +        while (1) { +                i = 0; +                nr_changes = gf_changelog_scan (); +                if (nr_changes < 0) { +                        
handle_error ("scan(): "); +                        break; +                } + +                if (nr_changes == 0) +                        goto next; + +                printf ("Got %ld changelog files\n", nr_changes); + +                while ( (changes = +                         gf_changelog_next_change (fbuf, PATH_MAX)) > 0) { +                        printf ("changelog file [%d]: %s\n", ++i, fbuf); + +                        /* process changelog */ +                        /* ... */ +                        /* ... */ +                        /* ... */ +                        /* done processing */ + +                        ret = gf_changelog_done (fbuf); +                        if (ret) +                                handle_error ("gf_changelog_done"); +                } + +                if (changes == -1) +                        handle_error ("gf_changelog_next_change"); + +        next: +                sleep (10); +        } + + out: +        return ret; +} diff --git a/xlators/features/changelog/lib/examples/python/changes.py b/xlators/features/changelog/lib/examples/python/changes.py new file mode 100644 index 00000000..d21db8ea --- /dev/null +++ b/xlators/features/changelog/lib/examples/python/changes.py @@ -0,0 +1,32 @@ +#!/usr/bin/python + +import os +import sys +import time +import libgfchangelog + +cl = libgfchangelog.Changes() + +def get_changes(brick, scratch_dir, log_file, log_level, interval): +    change_list = [] +    try: +        cl.cl_register(brick, scratch_dir, log_file, log_level) +        while True: +            cl.cl_scan() +            change_list = cl.cl_getchanges() +            if change_list: +                print change_list +            for change in change_list: +                print('done with %s' % (change)) +                cl.cl_done(change) +            time.sleep(interval) +    except OSError: +        ex = sys.exc_info()[1] +        print ex + +if __name__ == '__main__': +    if len(sys.argv) != 5: +        
class Changes(object):
    """ctypes binding for the libgfchangelog consumer API.

    Each cl_* wrapper calls the matching gf_changelog_* symbol and raises
    OSError (built from the C errno) when that call returns -1.
    """
    libgfc = CDLL(find_library("gfchangelog"), use_errno=True)

    @classmethod
    def geterrno(cls):
        """Return the errno value saved by ctypes."""
        return get_errno()

    @classmethod
    def raise_oserr(cls):
        """Raise OSError carrying the current errno and its message."""
        code = cls.geterrno()
        raise OSError(code, os.strerror(code))

    @classmethod
    def _get_api(cls, call):
        """Look up the library symbol named `call`."""
        return getattr(cls.libgfc, call)

    @classmethod
    def cl_register(cls, brick, path, log_file, log_level, retries = 0):
        """Call gf_changelog_register(); raise OSError on -1."""
        register = cls._get_api('gf_changelog_register')
        status = register(brick, path, log_file, log_level, retries)
        if status == -1:
            cls.raise_oserr()

    @classmethod
    def cl_scan(cls):
        """Call gf_changelog_scan(); raise OSError on -1."""
        if cls._get_api('gf_changelog_scan')() == -1:
            cls.raise_oserr()

    @classmethod
    def cl_startfresh(cls):
        """Call gf_changelog_start_fresh(); raise OSError on -1."""
        if cls._get_api('gf_changelog_start_fresh')() == -1:
            cls.raise_oserr()

    @classmethod
    def cl_getchanges(cls):
        """ remove hardcoding for path name length """
        def clsort(f):
            # order by the text after the last '.' in the file name
            return f.split('.')[-1]

        collected = []
        buf = create_string_buffer('\0', 4096)
        next_change = cls._get_api('gf_changelog_next_change')

        # keep fetching until the library reports 0 or -1
        ret = next_change(buf, 4096)
        while ret not in (0, -1):
            # drop the last of the `ret` bytes reported by the call
            collected.append(buf.raw[:ret-1])
            ret = next_change(buf, 4096)

        if ret == -1:
            cls.raise_oserr()
        # cleanup tracker
        cls.cl_startfresh()
        return sorted(collected, key=clsort)

    @classmethod
    def cl_done(cls, clfile):
        """Call gf_changelog_done() for `clfile`; raise OSError on -1."""
        if cls._get_api('gf_changelog_done')(clfile) == -1:
            cls.raise_oserr()
#ifndef _GF_CHANGELOG_H
#define _GF_CHANGELOG_H

/* ssize_t / size_t: keep this public header usable on its own */
#include <sys/types.h>

/* API set */

/**
 * Register as a consumer of the changelog of the brick at @brick_path.
 * The bundled examples pass a scratch directory, a log file, a log level
 * and a reconnect count, and treat a non-zero return as failure.
 */
int
gf_changelog_register (char *brick_path, char *scratch_dir,
                       char *log_file, int log_level, int max_reconnects);

/**
 * Scan for new changelog files. The bundled C example treats a negative
 * return as an error and prints a positive one as the number of files found.
 */
ssize_t
gf_changelog_scan (void);

/**
 * Start a fresh pass; the bundled Python example calls this after draining
 * gf_changelog_next_change() ("cleanup tracker").
 */
int
gf_changelog_start_fresh (void);

/**
 * Fetch the next changelog file path into @bufptr (capacity @maxlen).
 * The bundled examples loop while the return is > 0 and treat -1 as an
 * error.
 */
ssize_t
gf_changelog_next_change (char *bufptr, size_t maxlen);

/**
 * Tell the library that the consumer is done with changelog @file.
 * The bundled examples treat a non-zero return as failure.
 */
int
gf_changelog_done (char *file);

#endif
/**
 * Single read() of at most @bufsize bytes from @fd into @buffer;
 * the result of read() is handed back untouched.
 */
ssize_t
gf_changelog_read_path (int fd, char *buffer, size_t bufsize)
{
        ssize_t nread = read (fd, buffer, bufsize);

        return nread;
}

/**
 * Write @len bytes from @buffer to @fd, continuing after short writes.
 *
 * Returns the number of bytes actually written; this is less than @len
 * when write() fails or returns 0.
 */
size_t
gf_changelog_write (int fd, char *buffer, size_t len)
{
        size_t done = 0;

        for (;;) {
                ssize_t nw = 0;

                if (done >= len)
                        break;

                nw = write (fd, buffer + done, len - done);
                if (nw <= 0)
                        break;

                done += nw;
        }

        return done;
}

/**
 * Encode the NUL-terminated string @s into @enc using the lookup table
 * @estr: a byte whose table entry is non-zero is emitted as that entry,
 * any other byte is emitted as "%XX" (upper-case hex).
 *
 * @enc is only written to when @s is non-empty; the caller supplies a
 * buffer large enough for the encoded form plus the terminating NUL.
 */
void
gf_rfc3986_encode (unsigned char *s, char *enc, char *estr)
{
        unsigned char *in  = s;
        char          *out = enc;

        while (*in != '\0') {
                /* sprintf() returns the number of characters it produced
                 * (1 or 3 here), which is how far the cursor advances */
                if (estr[*in] != '\0')
                        out += sprintf (out, "%c", estr[*in]);
                else
                        out += sprintf (out, "%%%02X", *in);
                in++;
        }
}
+ */ + +static pthread_key_t rl_key; +static pthread_once_t rl_once = PTHREAD_ONCE_INIT; + +static void +readline_destructor (void *ptr) +{ +        GF_FREE (ptr); +} + +static void +readline_once (void) +{ +        pthread_key_create (&rl_key, readline_destructor); +} + +static ssize_t +my_read (read_line_t *tsd, int fd, char *ptr) +{ +        if (tsd->rl_cnt <= 0) { +                if ( (tsd->rl_cnt = read (fd, tsd->rl_buf, MAXLINE)) < 0 ) +                        return -1; +                else if (tsd->rl_cnt == 0) +                        return 0; +                tsd->rl_bufptr = tsd->rl_buf; +        } + +        tsd->rl_cnt--; +        *ptr = *tsd->rl_bufptr++; +        return 1; +} + +static int +gf_readline_init_once (read_line_t **tsd) +{ +        if (pthread_once (&rl_once, readline_once) != 0) +                return -1; + +        *tsd = pthread_getspecific (rl_key); +        if (*tsd) +                goto out; + +        *tsd = GF_CALLOC (1, sizeof (**tsd), +                          gf_changelog_mt_libgfchangelog_rl_t); +        if (!*tsd) +                return -1; + +        if (pthread_setspecific (rl_key, *tsd) != 0) +                return -1; + + out: +        return 0; +} + +ssize_t +gf_readline (int fd, void *vptr, size_t maxlen) +{ +        size_t       n   = 0; +        size_t       rc  = 0; +        char         c   = ' '; +        char        *ptr = NULL; +        read_line_t *tsd = NULL; + +        if (gf_readline_init_once (&tsd)) +                return -1; + +        ptr = vptr; +        for (n = 1; n < maxlen; n++) { +                if ( (rc = my_read (tsd, fd, &c)) == 1 ) { +                        *ptr++ = c; +                        if (c == '\n') +                                break; +                } else if (rc == 0) { +                        *ptr = '\0'; +                        return (n - 1); +                } else +                        return -1; +        } + +        *ptr = '\0'; +        return n; + +} + 
+off_t +gf_lseek (int fd, off_t offset, int whence) +{ +        off_t        off = 0; +        read_line_t *tsd = NULL; + +        if (gf_readline_init_once (&tsd)) +                return -1; + +        if ( (off = lseek (fd, offset, whence)) == -1) +                return -1; + +        tsd->rl_cnt = 0; +        tsd->rl_bufptr = tsd->rl_buf; + +        return off; +} + +int +gf_ftruncate (int fd, off_t length) +{ +        read_line_t *tsd = NULL; + +        if (gf_readline_init_once (&tsd)) +                return -1; + +        if (ftruncate (fd, 0)) +                return -1; + +        tsd->rl_cnt = 0; +        tsd->rl_bufptr = tsd->rl_buf; + +        return 0; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h new file mode 100644 index 00000000..3aa6ed7b --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -0,0 +1,97 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#ifndef _GF_CHANGELOG_HELPERS_H +#define _GF_CHANGELOG_HELPERS_H + +#include <unistd.h> +#include <dirent.h> +#include <limits.h> +#include <pthread.h> + +#include <xlator.h> + +#define GF_CHANGELOG_TRACKER  "tracker" + +#define GF_CHANGELOG_CURRENT_DIR    ".current" +#define GF_CHANGELOG_PROCESSED_DIR  ".processed" +#define GF_CHANGELOG_PROCESSING_DIR ".processing" + +#ifndef MAXLINE +#define MAXLINE 4096 +#endif + +#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) do {     \ +                memcpy (ascii + off, ptr, len);                 \ +                off += len;                                     \ +        } while (0) + +typedef struct read_line { +        int rl_cnt; +        char *rl_bufptr; +        char rl_buf[MAXLINE]; +} read_line_t; + +typedef struct gf_changelog { +        xlator_t *this; + +        /* 'processing' directory stream */ +        DIR *gfc_dir; + +        /* fd to the tracker file */ +        int gfc_fd; + +        /* connection retries */ +        int gfc_connretries; + +        char gfc_sockpath[PATH_MAX]; + +        char gfc_brickpath[PATH_MAX]; + +        /* socket for recieving notifications */ +        int gfc_sockfd; + +        char *gfc_working_dir; + +        /* RFC 3986 string encoding */ +        char rfc3986[256]; + +        char gfc_current_dir[PATH_MAX]; +        char gfc_processed_dir[PATH_MAX]; +        char gfc_processing_dir[PATH_MAX]; + +        pthread_t gfc_changelog_processor; +} gf_changelog_t; + +int +gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); + +void * +gf_changelog_process (void *data); + +ssize_t +gf_changelog_read_path (int fd, char *buffer, size_t bufsize); + +void +gf_rfc3986_encode (unsigned char *s, char *enc, char *estr); + +size_t +gf_changelog_write (int fd, char *buffer, size_t len); + +ssize_t +gf_readline (int fd, void *vptr, size_t maxlen); + +int +gf_ftruncate (int fd, off_t length); + +off_t +gf_lseek (int fd, off_t offset, int whence); + +#endif diff 
#include <unistd.h>
#include <pthread.h>

#include "uuid.h"
#include "globals.h"
#include "glusterfs.h"

#include "gf-changelog-helpers.h"

/* from the changelog translator */
#include "changelog-misc.h"

extern int byebye;

/**
 * number of gfid records after fop number
 *
 * Indexed by fop number; fops not listed here are implicitly 0. The array
 * only extends up to the highest fop named below, so an index must be
 * range-checked before use.
 */
int nr_gfids[] = {
        [GF_FOP_MKNOD]   = 1,
        [GF_FOP_MKDIR]   = 1,
        [GF_FOP_UNLINK]  = 1,
        [GF_FOP_RMDIR]   = 1,
        [GF_FOP_SYMLINK] = 1,
        [GF_FOP_RENAME]  = 2,
        [GF_FOP_LINK]    = 1,
        [GF_FOP_CREATE]  = 1,
};

/* textual form of a binary gfid, as produced by uuid_utoa() */
static char *
binary_to_ascii (uuid_t uuid)
{
        return uuid_utoa (uuid);
}

/* identity converter: the gfid is already in ascii form */
static char *
conv_noop (char *ptr) { return ptr; }

/*
 * NOTE on the macros below: they are brace blocks, not do { } while (0),
 * on purpose. Several of them contain a bare `break' that must leave the
 * switch/loop of the *caller*; wrapping them in do/while would make that
 * `break' leave the wrapper instead. They can therefore only be used
 * directly inside a switch or loop body.
 */

/* sets @perr and breaks out unless the byte at @ptr + @plen is NUL */
#define VERIFY_SEPARATOR(ptr, plen, perr)       \
        {                                       \
                if (*(ptr + plen) != '\0') {    \
                        perr = 1;               \
                        break;                  \
                }                               \
        }

/* advance the cursor @mover by @bytes and shrink @nleft by the same */
#define MOVER_MOVE(mover, nleft, bytes)         \
        {                                       \
                mover += bytes;                 \
                nleft -= bytes;                 \
        }                                       \

/*
 * check that a NUL follows the @le bytes at @mov, then set @ptr to
 * @fn (@mov); sets @perr and breaks out when either step fails
 */
#define PARSE_GFID(mov, ptr, le, fn, perr)                      \
        {                                                       \
                VERIFY_SEPARATOR (mov, le, perr);               \
                ptr = fn (mov);                                 \
                if (!ptr) {                                     \
                        perr = 1;                               \
                        break;                                  \
                }                                               \
        }

/*
 * append the string @pt (strlen (@pt) bytes) to @buf at offset @of, then
 * advance the cursor @mo by @le bytes; no bounds checking is done
 */
#define FILL_AND_MOVE(pt, buf, of, mo, nl, le)                          \
        {                                                               \
                GF_CHANGELOG_FILL_BUFFER (pt, buf, of, strlen (pt));    \
                MOVER_MOVE (mo, nl, le);                                \
        }


/*
 * copy sizeof (uuid_t) raw bytes at @mover into @uuid, set @ptr to its
 * textual form and advance the cursor past it; sets @perr and breaks out
 * when the conversion yields NULL. The caller must make sure that many
 * bytes are actually left.
 */
#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr)          \
        {                                                       \
                memcpy (uuid, mover, sizeof (uuid_t));          \
                ptr = binary_to_ascii (uuid);                   \
                if (!ptr) {                                     \
                        perr = 1;                               \
                        break;                                  \
                }                                               \
                MOVER_MOVE (mover, nleft, sizeof (uuid_t));     \
        }                                                       \

/* NOTE: expands to an unparenthesised product */
#define LINE_BUFSIZE  3*PATH_MAX /* enough buffer for extra chars too */
+ * + * mmap() gives the flexibility of pointing to an offset in the file + * without us worrying about reading it in memory (VM does that for us for + * free). + */ + +static int +gf_changelog_parse_binary (xlator_t *this, +                           gf_changelog_t *gfc, int from_fd, int to_fd, +                           size_t start_offset, struct stat *stbuf) + +{ +        int     ret              = -1; +        off_t   off              = 0; +        off_t   nleft            = 0; +        uuid_t  uuid             = {0,}; +        char   *ptr              = NULL; +        char   *bname_start      = NULL; +        char   *bname_end        = NULL; +        char   *mover            = NULL; +        char   *start            = NULL; +        char    current_mover    = ' '; +        size_t  blen             = 0; +        int     parse_err        = 0; +        char ascii[LINE_BUFSIZE] = {0,}; + +        nleft = stbuf->st_size; + +        start = (char *) mmap (NULL, nleft, +                               PROT_READ, MAP_PRIVATE, from_fd, 0); +        if (!start) { +                gf_log (this->name, GF_LOG_ERROR, +                        "mmap() error (reason: %s)", strerror (errno)); +                goto out; +        } + +        mover = start; + +        MOVER_MOVE (mover, nleft, start_offset); + +        while (nleft > 0) { + +                off = blen = 0; +                ptr = bname_start = bname_end = NULL; + +                current_mover = *mover; + +                switch (current_mover) { +                case 'D': +                case 'M': +                        MOVER_MOVE (mover, nleft, 1); +                        PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); + +                        break; + +                case 'E': +                        MOVER_MOVE (mover, nleft, 1); +                        PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); + +                        bname_start = mover; +                        if ( (bname_end = 
strchr (mover, '\n')) == NULL ) { +                                parse_err = 1; +                                break; +                        } + +                        blen = bname_end - bname_start; +                        MOVER_MOVE (mover, nleft, blen); + +                        break; + +                default: +                        parse_err = 1; +                } + +                if (parse_err) +                        break; + +                GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); +                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); +                GF_CHANGELOG_FILL_BUFFER (ptr, ascii, off, strlen (ptr)); +                if (blen) +                        GF_CHANGELOG_FILL_BUFFER (bname_start, +                                                  ascii, off, blen); +                GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); + +                if (gf_changelog_write (to_fd, ascii, off) != off) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "processing binary changelog failed due to " +                                " error in writing ascii change (reason: %s)", +                                strerror (errno)); +                        break; +                } + +                MOVER_MOVE (mover, nleft, 1); +        } + +        if ( (nleft == 0) && (!parse_err)) +                ret = 0; + +        if (munmap (start, stbuf->st_size)) +                gf_log (this->name, GF_LOG_ERROR, +                        "munmap() error (reason: %s)", strerror (errno)); + out: +        return ret; +} + +/** + * ascii decoder: + *  - separate out one entry from another + *  - use fop name rather than fop number + */ +static int +gf_changelog_parse_ascii (xlator_t *this, +                          gf_changelog_t *gfc, int from_fd, int to_fd, +                          size_t start_offset, struct stat *stbuf) +{ +        int         ng            = 0; +        int         ret    
       = -1; +        int         fop           = 0; +        int         len           = 0; +        off_t       off           = 0; +        off_t       nleft         = 0; +        char       *ptr           = NULL; +        char       *eptr          = NULL; +        char       *start         = NULL; +        char       *mover         = NULL; +        int         parse_err     = 0; +        char        current_mover = ' '; +        char ascii[LINE_BUFSIZE]  = {0,}; +        const char *fopname       = NULL; + +        nleft = stbuf->st_size; + +        start = (char *) mmap (NULL, nleft, +                               PROT_READ, MAP_PRIVATE, from_fd, 0); +        if (!start) { +                gf_log (this->name, GF_LOG_ERROR, +                        "mmap() error (reason: %s)", strerror (errno)); +                goto out; +        } + +        mover = start; + +        MOVER_MOVE (mover, nleft, start_offset); + +        while (nleft > 0) { +                off = 0; +                current_mover = *mover; + +                GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); +                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + +                switch (current_mover) { +                case 'D': +                case 'M': +                        MOVER_MOVE (mover, nleft, 1); + +                        /* target gfid */ +                        PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, +                                    conv_noop, parse_err); +                        FILL_AND_MOVE(ptr, ascii, off, +                                      mover, nleft, UUID_CANONICAL_FORM_LEN); +                        break; + +                case 'E': +                        MOVER_MOVE (mover, nleft, 1); + +                        /* target gfid */ +                        PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, +                                    conv_noop, parse_err); +                        FILL_AND_MOVE (ptr, ascii, off, +                    
                   mover, nleft, UUID_CANONICAL_FORM_LEN); +                        FILL_AND_MOVE (" ", ascii, off, +                                       mover, nleft, 1); + +                        /* fop */ +                        len = strlen (mover); +                        VERIFY_SEPARATOR (mover, len, parse_err); + +                        fop = atoi (mover); +                        if ( (fopname = gf_fop_list[fop]) == NULL) { +                                parse_err = 1; +                                break; +                        } + +                        MOVER_MOVE (mover, nleft, len); + +                        len = strlen (fopname); +                        GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); + +                        /* pargfid + bname */ +                        ng = nr_gfids[fop]; +                        while (ng-- > 0) { +                                MOVER_MOVE (mover, nleft, 1); +                                len = strlen (mover); +                                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + +                                PARSE_GFID (mover, ptr, len, +                                            conv_noop, parse_err); +                                eptr = calloc (3, strlen (ptr)); +                                if (!eptr) { +                                        parse_err = 1; +                                        break; +                                } + +                                gf_rfc3986_encode ((unsigned char *) ptr, +                                                   eptr, gfc->rfc3986); +                                FILL_AND_MOVE (eptr, ascii, off, +                                               mover, nleft, len); +                                free (eptr); +                        } + +                        break; +                default: +                        parse_err = 1; +                } + +                if (parse_err) +                        
break; + +                GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); + +                if (gf_changelog_write (to_fd, ascii, off) != off) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "processing ascii changelog failed due to " +                                " wrror in writing change (reason: %s)", +                                strerror (errno)); +                        break; +                } + +                MOVER_MOVE (mover, nleft, 1); + +        } + +        if ( (nleft == 0) && (!parse_err)) +                ret = 0; + +        if (munmap (start, stbuf->st_size)) +                gf_log (this->name, GF_LOG_ERROR, +                        "munmap() error (reason: %s)", strerror (errno)); + + out: +        return ret; +} + +#define COPY_BUFSIZE  8192 +static int +gf_changelog_copy (xlator_t *this, int from_fd, int to_fd) +{ +        ssize_t size                  = 0; +        char   buffer[COPY_BUFSIZE+1] = {0,}; + +        while (1) { +                size = read (from_fd, buffer, COPY_BUFSIZE); +                if (size <= 0) +                        break; + +                if (gf_changelog_write (to_fd, +                                        buffer, size) != size) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "error processing ascii changlog"); +                        size = -1; +                        break; +                } +        } + +        return (size < 0 ? 
-1 : 0); +} + +static int +gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, +                     int to_fd, struct stat *stbuf, int *zerob) +{ +        int    ret        = -1; +        int    encoding   = -1; +        size_t elen       = 0; +        char buffer[1024] = {0,}; + +        CHANGELOG_GET_ENCODING (from_fd, buffer, 1024, encoding, elen); +        if (encoding == -1) /* unknown encoding */ +                goto out; + +        if (!CHANGELOG_VALID_ENCODING (encoding)) +                goto out; + +        if (elen == stbuf->st_size) { +                *zerob = 1; +                goto out; +        } + +        /** +         * start processing after the header +         */ +        lseek (from_fd, elen, SEEK_SET); + +        switch (encoding) { +        case CHANGELOG_ENCODE_BINARY: +                /** +                 * this ideally should have been a part of changelog-encoders.c +                 * (ie. part of the changelog translator). +                 */ +                ret = gf_changelog_parse_binary (this, gfc, from_fd, +                                                 to_fd, elen, stbuf); +                break; + +        case CHANGELOG_ENCODE_ASCII: +                ret = gf_changelog_parse_ascii (this, gfc, from_fd, +                                                to_fd, elen, stbuf); +                break; +        default: +                ret = gf_changelog_copy (this, from_fd, to_fd); +        } + + out: +        return ret; +} + +static int +gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) +{ +        int         ret        = -1; +        int         fd1        = 0; +        int         fd2        = 0; +        int         zerob      = 0; +        struct stat stbuf      = {0,}; +        char dest[PATH_MAX]    = {0,}; +        char to_path[PATH_MAX] = {0,}; + +        ret = stat (from_path, &stbuf); +        if (ret || !S_ISREG(stbuf.st_mode)) { +                gf_log (this->name, 
GF_LOG_ERROR, +                        "stat failed on changelog file: %s", from_path); +                goto out; +        } + +        fd1 = open (from_path, O_RDONLY); +        if (fd1 < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cannot open changelog file: %s (reason: %s)", +                        from_path, strerror (errno)); +                goto out; +        } + +        (void) snprintf (to_path, PATH_MAX, "%s%s", +                         gfc->gfc_current_dir, basename (from_path)); +        (void) snprintf (dest, PATH_MAX, "%s%s", +                         gfc->gfc_processing_dir, basename (from_path)); + +        fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR, +                    S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (fd2 < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cannot create ascii changelog file %s (reason %s)", +                        to_path, strerror (errno)); +                goto close_fd; +        } else { +                ret = gf_changelog_decode (this, gfc, fd1, +                                           fd2, &stbuf, &zerob); + +                close (fd2); + +                if (!ret) { +                        /* move it to processing on a successfull +                           decode */ +                        ret = rename (to_path, dest); +                        if (ret) +                                gf_log (this->name, GF_LOG_ERROR, +                                        "error moving %s to processing dir" +                                        " (reason: %s)", to_path, +                                        strerror (errno)); +                } + +                /* remove it from .current if it's an empty file */ +                if (zerob) { +                        ret = unlink (to_path); +                        if (ret) +                                gf_log (this->name, GF_LOG_ERROR, +                                     
   "could not unlink %s (reason: %s", +                                        to_path, strerror (errno)); +                } +        } + + close_fd: +        close (fd1); + + out: +        return ret; +} + +static char * +gf_changelog_ext_change (xlator_t *this, +                         gf_changelog_t *gfc, char *path, size_t readlen) +{ +        int     alo = 0; +        int     ret = 0; +        size_t  len = 0; +        char   *buf = NULL; + +        buf = path; +        while (len < readlen) { +                if (*buf == '\0') { +                        alo = 1; +                        gf_log (this->name, GF_LOG_DEBUG, +                                "processing changelog: %s", path); +                        ret = gf_changelog_consume (this, gfc, path); +                } + +                if (ret) +                        break; + +                len++; buf++; +                if (alo) { +                        alo = 0; +                        path = buf; +                } +        } + +        return (ret) ? 
NULL : path; +} + +void * +gf_changelog_process (void *data) +{ +        ssize_t         len      = 0; +        ssize_t         offlen   = 0; +        xlator_t       *this     = NULL; +        char           *sbuf     = NULL; +        gf_changelog_t *gfc      = NULL; +        char from_path[PATH_MAX] = {0,}; + +        gfc = (gf_changelog_t *) data; +        this = gfc->this; + +        pthread_detach (pthread_self()); + +        for (;;) { +                len = gf_changelog_read_path (gfc->gfc_sockfd, +                                              from_path + offlen, +                                              PATH_MAX - offlen); +                if (len < 0) +                        continue; /* ignore it for now */ + +                if (len == 0) { /* close() from the changelog translator */ +                        gf_log (this->name, GF_LOG_INFO, "close from changelog" +                                " notification translator."); + +                        if (gfc->gfc_connretries != 1) { +                                if (!gf_changelog_notification_init(this, gfc)) +                                        continue; +                        } + +                        byebye = 1; +                        break; +                } + +                len += offlen; +                sbuf = gf_changelog_ext_change (this, gfc, from_path, len); +                if (!sbuf) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not extract changelog filename"); +                        continue; +                } + +                offlen = 0; +                if (sbuf != (from_path + len)) { +                        offlen = from_path + len - sbuf; +                        memmove (from_path, sbuf, offlen); +                } +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "byebye (%d) from processing thread...", byebye); +        return NULL; +} diff --git 
a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c new file mode 100644 index 00000000..4e60bb27 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -0,0 +1,514 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include <errno.h> +#include <dirent.h> +#include <stddef.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <string.h> + +#include "globals.h" +#include "glusterfs.h" +#include "logging.h" + +#include "gf-changelog-helpers.h" + +/* from the changelog translator */ +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +int byebye = 0; + +static void +gf_changelog_cleanup (gf_changelog_t *gfc) +{ +        /* socket */ +        if (gfc->gfc_sockfd != -1) +                close (gfc->gfc_sockfd); +        /* tracker fd */ +        if (gfc->gfc_fd != -1) +                close (gfc->gfc_fd); +        /* processing dir */ +        if (gfc->gfc_dir) +                closedir (gfc->gfc_dir); + +        if (gfc->gfc_working_dir) +                free (gfc->gfc_working_dir); /* allocated by realpath */ +} + +void +__attribute__ ((constructor)) gf_changelog_ctor (void) +{ +        glusterfs_ctx_t *ctx = NULL; + +        ctx = glusterfs_ctx_new (); +        if (!ctx) +                return; + +        if (glusterfs_globals_init (ctx)) { +                free (ctx); +                ctx = NULL; +                return; +        } + +        THIS->ctx = ctx; +} + +void +__attribute__ ((destructor)) gf_changelog_dtor (void) +{ +        xlator_t        *this = NULL; +        
glusterfs_ctx_t *ctx  = NULL; +        gf_changelog_t  *gfc  = NULL; + +        this = THIS; +        if (!this) +                return; + +        ctx = this->ctx; +        gfc = this->private; + +        if (gfc) { +                gf_changelog_cleanup (gfc); +                GF_FREE (gfc); +        } + +        if (ctx) { +                pthread_mutex_destroy (&ctx->lock); +                free (ctx); +                ctx = NULL; +        } +} + + +static int +gf_changelog_open_dirs (gf_changelog_t *gfc) +{ +        int  ret                    = -1; +        DIR *dir                    = NULL; +        int  tracker_fd             = 0; +        char tracker_path[PATH_MAX] = {0,}; + +        (void) snprintf (gfc->gfc_current_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_CURRENT_DIR"/", +                         gfc->gfc_working_dir); +        ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        (void) snprintf (gfc->gfc_processed_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_PROCESSED_DIR"/", +                         gfc->gfc_working_dir); +        ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        (void) snprintf (gfc->gfc_processing_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_PROCESSING_DIR"/", +                         gfc->gfc_working_dir); +        ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        dir = opendir (gfc->gfc_processing_dir); +        if (!dir) { +                gf_log ("", GF_LOG_ERROR, +                        "opendir() error [reason: %s]", strerror (errno)); +                goto out; +        } + +        gfc->gfc_dir = dir; + +        (void) snprintf (tracker_path, PATH_MAX, +                         "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir); + +        tracker_fd = open (tracker_path, O_CREAT | O_APPEND | 
O_RDWR, +                           S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (tracker_fd < 0) { +                closedir (gfc->gfc_dir); +                ret = -1; +                goto out; +        } + +        gfc->gfc_fd = tracker_fd; +        ret = 0; + out: +        return ret; +} + +int +gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc) +{ +        int                ret    = 0; +        int                len    = 0; +        int                tries  = 0; +        int                sockfd = 0; +        struct sockaddr_un remote; + +        this = gfc->this; + +        if (gfc->gfc_sockfd != -1) { +                gf_log (this->name, GF_LOG_INFO, +                        "Reconnecting..."); +                close (gfc->gfc_sockfd); +        } + +        sockfd = socket (AF_UNIX, SOCK_STREAM, 0); +        if (sockfd < 0) { +                ret = -1; +                goto out; +        } + +        CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath, +                                    gfc->gfc_sockpath, PATH_MAX); +        gf_log (this->name, GF_LOG_INFO, +                "connecting to changelog socket: %s (brick: %s)", +                gfc->gfc_sockpath, gfc->gfc_brickpath); + +        remote.sun_family = AF_UNIX; +        strcpy (remote.sun_path, gfc->gfc_sockpath); + +        len = strlen (remote.sun_path) + sizeof (remote.sun_family); + +        while (tries < gfc->gfc_connretries) { +                gf_log (this->name, GF_LOG_WARNING, +                        "connection attempt %d/%d...", +                        tries + 1, gfc->gfc_connretries); + +                /* initiate a connect */ +                if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) { +                        gfc->gfc_sockfd = sockfd; +                        break; +                } + +                tries++; +                sleep (2); +        } + +        if (tries == gfc->gfc_connretries) { +                gf_log (this->name, 
GF_LOG_ERROR, +                        "could not connect to changelog socket!" +                        " bailing out..."); +                ret = -1; +        } else +                gf_log (this->name, GF_LOG_INFO, +                        "connection successful"); + + out: +        return ret; +} + +int +gf_changelog_done (char *file) +{ +        int             ret    = -1; +        char           *buffer = NULL; +        xlator_t       *this   = NULL; +        gf_changelog_t *gfc    = NULL; +        char to_path[PATH_MAX] = {0,}; + +        errno = EINVAL; + +        this = THIS; +        if (!this) +                goto out; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        if (!file || !strlen (file)) +                goto out; + +        /* make sure 'file' is inside ->gfc_working_dir */ +        buffer = realpath (file, NULL); +        if (!buffer) +                goto out; + +        if (strncmp (gfc->gfc_working_dir, +                     buffer, strlen (gfc->gfc_working_dir))) +                goto out; + +        (void) snprintf (to_path, PATH_MAX, "%s%s", +                         gfc->gfc_processed_dir, basename (buffer)); +        gf_log (this->name, GF_LOG_DEBUG, +                "moving %s to processed directory", file); +        ret = rename (buffer, to_path); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cannot move %s to %s (reason: %s)", +                        file, to_path, strerror (errno)); +                goto out; +        } + +        ret = 0; + + out: +        if (buffer) +                free (buffer); /* allocated by realpath() */ +        return ret; +} + +/** + * @API + *  for a set of changelogs, start from the begining + */ +int +gf_changelog_start_fresh () +{ +        xlator_t *this = NULL; +        gf_changelog_t *gfc = NULL; + +        this = THIS; +        if (!this) +                goto out; + +        errno = 
EINVAL; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        if (gf_ftruncate (gfc->gfc_fd, 0)) +                goto out; + +        return 0; + + out: +        return -1; +} + +/** + * @API + * return the next changelog file entry. zero means all chanelogs + * consumed. + */ +ssize_t +gf_changelog_next_change (char *bufptr, size_t maxlen) +{ +        ssize_t         size       = 0; +        int             tracker_fd = 0; +        xlator_t       *this       = NULL; +        gf_changelog_t *gfc        = NULL; +        char buffer[PATH_MAX]      = {0,}; + +        errno = EINVAL; + +        this = THIS; +        if (!this) +                goto out; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        tracker_fd = gfc->gfc_fd; + +        size = gf_readline (tracker_fd, buffer, maxlen); +        if (size < 0) +                goto out; +        if (size == 0) +                return 0; + +        memcpy (bufptr, buffer, size - 1); +        *(buffer + size) = '\0'; + +        return size; + + out: +        return -1; +} + +/** + * @API + *  gf_changelog_scan() - scan and generate a list of change entries + * + * calling this api multiple times (without calling gf_changlog_done()) + * would result new changelogs(s) being refreshed in the tracker file. + * This call also acts as a cancellation point for the consumer. 
+ */ +ssize_t +gf_changelog_scan () +{ +        int             ret        = 0; +        int             tracker_fd = 0; +        size_t          len        = 0; +        size_t          off        = 0; +        xlator_t       *this       = NULL; +        size_t          nr_entries = 0; +        gf_changelog_t *gfc        = NULL; +        struct dirent  *entryp     = NULL; +        struct dirent  *result     = NULL; +        char buffer[PATH_MAX]      = {0,}; + +        this = THIS; +        if (!this) +                goto out; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        /** +         * do we need to protect 'byebye' with locks? worst, the +         * consumer would get notified during next scan(). +         */ +        if (byebye) { +                errno = ECONNREFUSED; +                goto out; +        } + +        errno = EINVAL; + +        tracker_fd = gfc->gfc_fd; + +        if (gf_ftruncate (tracker_fd, 0)) +                goto out; + +        len = offsetof(struct dirent, d_name) +                + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; +        entryp = GF_CALLOC (1, len, +                            gf_changelog_mt_libgfchangelog_dirent_t); +        if (!entryp) +                goto out; + +        rewinddir (gfc->gfc_dir); +        while (1) { +                ret = readdir_r (gfc->gfc_dir, entryp, &result); +                if (ret || !result) +                        break; + +                if ( !strcmp (basename (entryp->d_name), ".") +                     || !strcmp (basename (entryp->d_name), "..") ) +                        continue; + +                nr_entries++; + +                GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir, +                                          buffer, off, +                                          strlen (gfc->gfc_processing_dir)); +                GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, +                                 
         off, strlen (entryp->d_name)); +                GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); + +                if (gf_changelog_write (tracker_fd, buffer, off) != off) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "error writing changelog filename" +                                " to tracker file"); +                        break; +                } +                off = 0; +        } + +        GF_FREE (entryp); + +        if (!result) { +                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) +                        return nr_entries; +        } + out: +        return -1; +} + +/** + * @API + *  gf_changelog_register() - register a client for updates. + */ +int +gf_changelog_register (char *brick_path, char *scratch_dir, +                       char *log_file, int log_level, int max_reconnects) +{ +        int             i    = 0; +        int             ret  = -1; +        int             errn = 0; +        xlator_t       *this = NULL; +        gf_changelog_t *gfc  = NULL; + +        this = THIS; +        if (!this->ctx) +                goto out; + +        errno = ENOMEM; + +        gfc = GF_CALLOC (1, sizeof (*gfc), +                         gf_changelog_mt_libgfchangelog_t); +        if (!gfc) +                goto out; + +        gfc->this = this; + +        gfc->gfc_dir = NULL; +        gfc->gfc_fd = gfc->gfc_sockfd = -1; + +        gfc->gfc_working_dir = realpath (scratch_dir, NULL); +        if (!gfc->gfc_working_dir) { +                errn = errno; +                goto cleanup; +        } + +        ret = gf_changelog_open_dirs (gfc); +        if (ret) { +                errn = errno; +                gf_log (this->name, GF_LOG_ERROR, +                        "could not create entries in scratch dir"); +                goto cleanup; +        } + +        if (gf_log_init (this->ctx, log_file)) +                goto cleanup; + +        gf_log_set_loglevel ((log_level == -1) ? 
GF_LOG_INFO : +                             log_level); + +        gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects; +        (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX); + +        ret = gf_changelog_notification_init (this, gfc); +        if (ret) { +                errn = errno; +                goto cleanup; +        } + +        ret = pthread_create (&gfc->gfc_changelog_processor, +                              NULL, gf_changelog_process, gfc); +        if (ret) { +                errn = errno; +                gf_log (this->name, GF_LOG_ERROR, +                        "error creating changelog processor thread" +                        " new changes won't be recorded!!!"); +                goto cleanup; +        } + +        for (; i < 256; i++) { +                gfc->rfc3986[i] = +                        (isalnum(i) || i == '~' || +                        i == '-' || i == '.' || i == '_') ? i : 0; +        } + +        ret = 0; +        this->private = gfc; + +        goto out; + + cleanup: +        gf_changelog_cleanup (gfc); +        GF_FREE (gfc); +        this->private = NULL; +        errno = errn; + + out: +        return ret; +} diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am new file mode 100644 index 00000000..e85031ad --- /dev/null +++ b/xlators/features/changelog/src/Makefile.am @@ -0,0 +1,19 @@ +xlator_LTLIBRARIES = changelog.la + +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features + +noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \ +	changelog-misc.h changelog-encoders.h changelog-notifier.h + +changelog_la_LDFLAGS = -module -avoidversion + +changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \ +	changelog-encoders.c changelog-notifier.c +changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \ 
+	-D_GNU_SOURCE -D$(GF_HOST_OS) -shared -nostartfiles -DDATADIR=\"$(localstatedir)\" + +AM_CFLAGS = -Wall $(GF_CFLAGS) + +CLEANFILES = diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c new file mode 100644 index 00000000..c71ea927 --- /dev/null +++ b/xlators/features/changelog/src/changelog-encoders.c @@ -0,0 +1,156 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "changelog-encoders.h" + +size_t +entry_fn (void *data, char *buffer, gf_boolean_t encode) +{ +        char    *tmpbuf = NULL; +        size_t  bufsz  = 0; +        struct changelog_entry_fields *ce = NULL; + +        ce = (struct changelog_entry_fields *) data; + +        if (encode) { +                tmpbuf = uuid_utoa (ce->cef_uuid); +                CHANGELOG_FILL_BUFFER (buffer, bufsz, tmpbuf, strlen (tmpbuf)); +        } else { +                CHANGELOG_FILL_BUFFER (buffer, bufsz, +                                       ce->cef_uuid, sizeof (uuid_t)); +        } + +        CHANGELOG_FILL_BUFFER (buffer, bufsz, "/", 1); +        CHANGELOG_FILL_BUFFER (buffer, bufsz, +                               ce->cef_bname, strlen (ce->cef_bname)); +        return bufsz; +} + +size_t +fop_fn (void *data, char *buffer, gf_boolean_t encode) +{ +        char buf[10]          = {0,}; +        size_t         bufsz = 0; +        glusterfs_fop_t fop   = 0; + +        fop = *(glusterfs_fop_t *) data; + +        if (encode) { +                (void) snprintf (buf, sizeof (buf), "%d", fop); +                CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, 
strlen (buf)); +        } else +                CHANGELOG_FILL_BUFFER (buffer, bufsz, &fop, sizeof (fop)); + +        return bufsz; +} + +void +entry_free_fn (void *data) +{ +        changelog_opt_t *co = data; + +        if (!co) +                return; + +        GF_FREE (co->co_entry.cef_bname); +} + +/** + * try to write all data in one shot + */ + +static inline void +changelog_encode_write_xtra (changelog_log_data_t *cld, +                             char *buffer, size_t *off, gf_boolean_t encode) +{ +        int              i      = 0; +        size_t           offset = 0; +        void            *data   = NULL; +        changelog_opt_t *co     = NULL; + +        offset = *off; + +        co = (changelog_opt_t *) cld->cld_ptr; + +        for (; i < cld->cld_xtra_records; i++, co++) { +                CHANGELOG_FILL_BUFFER (buffer, offset, "\0", 1); + +                switch (co->co_type) { +                case CHANGELOG_OPT_REC_FOP: +                        data = &co->co_fop; +                        break; +                case CHANGELOG_OPT_REC_ENTRY: +                        data = &co->co_entry; +                        break; +                } + +                if (co->co_convert) +                        offset += co->co_convert (data, +                                                  buffer + offset, encode); +                else /* no coversion: write it out as it is */ +                        CHANGELOG_FILL_BUFFER (buffer, offset, +                                               data, co->co_len); +        } + +        *off = offset; +} + +int +changelog_encode_ascii (xlator_t *this, changelog_log_data_t *cld) +{ +        size_t            off      = 0; +        size_t            gfid_len = 0; +        char             *gfid_str = NULL; +        char             *buffer   = NULL; +        changelog_priv_t *priv     = NULL; + +        priv = this->private; + +        gfid_str = uuid_utoa (cld->cld_gfid); +        gfid_len = strlen 
/**
 * Build one binary-encoded changelog record for @cld and pass it to
 * changelog_write_change().
 *
 * The record is assembled in a stack buffer obtained with alloca(), so it
 * lives only until this function returns. It is filled by
 * CHANGELOG_STORE_BINARY (given the raw cld_gfid), then by the extra
 * records (with encode == _gf_false) when there are any, and finally by
 * one byte taken from "\0".
 *
 * @param this  xlator whose ->private is the changelog_priv_t
 * @param cld   log data to encode
 *
 * @return whatever changelog_write_change() returns
 */
int
changelog_encode_binary (xlator_t *this, changelog_log_data_t *cld)
{
        size_t            off    = 0;
        char             *buffer = NULL;
        changelog_priv_t *priv   = NULL;

        priv = this->private;

        /* extra bytes for decorations */
        /* sized as: raw gfid + cld_ptr_len + 10 spare bytes;
         * NOTE(review): presumably the spare bytes cover the type byte and
         * the separators added below -- confirm against the extra-record
         * converters */
        buffer = alloca (sizeof (uuid_t) + cld->cld_ptr_len + 10);
        CHANGELOG_STORE_BINARY (priv, buffer, off, cld->cld_gfid, cld);

        if (cld->cld_xtra_records)
                changelog_encode_write_xtra (cld, buffer, &off, _gf_false);

        /* closing byte of the record */
        CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1);

        return changelog_write_change (priv, buffer, off);
}
+*/ + +#ifndef _CHANGELOG_ENCODERS_H +#define _CHANGELOG_ENCODERS_H + +#include "xlator.h" +#include "defaults.h" + +#include "changelog-helpers.h" + +#define CHANGELOG_STORE_ASCII(priv, buf, off, gfid, gfid_len, cld) do { \ +                CHANGELOG_FILL_BUFFER (buffer, off,                     \ +                                       priv->maps[cld->cld_type], 1);   \ +                CHANGELOG_FILL_BUFFER (buffer,                          \ +                                       off, gfid, gfid_len);            \ +        } while (0) + +#define CHANGELOG_STORE_BINARY(priv, buf, off, gfid, cld) do {          \ +                CHANGELOG_FILL_BUFFER (buffer, off,                     \ +                                       priv->maps[cld->cld_type], 1);   \ +                CHANGELOG_FILL_BUFFER (buffer,                          \ +                                       off, gfid, sizeof (uuid_t));     \ +        } while (0) + +size_t +entry_fn (void *data, char *buffer, gf_boolean_t encode); +size_t +fop_fn (void *data, char *buffer, gf_boolean_t encode); +void +entry_free_fn (void *data); +int +changelog_encode_binary (xlator_t *, changelog_log_data_t *); +int +changelog_encode_ascii (xlator_t *, changelog_log_data_t *); + +#endif /* _CHANGELOG_ENCODERS_H */ diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c new file mode 100644 index 00000000..c1bb6e5f --- /dev/null +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -0,0 +1,691 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" +#include "iobuf.h" + +#include "changelog-helpers.h" +#include "changelog-mem-types.h" + +#include <pthread.h> + +void +changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) +{ +        int   ret    = 0; +        void *retval = NULL; + +        /* send a cancel request to the thread */ +        ret = pthread_cancel (thr_id); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not cancel thread (reason: %s)", +                        strerror (errno)); +                goto out; +        } + +        ret = pthread_join (thr_id, &retval); +        if (ret || (retval != PTHREAD_CANCELED)) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cancel request not adhered as expected" +                        " (reason: %s)", strerror (errno)); +        } + + out: +        return; +} + +inline void * +changelog_get_usable_buffer (changelog_local_t *local) +{ +        changelog_log_data_t *cld = NULL; + +        cld = &local->cld; +        if (!cld->cld_iobuf) +                return NULL; + +        return cld->cld_iobuf->ptr; +} + +inline void +changelog_set_usable_record_and_length (changelog_local_t *local, +                                        size_t len, int xr) +{ +        changelog_log_data_t *cld = NULL; + +        cld = &local->cld; + +        cld->cld_ptr_len = len; +        cld->cld_xtra_records = xr; +} + +void +changelog_local_cleanup (xlator_t *xl, changelog_local_t *local) +{ +        int                   i   = 0; +        changelog_opt_t      *co  = NULL; +        changelog_log_data_t *cld = NULL; + +        if (!local) +                return; + +        cld = &local->cld; + +        /* cleanup dynamic allocation for extra records */ +        if (cld->cld_xtra_records) { +                co = (changelog_opt_t *) cld->cld_ptr; +     
           for (; i < cld->cld_xtra_records; i++, co++) +                        if (co->co_free) +                                co->co_free (co); +        } + +        CHANGELOG_IOBUF_UNREF (cld->cld_iobuf); + +        if (local->inode) +                inode_unref (local->inode); + +        mem_put (local); +} + +inline int +changelog_write (int fd, char *buffer, size_t len) +{ +        ssize_t size = 0; +        size_t writen = 0; + +        while (writen < len) { +                size = write (fd, +                              buffer + writen, len - writen); +                if (size <= 0) +                        break; + +                writen += size; +        } + +        return (writen != len); +} + +static int +changelog_rollover_changelog (xlator_t *this, +                              changelog_priv_t *priv, unsigned long ts) +{ +        int   ret            = -1; +        int   notify         = 0; +        char *bname          = NULL; +        char ofile[PATH_MAX] = {0,}; +        char nfile[PATH_MAX] = {0,}; + +        if (priv->changelog_fd != -1) { +                close (priv->changelog_fd); +                priv->changelog_fd = -1; +        } + +        (void) snprintf (ofile, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir); +        (void) snprintf (nfile, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME".%lu", +                         priv->changelog_dir, ts); + +        ret = rename (ofile, nfile); +        if (!ret) +                notify = 1; + +        if (ret && (errno == ENOENT)) { +                ret = 0; +        } + +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "error renaming %s -> %s (reason %s)", +                        ofile, nfile, strerror (errno)); +        } + +        if (notify) { +                bname = basename (nfile); +                gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); +                ret = 
changelog_write (priv->wfd, bname, strlen (bname) + 1); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Failed to send file name to notify thread" +                                " (reason: %s)", strerror (errno)); +                } +        } + +        return ret; +} + +int +changelog_open (xlator_t *this, +                changelog_priv_t *priv) +{ +        int fd                        = 0; +        int ret                       = -1; +        int flags                     = 0; +        char buffer[1024]             = {0,}; +        char changelog_path[PATH_MAX] = {0,}; + +        (void) snprintf (changelog_path, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME, +                         priv->changelog_dir); + +        flags |= (O_CREAT | O_RDWR); +        if (priv->fsync_interval == 0) +                flags |= O_SYNC; + +        fd = open (changelog_path, flags, +                   S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (fd < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "unable to open/create changelog file %s" +                        " (reason: %s). 
change-logging will be" +                        " inactive", changelog_path, strerror (errno)); +                goto out; +        } + +        priv->changelog_fd = fd; + +        (void) snprintf (buffer, 1024, CHANGELOG_HEADER, +                         CHANGELOG_VERSION_MAJOR, +                         CHANGELOG_VERSION_MINOR, +                         priv->encode_mode); +        ret = changelog_write_change (priv, buffer, strlen (buffer)); +        if (ret) { +                close (priv->changelog_fd); +                priv->changelog_fd = -1; +                goto out; +        } + +        ret = 0; + + out: +        return ret; +} + +int +changelog_start_next_change (xlator_t *this, +                             changelog_priv_t *priv, +                             unsigned long ts, gf_boolean_t finale) +{ +        int ret = -1; + +        ret = changelog_rollover_changelog (this, priv, ts); + +        if (!ret && !finale) +                ret = changelog_open (this, priv); + +        return ret; +} + +/** + * return the length of entry + */ +inline size_t +changelog_entry_length () +{ +        return sizeof (changelog_log_data_t); +} + +int +changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last) +{ +        struct timeval tv = {0,}; + +        cld->cld_type = CHANGELOG_TYPE_ROLLOVER; + +        if (gettimeofday (&tv, NULL)) +                return -1; + +        cld->cld_roll_time = (unsigned long) tv.tv_sec; +        cld->cld_finale = is_last; +        return 0; +} + +int +changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len) +{ +        return changelog_write (priv->changelog_fd, buffer, len); +} + +inline int +changelog_handle_change (xlator_t *this, +                         changelog_priv_t *priv, changelog_log_data_t *cld) +{ +        int ret = 0; + +        if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) { +                ret = changelog_start_next_change (this, priv, +                                    
               cld->cld_roll_time, +                                                   cld->cld_finale); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "Problem rolling over changelog(s)"); +                goto out; +        } + +        /** +         * case when there is reconfigure done (disabling changelog) and there +         * are still fops that have updates in prgress. +         */ +        if (priv->changelog_fd == -1) +                return 0; + +        if (CHANGELOG_TYPE_IS_FSYNC (cld->cld_type)) { +                ret = fsync (priv->changelog_fd); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "fsync failed (reason: %s)", +                                strerror (errno)); +                } +                goto out; +        } + +        ret = priv->ce->encode (this, cld); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "error writing changelog to disk"); +        } + + out: +        return ret; +} + +changelog_local_t * +changelog_local_init (xlator_t *this, inode_t *inode, +                      uuid_t gfid, int xtra_records, +                      gf_boolean_t update_flag) +{ +        changelog_local_t *local = NULL; +        struct iobuf      *iobuf = NULL; + +        /** +         * We relax the presence of inode if @update_flag is true. +         * The caller (implmentation of the fop) needs to be careful to +         * not blindly use local->inode. 
+         */ +        if (!update_flag && !inode) { +                gf_log_callingfn (this->name, GF_LOG_WARNING, +                                  "inode needed for version checking !!!"); +                goto out; +        } + +        if (xtra_records) { +                iobuf = iobuf_get2 (this->ctx->iobuf_pool, +                                    xtra_records * CHANGELOG_OPT_RECORD_LEN); +                if (!iobuf) +                        goto out; +        } + +        local = mem_get0 (this->local_pool); +        if (!local) { +                CHANGELOG_IOBUF_UNREF (iobuf); +                goto out; +        } + +        local->update_no_check = update_flag; + +        uuid_copy (local->cld.cld_gfid, gfid); + +        local->cld.cld_iobuf = iobuf; +        local->cld.cld_xtra_records = 0; /* set by the caller */ + +        if (inode) +                local->inode = inode_ref (inode); + + out: +        return local; +} + +int +changelog_forget (xlator_t *this, inode_t *inode) +{ +        uint64_t ctx_addr = 0; +        changelog_inode_ctx_t *ctx = NULL; + +        inode_ctx_del (inode, this, &ctx_addr); +        if (!ctx_addr) +                return 0; + +        ctx = (changelog_inode_ctx_t *) (long) ctx_addr; +        GF_FREE (ctx); + +        return 0; +} + +int +changelog_inject_single_event (xlator_t *this, +                               changelog_priv_t *priv, +                               changelog_log_data_t *cld) +{ +        return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); +} + +/** + * TODO: these threads have many thing in common (wake up after + * a certain time etc..). move them into separate routine. 
+ */ +void * +changelog_rollover (void *data) +{ +        int                     ret   = 0; +        xlator_t               *this  = NULL; +        struct timeval          tv    = {0,}; +        changelog_log_data_t    cld   = {0,}; +        changelog_time_slice_t *slice = NULL; +        changelog_priv_t       *priv  = data; + +        this = priv->cr.this; +        slice = &priv->slice; + +        while (1) { +                tv.tv_sec  = priv->rollover_time; +                tv.tv_usec = 0; + +                ret = select (0, NULL, NULL, NULL, &tv); +                if (ret) +                        continue; + +                ret = changelog_fill_rollover_data (&cld, _gf_false); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "failed to fill rollover data"); +                        continue; +                } + +                LOCK (&priv->lock); +                { +                        ret = changelog_inject_single_event (this, priv, &cld); +                        if (!ret) +                                SLICE_VERSION_UPDATE (slice); +                } +                UNLOCK (&priv->lock); +        } + +        return NULL; +} + +void * +changelog_fsync_thread (void *data) +{ +        int                   ret  = 0; +        xlator_t             *this = NULL; +        struct timeval        tv   = {0,}; +        changelog_log_data_t  cld  = {0,}; +        changelog_priv_t     *priv = data; + +        this = priv->cf.this; +        cld.cld_type = CHANGELOG_TYPE_FSYNC; + +        while (1) { +                tv.tv_sec  = priv->fsync_interval; +                tv.tv_usec = 0; + +                ret = select (0, NULL, NULL, NULL, &tv); +                if (ret) +                        continue; + +                ret = changelog_inject_single_event (this, priv, &cld); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                               
 "failed to inject fsync event"); +        } + +        return NULL; +} + +/* macros for inode/changelog version checks */ + +#define INODE_VERSION_UPDATE(priv, inode, iver, slice, type) do {       \ +                LOCK (&inode->lock);                                    \ +                {                                                       \ +                        LOCK (&priv->lock);                             \ +                        {                                               \ +                                *iver = slice->changelog_version[type]; \ +                        }                                               \ +                        UNLOCK (&priv->lock);                           \ +                }                                                       \ +                UNLOCK (&inode->lock);                                  \ +        } while (0) + +#define INODE_VERSION_EQUALS_SLICE(priv, ver, slice, type, upd) do {    \ +                LOCK (&priv->lock);                                     \ +                {                                                       \ +                        upd = (ver == slice->changelog_version[type])   \ +                                ? _gf_false : _gf_true;                 \ +                }                                                       \ +                UNLOCK (&priv->lock);                                   \ +        } while (0) + +static int +__changelog_inode_ctx_set (xlator_t *this, +                           inode_t *inode, changelog_inode_ctx_t *ctx) +{ +        uint64_t ctx_addr = (uint64_t) ctx; +        return __inode_ctx_set (inode, this, &ctx_addr); +} + +/** + * one shot routine to get the address and the value of a inode version + * for a particular type. 
+ */ +static changelog_inode_ctx_t * +__changelog_inode_ctx_get (xlator_t *this, +                           inode_t *inode, unsigned long **iver, +                           unsigned long *version, changelog_log_type type) +{ +        int                    ret      = 0; +        uint64_t               ctx_addr = 0; +        changelog_inode_ctx_t *ctx      = NULL; + +        ret = __inode_ctx_get (inode, this, &ctx_addr); +        if (ret < 0) +                ctx_addr = 0; +        if (ctx_addr != 0) { +                ctx = (changelog_inode_ctx_t *) (long)ctx_addr; +                goto out; +        } + +        ctx = GF_CALLOC (1, sizeof (*ctx), gf_changelog_mt_inode_ctx_t); +        if (!ctx) +                goto out; + +        ret = __changelog_inode_ctx_set (this, inode, ctx); +        if (ret) { +                GF_FREE (ctx); +                ctx = NULL; +        } + + out: +        if (ctx && iver && version) { +                *iver = CHANGELOG_INODE_VERSION_TYPE (ctx, type); +                *version = **iver; +        } + +        return ctx; +} + +static changelog_inode_ctx_t * +changelog_inode_ctx_get (xlator_t *this, +                         inode_t *inode, unsigned long **iver, +                         unsigned long *version, changelog_log_type type) +{ +        changelog_inode_ctx_t *ctx = NULL; + +        LOCK (&inode->lock); +        { +                ctx = __changelog_inode_ctx_get (this, +                                                 inode, iver, version, type); +        } +        UNLOCK (&inode->lock); + +        return ctx; +} + +/** + * This is the main update routine. Locking has been made granular so as to + * maximize parallelism of fops - I'll try to explain it below using execution + * timelines. + * + * Basically, the contention is between multiple execution threads of this + * routine and the roll-over thread. So, instead of having a big lock, we hold + * granular locks: inode->lock and priv->lock. 
Now I'll explain what happens + * when there is an update and a roll-over at just about the same time. + * NOTE: + *  - the dispatcher itself synchronizes updates via its own lock + *  - the slice version is incremented by the roll-over thread + * + * Case 1: When the rollover thread wins before the inode version can be + * compared with the slice version. + * + *          [updater]                 |             [rollover] + *                                    | + *                                    |           <SLICE: 1, 1, 1> + * <changelog_update>                 | + *   <changelog_inode_ctx_get>        | + *      <CTX: 1, 1, 1>                | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 2, 2, 2> + *                                    |         UNLOCK (&priv->lock) + *                                    | + * LOCK (&priv->lock)                 | + *   <INODE_VERSION_EQUALS_SLICE>     | + *    I: 1 <-> S: 2                   | + *    update: true                    | + * UNLOCK (&priv->lock)               | + *                                    | + * <if update == true>                | + *  <dispatch-update-event>           | + *  <INODE_VERSION_UPDATE>            | + *   LOCK (&inode->lock)              | + *    LOCK (&priv->lock)              | + *     <CTX: 2, 1, 1>                 | + *    UNLOCK (&priv->lock)            | + *   UNLOCK (&inode->lock)            | + * + * Therefore, the change gets recorded in the next change (no lost change). If + * the slice version was ahead of the inode version (say I: 1, S: 2), then + * anyway the comparison would result in an update (I: 1, S: 3). 
+ * + * If the rollover time is too short, then there is another contention when the + * updater tries to bring up inode version to the slice version (this is also + * the case when the roll-over thread wakes up during INODE_VERSION_UPDATE). + * + *   <CTX: 1, 1, 1>                   |       <SLICE: 2, 2, 2> + *                                    | + *                                    | + * <dispatch-update-event>            | + * <INODE_VERSION_UPDATE>             | + *  LOCK (&inode->lock)               | + *   LOCK (&priv->lock)               | + *    <CTX: 2, 1, 1>                  | + *   UNLOCK (&priv->lock)             | + *  UNLOCK (&inode->lock)             | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 3, 3, 3> + *                                    |         UNLOCK (&priv->lock) + * + * + * Case 2: When the fop thread wins + * + *          [updater]                 |             [rollover] + *                                    | + *                                    |           <SLICE: 1, 1, 1> + * <changelog_update>                 | + *   <changelog_inode_ctx_get>        | + *      <CTX: 0, 0, 0>                | + *                                    | + * LOCK (&priv->lock)                 | + *   <INODE_VERSION_EQUALS_SLICE>     | + *    I: 0 <-> S: 1                   | + *    update: true                    | + * UNLOCK (&priv->lock)               | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 2, 2, 2> + *                                    |         UNLOCK (&priv->lock) + * <if 
update == true>                | + *  <dispath-update-event>            | + *  <INODE_VERSION_UPDATE>            | + *   LOCK (&inode->lock)              | + *    LOCK (&priv->lock)              | + *     <CTX: 2, 0, 0>                 | + *    UNLOCK (&priv->lock)            | + *   UNLOCK (&inode->lock)            | + * + * Here again, if the inode version was equal to the slice version (I: 1, S: 1) + * then there is no need to record an update (as the equality of the two version + * signifies an update was recorded in the current time slice). + */ +inline void +changelog_update (xlator_t *this, changelog_priv_t *priv, +                  changelog_local_t *local, changelog_log_type type) +{ +        int                     ret        = 0; +        unsigned long          *iver       = NULL; +        unsigned long           version    = 0; +        inode_t                *inode      = NULL; +        changelog_time_slice_t *slice      = NULL; +        changelog_inode_ctx_t  *ctx        = NULL; +        changelog_log_data_t   *cld_0      = NULL; +        changelog_log_data_t   *cld_1      = NULL; +        changelog_local_t      *next_local = NULL; +        gf_boolean_t            need_upd   = _gf_true; + +        slice = &priv->slice; + +        /** +         * for fops that do not require inode version checking +         */ +        if (local->update_no_check) +                goto update; + +        inode = local->inode; + +        ctx = changelog_inode_ctx_get (this, +                                       inode, &iver, &version, type); +        if (!ctx) +                goto update; + +        INODE_VERSION_EQUALS_SLICE (priv, version, slice, type, need_upd); + + update: +        if (need_upd) { +                cld_0 = &local->cld; +                cld_0->cld_type = type; + +                if ( (next_local = local->prev_entry) != NULL ) { +                        cld_1 = &next_local->cld; +                        cld_1->cld_type = type; +                } + +  
              ret = priv->cd.dispatchfn (this, priv, +                                           priv->cd.cd_data, cld_0, cld_1); + +                /** +                 * update after the dispatcher has successfully done +                 * it's job. +                 */ +                if (!local->update_no_check && iver && !ret) +                        INODE_VERSION_UPDATE (priv, inode, iver, slice, type); +        } + +        return; +} diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h new file mode 100644 index 00000000..bbea245b --- /dev/null +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -0,0 +1,386 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_HELPERS_H +#define _CHANGELOG_HELPERS_H + +#include "locking.h" +#include "timer.h" +#include "pthread.h" +#include "iobuf.h" + +#include "changelog-misc.h" + +/** + * the changelog entry + */ +typedef struct changelog_log_data { +        /* rollover related */ +        unsigned long cld_roll_time; + +        /* reopen changelog? */ +        gf_boolean_t cld_finale; + +        changelog_log_type cld_type; + +        /** +         * sincd gfid is _always_ a necessity, it's not a part +         * of the iobuf. by doing this we do not add any overhead +         * for data and metadata related fops. +         */ +        uuid_t        cld_gfid; + +        /** +         * iobufs are used for optionals records: pargfid, path, +         * write offsets etc.. 
It's the fop implementer's job +         * to allocate (iobuf_get() in the fop) and get unref'ed +         * in the callback (CHANGELOG_STACK_UNWIND). +         */ +        struct iobuf *cld_iobuf; + +#define cld_ptr cld_iobuf->ptr + +        /** +         * after allocation you can point this to the length of +         * usable data, but make sure it does not exceed +         * the size of the requested iobuf. +         */ +        size_t        cld_iobuf_len; + +#define cld_ptr_len cld_iobuf_len + +        /** +         * number of optional records +         */ +        int cld_xtra_records; +} changelog_log_data_t; + +/** + * holder for dispatch function and private data + */ + +typedef struct changelog_priv changelog_priv_t; + +typedef struct changelog_dispatcher { +        void *cd_data; +        int (*dispatchfn) (xlator_t *, changelog_priv_t *, void *, +                           changelog_log_data_t *, changelog_log_data_t *); +} changelog_dispatcher_t; + +struct changelog_bootstrap { +        changelog_mode_t mode; +        int (*ctor) (xlator_t *, changelog_dispatcher_t *); +        int (*dtor) (xlator_t *, changelog_dispatcher_t *); +}; + +struct changelog_encoder { +        changelog_encoder_t encoder; +        int (*encode) (xlator_t *, changelog_log_data_t *); +}; + + +/* xlator private */ + +typedef struct changelog_time_slice { +        /** +         * just in case we need nanosecond granularity some day. +         * field is unused as of now (maybe we'd need it later). +         */ +        struct timeval tv_start; + +        /** +         * version of changelog file, incremented each time changes +         * rollover. 
+         */ +        unsigned long changelog_version[CHANGELOG_MAX_TYPE]; +} changelog_time_slice_t; + +typedef struct changelog_rollover { +        /* rollover thread */ +        pthread_t rollover_th; + +        xlator_t *this; +} changelog_rollover_t; + +typedef struct changelog_fsync { +        /* fsync() thread */ +        pthread_t fsync_th; + +        xlator_t *this; +} changelog_fsync_t; + +# define CHANGELOG_MAX_CLIENTS  5 +typedef struct changelog_notify { +        /* read end of the pipe */ +        int rfd; + +        /* notifier thread */ +        pthread_t notify_th; + +        /* unique socket path */ +        char sockpath[PATH_MAX]; + +        int socket_fd; + +        /** +         * simple array of accept()'ed fds. Not scalable at all +         * for large number of clients, but it's okay as we have +         * a hard limit in this version (@CHANGELOG_MAX_CLIENTS). +         */ +        int client_fd[CHANGELOG_MAX_CLIENTS]; + +        xlator_t *this; +} changelog_notify_t; + +struct changelog_priv { +        gf_boolean_t active; + +        /* to generate unique socket file per brick */ +        char *changelog_brick; + +        /* logging directory */ +        char *changelog_dir; + +        /* one file for all changelog types */ +        int changelog_fd; + +        gf_lock_t lock; + +        /* write end of the pipe; the name of each rolled-over changelog is written here */ +        int wfd; + +        /* rollover time in seconds (select() timeout of the rollover thread) */ +        int32_t rollover_time; + +        /* fsync() interval in seconds; 0 makes changelog_open() use O_SYNC */ +        int32_t fsync_interval; + +        /* changelog type maps */ +        const char *maps[CHANGELOG_MAX_TYPE]; + +        /* time slicer */ +        changelog_time_slice_t slice; + +        /* context of the updater */ +        changelog_dispatcher_t cd; + +        /* context of the rollover thread */ +        changelog_rollover_t cr; + +        /* context of fsync thread */ +        changelog_fsync_t cf; + +        /* context of the notifier thread */ +        changelog_notify_t cn; + +        /* operation 
mode */ +        changelog_mode_t op_mode; + +        /* bootstrap routine for 'current' logger */ +        struct changelog_bootstrap *cb; + +        /* encoder mode */ +        changelog_encoder_t encode_mode; + +        /* encoder */ +        struct changelog_encoder *ce; +}; + +struct changelog_local { +        inode_t              *inode; +        gf_boolean_t          update_no_check; + +        changelog_log_data_t  cld; + +        /** +         * ->prev_entry is used in cases when there needs to be an +         * additional changelog entry for the parent (e.g. rename). +         * It's analogous to ->next in single linked list world, +         * but we call it as ->prev_entry... ha ha ha +         */ +        struct changelog_local *prev_entry; +}; + +typedef struct changelog_local changelog_local_t; + +/* inode version is stored in inode ctx */ +typedef struct changelog_inode_ctx { +        unsigned long iversion[CHANGELOG_MAX_TYPE]; +} changelog_inode_ctx_t; + +#define CHANGELOG_INODE_VERSION_TYPE(ctx, type)  &(ctx->iversion[type]) + +/** + * Optional Records: + *  fops that need to save additional information request an array of + *  @changelog_opt_t struct. The array is allocated via @iobufs. + */ +typedef enum { +        CHANGELOG_OPT_REC_FOP, +        CHANGELOG_OPT_REC_ENTRY, +} changelog_optional_rec_type_t; + +struct changelog_entry_fields { +        uuid_t  cef_uuid; +        char   *cef_bname; +}; + +typedef struct { +        /** +         * @co_convert can be used to do post-processing of the record before +         * it's persisted to the CHANGELOG. If this is NULL, then the record +         * is persisted as per its in-memory format. +         */ +        size_t (*co_convert) (void *data, char *buffer, gf_boolean_t encode); + +        /* release routines */ +        void (*co_free) (void *data); + +        /* type of the field */ +        changelog_optional_rec_type_t co_type; + +        /** +         * sizeof of the 'valid' field in the union. 
/* macros */

/**
 * Unwind @frame for @fop and release the changelog local attached to it.
 *
 * The local (and its ->prev_entry, if any) is detached from the frame
 * before unwinding. ->prev_entry is cleaned up BEFORE the local itself:
 * changelog_local_cleanup() hands the local back to its mem-pool, so
 * reading __local->prev_entry afterwards would touch released memory.
 *
 * @fop is passed through untouched to STACK_UNWIND_STRICT.
 */
#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do {             \
                changelog_local_t *__local = NULL;                      \
                xlator_t          *__xl    = NULL;                      \
                if (frame) {                                            \
                        __local        = (frame)->local;                \
                        __xl           = (frame)->this;                 \
                        (frame)->local = NULL;                          \
                }                                                       \
                STACK_UNWIND_STRICT (fop, frame, params);               \
                if (__local && __local->prev_entry)                     \
                        changelog_local_cleanup (__xl,                  \
                                                 __local->prev_entry);  \
                changelog_local_cleanup (__xl, __local);                \
        } while (0)

/* take a reference on @iobuf when it is non-NULL */
#define CHANGELOG_IOBUF_REF(iobuf) do {         \
                if (iobuf)                      \
                        iobuf_ref (iobuf);      \
        } while (0)

/* drop a reference on @iobuf when it is non-NULL */
#define CHANGELOG_IOBUF_UNREF(iobuf) do {       \
                if (iobuf)                      \
                        iobuf_unref (iobuf);    \
        } while (0)

/* copy @len bytes of @val to @buffer at offset @off, then advance @off */
#define CHANGELOG_FILL_BUFFER(buffer, off, val, len) do {       \
                memcpy ((buffer) + (off), (val), (len));        \
                (off) += (len);                                 \
        } while (0)

/**
 * Increment the version of every changelog type in @slice. The loop
 * counter has a private name so it cannot shadow an `i` used in @slice.
 */
#define SLICE_VERSION_UPDATE(slice) do {                        \
                int __i = 0;                                    \
                for (; __i < CHANGELOG_MAX_TYPE; __i++) {       \
                        (slice)->changelog_version[__i]++;      \
                }                                               \
        } while (0)

/**
 * Fill optional record @co with the fop number @fop and add its size to
 * @xlen. The macro name keeps its historical spelling ("CHANGLOG") for
 * existing callers.
 */
#define CHANGLOG_FILL_FOP_NUMBER(co, fop, converter, xlen) do { \
                (co)->co_convert = converter;                   \
                (co)->co_free = NULL;                           \
                (co)->co_type = CHANGELOG_OPT_REC_FOP;          \
                (co)->co_fop = fop;                             \
                (xlen) += sizeof (fop);                         \
        } while (0)

/**
 * Fill optional record @co with a (parent gfid, basename) entry. @bname
 * is duplicated with gf_strdup(); control jumps to @label when that
 * fails. On success @xlen grows by UUID_CANONICAL_FORM_LEN plus the
 * length of @bname.
 */
#define CHANGELOG_FILL_ENTRY(co, pargfid, bname,                        \
                             converter, freefn, xlen, label)            \
        do {                                                            \
                (co)->co_convert = converter;                           \
                (co)->co_free = freefn;                                 \
                (co)->co_type = CHANGELOG_OPT_REC_ENTRY;                \
                uuid_copy ((co)->co_entry.cef_uuid, pargfid);           \
                (co)->co_entry.cef_bname = gf_strdup(bname);            \
                if (!(co)->co_entry.cef_bname)                          \
                        goto label;                                     \
                (xlen) += (UUID_CANONICAL_FORM_LEN + strlen (bname));   \
        } while (0)

/* allocate @local for a fop whose inode version must be checked */
#define CHANGELOG_INIT(this, local, inode, gfid, xrec)                  \
        local = changelog_local_init (this, inode, gfid, xrec, _gf_false)

/* allocate @local for a fop that is recorded without a version check */
#define CHANGELOG_INIT_NOCHECK(this, local, inode, gfid, xrec)          \
        local = changelog_local_init (this, inode, gfid, xrec, _gf_true)

/* jump to @label when changelog is not active */
#define CHANGELOG_NOT_ACTIVE_THEN_GOTO(priv, label) do {        \
                if (!(priv)->active)                            \
                        goto label;                             \
        } while (0)

/**
 * Jump to @label when changelog is not active or @cond holds. @cond is
 * parenthesized so that conditional or assignment expressions are not
 * merged into the `||`.
 */
#define CHANGELOG_COND_GOTO(priv, cond, label) do {                    \
                if (!(priv)->active || (cond))                         \
                        goto label;                                    \
        } while (0)
a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h new file mode 100644 index 00000000..d72464ea --- /dev/null +++ b/xlators/features/changelog/src/changelog-mem-types.h @@ -0,0 +1,29 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_MEM_TYPES_H +#define _CHANGELOG_MEM_TYPES_H + +#include "mem-types.h" + +enum gf_changelog_mem_types { +        gf_changelog_mt_priv_t                  = gf_common_mt_end + 1, +        gf_changelog_mt_str_t                   = gf_common_mt_end + 2, +        gf_changelog_mt_batch_t                 = gf_common_mt_end + 3, +        gf_changelog_mt_rt_t                    = gf_common_mt_end + 4, +        gf_changelog_mt_inode_ctx_t             = gf_common_mt_end + 5, +        gf_changelog_mt_libgfchangelog_t        = gf_common_mt_end + 6, +        gf_changelog_mt_libgfchangelog_rl_t     = gf_common_mt_end + 7, +        gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8, +        gf_changelog_mt_changelog_buffer_t      = gf_common_mt_end + 9, +        gf_changelog_mt_end +}; + +#endif diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h new file mode 100644 index 00000000..0712a377 --- /dev/null +++ b/xlators/features/changelog/src/changelog-misc.h @@ -0,0 +1,101 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. 

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#ifndef _CHANGELOG_MISC_H
#define _CHANGELOG_MISC_H

#include "glusterfs.h"
#include "common-utils.h"

/* number of fop-triggered changelog types (data, metadata, entry) */
#define CHANGELOG_MAX_TYPE  3
#define CHANGELOG_FILE_NAME "CHANGELOG"

/* version numbers written into the changelog header line */
#define CHANGELOG_VERSION_MAJOR  1
#define CHANGELOG_VERSION_MINOR  0

/* printf-style template; the %s is filled by CHANGELOG_MAKE_SOCKET_PATH */
#define CHANGELOG_UNIX_SOCK  DEFAULT_VAR_RUN_DIRECTORY"/changelog-%s.sock"

/**
 * header starts with the version and the format of the changelog.
 * 'version' not much of a use now.
 *
 * The same template is used as the sscanf() format when the header is
 * parsed back in CHANGELOG_GET_ENCODING.
 */
#define CHANGELOG_HEADER                                                \
        "GlusterFS Changelog | version: v%d.%d | encoding : %d\n"

/**
 * Build the notification socket path for a brick into 'sockpath'
 * (at most 'len' bytes, always NUL-terminated by snprintf()).
 *
 * The brick path is run through md5_wrapper() and the resulting string
 * (presumably the hex digest, given the 2*MD5_DIGEST_LENGTH+1 buffer --
 * confirm against md5_wrapper) is substituted into CHANGELOG_UNIX_SOCK.
 * 'brick_path' is evaluated twice and must be a NUL-terminated string.
 */
#define CHANGELOG_MAKE_SOCKET_PATH(brick_path, sockpath, len) do {      \
                char md5_sum[MD5_DIGEST_LENGTH*2+1] = {0,};             \
                md5_wrapper((unsigned char *) brick_path,               \
                            strlen(brick_path),                         \
                            md5_sum);                                   \
                (void) snprintf (sockpath, len,                         \
                                 CHANGELOG_UNIX_SOCK, md5_sum);         \
        } while (0)

/**
 * ... used by libgfchangelog.
+ */ +#define CHANGELOG_GET_ENCODING(fd, buffer, len, enc, enc_len) do {      \ +                FILE *fp;                                               \ +                int fd_dup, maj, min;                                   \ +                                                                        \ +                enc = -1;                                               \ +                fd_dup = dup (fd);                                      \ +                                                                        \ +                if (fd_dup != -1) {                                     \ +                        fp = fdopen (fd_dup, "r");                      \ +                        if (fp) {                                       \ +                                if (fgets (buffer, len, fp)) {          \ +                                        elen = strlen (buffer);         \ +                                        sscanf (buffer,                 \ +                                                CHANGELOG_HEADER,       \ +                                                &maj, &min, &enc);      \ +                                }                                       \ +                                fclose (fp);                            \ +                        } else {                                        \ +                                close (fd_dup);                         \ +                        }                                               \ +                }                                                       \ +        } while (0) + +/** + * everything after 'CHANGELOG_TYPE_ENTRY' are internal types + * (ie. 
none of the fops trigger this type of event), hence + * CHANGELOG_MAX_TYPE = 3 + */ +typedef enum { +        CHANGELOG_TYPE_DATA = 0, +        CHANGELOG_TYPE_METADATA, +        CHANGELOG_TYPE_ENTRY, +        CHANGELOG_TYPE_ROLLOVER, +        CHANGELOG_TYPE_FSYNC, +} changelog_log_type; + +/* operation modes - RT for now */ +typedef enum { +        CHANGELOG_MODE_RT = 0, +} changelog_mode_t; + +/* encoder types */ + +typedef enum { +        CHANGELOG_ENCODE_MIN = 0, +        CHANGELOG_ENCODE_BINARY, +        CHANGELOG_ENCODE_ASCII, +        CHANGELOG_ENCODE_MAX, +} changelog_encoder_t; + +#define CHANGELOG_VALID_ENCODING(enc)                                   \ +        (enc > CHANGELOG_ENCODE_MIN && enc < CHANGELOG_ENCODE_MAX) + +#define CHANGELOG_TYPE_IS_ENTRY(type)  (type == CHANGELOG_TYPE_ENTRY) +#define CHANGELOG_TYPE_IS_ROLLOVER(type)  (type == CHANGELOG_TYPE_ROLLOVER) +#define CHANGELOG_TYPE_IS_FSYNC(type)  (type == CHANGELOG_TYPE_FSYNC) + +#endif /* _CHANGELOG_MISC_H */ diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c new file mode 100644 index 00000000..1f8b3125 --- /dev/null +++ b/xlators/features/changelog/src/changelog-notifier.c @@ -0,0 +1,314 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. 
+*/ + +#include "changelog-notifier.h" + +#include <pthread.h> + +inline static void +changelog_notify_clear_fd (changelog_notify_t *cn, int i) +{ +        cn->client_fd[i] = -1; +} + +inline static void +changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd) +{ +        cn->client_fd[i] = fd; +} + +static int +changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd) +{ +        int i   = 0; +        int ret = 0; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] == -1) +                        break; +        } + +        if (i == CHANGELOG_MAX_CLIENTS) { +                /** +                 * this case should not be hit as listen() would limit +                 * the number of completely established connections. +                 */ +                gf_log (this->name, GF_LOG_WARNING, +                        "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS); +                ret = -1; +        } +        else +                changelog_notify_save_fd (cn, i, fd); + +        return ret; +} + +static void +changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd) +{ +        int i = 0; + +        FD_ZERO (rset); + +        FD_SET (cn->socket_fd, rset); +        *maxfd = cn->socket_fd; + +        FD_SET (cn->rfd, rset); +        *maxfd = max (*maxfd, cn->rfd); + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] != -1) { +                        FD_SET (cn->client_fd[i], rset); +                        *maxfd = max (*maxfd, cn->client_fd[i]); +                } +        } + +        *maxfd = *maxfd + 1; +} + +static int +changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len) +{ +        int i = 0; +        int ret = 0; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] == -1) +                        continue; + +                if (changelog_write (cn->client_fd[i], +              
                       path, len)) { +                        ret = -1; + +                        close (cn->client_fd[i]); +                        changelog_notify_clear_fd (cn, i); +                } +        } + +        return ret; +} + +static void +changelog_notifier_init (changelog_notify_t *cn) +{ +        int i = 0; + +        cn->socket_fd = -1; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                changelog_notify_clear_fd (cn, i); +        } +} + +static void +changelog_close_client_conn (changelog_notify_t *cn) +{ +        int i = 0; + +        for (; i < CHANGELOG_MAX_CLIENTS; i++) { +                if (cn->client_fd[i] == -1) +                        continue; + +                close (cn->client_fd[i]); +                changelog_notify_clear_fd (cn, i); +        } +} + +static void +changelog_notifier_cleanup (void *arg) +{ +        changelog_notify_t *cn = NULL; + +        cn = (changelog_notify_t *) arg; + +        changelog_close_client_conn (cn); + +        if (cn->socket_fd != -1) +                close (cn->socket_fd); + +        if (cn->rfd) +                close (cn->rfd); + +        if (unlink (cn->sockpath)) +                gf_log ("", GF_LOG_WARNING, +                        "could not unlink changelog socket file" +                        " %s (reason: %s", cn->sockpath, strerror (errno)); +} + +void * +changelog_notifier (void *data) +{ +        int                 i         = 0; +        int                 fd        = 0; +        int                 max_fd    = 0; +        int                 len       = 0; +        ssize_t             readlen   = 0; +        xlator_t           *this      = NULL; +        changelog_priv_t   *priv      = NULL; +        changelog_notify_t *cn        = NULL; +        struct sockaddr_un  local     = {0,}; +        char path[PATH_MAX]           = {0,}; +        char abspath[PATH_MAX]        = {0,}; + +        char buffer; +        fd_set rset; + +        priv = (changelog_priv_t *) 
data; + +        cn = &priv->cn; +        this = cn->this; + +        pthread_cleanup_push (changelog_notifier_cleanup, cn); + +        changelog_notifier_init (cn); + +        cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0); +        if (cn->socket_fd < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "changelog socket error (reason: %s)", +                        strerror (errno)); +                goto out; +        } + +        CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, +                                    cn->sockpath, PATH_MAX); +        if (unlink (cn->sockpath) < 0) { +                if (errno != ENOENT) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Could not unlink changelog socket file (%s)" +                                " (reason: %s)", +                                CHANGELOG_UNIX_SOCK, strerror (errno)); +                        goto cleanup; +                } +        } + +        local.sun_family = AF_UNIX; +        strcpy (local.sun_path, cn->sockpath); + +        len = strlen (local.sun_path) + sizeof (local.sun_family); + +        /* bind to the unix domain socket */ +        if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Could not bind to changelog socket (reason: %s)", +                        strerror (errno)); +                goto cleanup; +        } + +        /* listen for incoming connections */ +        if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "listen() error on changelog socket (reason: %s)", +                        strerror (errno)); +                goto cleanup; +        } + +        /** +         * simple select() on all to-be-read file descriptors. 
This method +         * though old school works pretty well when you have a handfull of +         * fd's to be watched (clients). +         * +         * Future TODO: move this to epoll based notification facility if +         *              number of clients increase. +         */ +        for (;;) { +                changelog_notify_fill_rset (cn, &rset, &max_fd); + +                if (select (max_fd, &rset, NULL, NULL, NULL) < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "select() returned -1 (reason: %s)", +                                strerror (errno)); +                        sleep (2); +                        continue; +                } + +                if (FD_ISSET (cn->socket_fd, &rset)) { +                        fd = accept (cn->socket_fd, NULL, NULL); +                        if (fd < 0) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "accept error on changelog socket" +                                        " (reason: %s)", strerror (errno)); +                        } else if (changelog_notify_insert_fd (this, cn, fd)) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "hit max client limit"); +                        } +                } + +                if (FD_ISSET (cn->rfd, &rset)) { +                        /** +                         * read changelog filename and notify all connected +                         * clients. 
+                         */ +                        readlen = 0; +                        while (readlen < PATH_MAX) { +                                len = read (cn->rfd, &path[readlen++], 1); +                                if (len == -1) { +                                        break; +                                } + +                                if (len == 0) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "rollover thread sent EOF" +                                                " on pipe - possibly a crash."); +                                        /* be blunt and close all connections */ +                                        pthread_exit(NULL); +                                } + +                                if (path[readlen - 1] == '\0') +                                        break; +                        } + +                        /* should we close all client connections here too? 
*/ +                        if (len < 0 || readlen == PATH_MAX) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "Could not get pathname from rollover" +                                        " thread or pathname too long"); +                                goto process_rest; +                        } + +                        (void) snprintf (abspath, PATH_MAX, +                                         "%s/%s", priv->changelog_dir, path); +                        if (changelog_notify_client (cn, abspath, +                                                     strlen (abspath) + 1)) +                                gf_log (this->name, GF_LOG_ERROR, +                                        "could not notify some clients with new" +                                        " changelogs"); +                } + +        process_rest: +                for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) { +                        if ( (fd = cn->client_fd[i]) == -1 ) +                                continue; + +                        if (FD_ISSET (fd, &rset)) { +                                /** +                                 * the only data we accept from the client is a +                                 * disconnect. Anything else is treated as bogus +                                 * and is silently discarded (also warned!!!). 
+                                 */ +                                if ( (readlen = read (fd, &buffer, 1)) <= 0 ) { +                                        close (fd); +                                        changelog_notify_clear_fd (cn, i); +                                } else { +                                        /* silently discard data and log */ +                                        gf_log (this->name, GF_LOG_WARNING, +                                                "misbehaving changelog client"); +                                } +                        } +                } + +        } + + cleanup:; +        pthread_cleanup_pop (1); + + out: +        return NULL; +} diff --git a/xlators/features/changelog/src/changelog-notifier.h b/xlators/features/changelog/src/changelog-notifier.h new file mode 100644 index 00000000..55e72835 --- /dev/null +++ b/xlators/features/changelog/src/changelog-notifier.h @@ -0,0 +1,19 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CHANGELOG_NOTIFIER_H +#define _CHANGELOG_NOTIFIER_H + +#include "changelog-helpers.h" + +void * +changelog_notifier (void *data); + +#endif diff --git a/xlators/features/changelog/src/changelog-rt.c b/xlators/features/changelog/src/changelog-rt.c new file mode 100644 index 00000000..c147f68c --- /dev/null +++ b/xlators/features/changelog/src/changelog-rt.c @@ -0,0 +1,72 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. 
+ +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" + +#include "changelog-rt.h" +#include "changelog-mem-types.h" + +int +changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd) +{ +        changelog_rt_t *crt = NULL; + +        crt = GF_CALLOC (1, sizeof (*crt), +                         gf_changelog_mt_rt_t); +        if (!crt) +                return -1; + +        LOCK_INIT (&crt->lock); + +        cd->cd_data = crt; +        cd->dispatchfn = &changelog_rt_enqueue; + +        return 0; +} + +int +changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd) +{ +        changelog_rt_t *crt = NULL; + +        crt = cd->cd_data; + +        LOCK_DESTROY (&crt->lock); +        GF_FREE (crt); + +        return 0; +} + +int +changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch, +                      changelog_log_data_t *cld_0, changelog_log_data_t *cld_1) +{ +        int             ret = 0; +        changelog_rt_t *crt = NULL; + +        crt = (changelog_rt_t *) cbatch; + +        LOCK (&crt->lock); +        { +                ret = changelog_handle_change (this, priv, cld_0); +                if (!ret && cld_1) +                        ret = changelog_handle_change (this, priv, cld_1); +        } +        UNLOCK (&crt->lock); + +        return ret; +} diff --git a/xlators/features/changelog/src/changelog-rt.h b/xlators/features/changelog/src/changelog-rt.h new file mode 100644 index 00000000..1fc2bbc5 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rt.h @@ -0,0 +1,33 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. 

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#ifndef _CHANGELOG_RT_H
#define _CHANGELOG_RT_H

#include "locking.h"
#include "timer.h"
#include "pthread.h"

#include "changelog-helpers.h"

/*
 * State of the real-time dispatcher, created by changelog_rt_init() and
 * stored in the dispatcher's cd_data. The lock is held by
 * changelog_rt_enqueue() around its changelog_handle_change() calls.
 */
typedef struct changelog_rt {
        gf_lock_t lock;
} changelog_rt_t;

/* constructor: allocates the state and registers changelog_rt_enqueue */
int
changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd);
/* destructor: destroys the lock and frees the state */
int
changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd);
/*
 * dispatch function: 'cbatch' is the changelog_rt_t; 'cld_1' may be
 * NULL, in which case only 'cld_0' is processed
 */
int
changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch,
                      changelog_log_data_t *cld_0, changelog_log_data_t *cld_1);

#endif /* _CHANGELOG_RT_H */
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
new file mode 100644
index 00000000..35e3e784
--- /dev/null
+++ b/xlators/features/changelog/src/changelog.c
@@ -0,0 +1,1487 @@
/*
   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#ifndef _CONFIG_H
#define _CONFIG_H
#include "config.h"
#endif

#include "xlator.h"
#include "defaults.h"
#include "logging.h"
#include "iobuf.h"

#include "changelog-rt.h"

#include "changelog-encoders.h"
#include "changelog-mem-types.h"

#include <pthread.h>

#include "changelog-notifier.h"

/*
 * Per-mode constructor/destructor pairs. Real-time (CHANGELOG_MODE_RT)
 * is the only mode defined so far.
 */
static struct changelog_bootstrap
cb_bootstrap[] = {
        {
                .mode = CHANGELOG_MODE_RT,
                .ctor = changelog_rt_init,
                .dtor = changelog_rt_fini,
        },
};

/*
 * Encoder table indexed by changelog_encoder_t. Only the BINARY and
 * ASCII slots are populated; slot 0 (CHANGELOG_ENCODE_MIN) is left
 * zero-initialised by the designated initialisers.
 */
static struct changelog_encoder
cb_encoder[] = {
        [CHANGELOG_ENCODE_BINARY] =
        {
                .encoder = CHANGELOG_ENCODE_BINARY,
                .encode = changelog_encode_binary,
        },
        [CHANGELOG_ENCODE_ASCII] =
        {
                .encoder = CHANGELOG_ENCODE_ASCII,
                .encode = changelog_encode_ascii,
        },
};

/* Entry operations - TYPE III */

/**
 * entry operations do not undergo inode version checking.
+ */ + +/* {{{ */ + +/* rmdir */ + +int32_t +changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                     int32_t op_ret, int32_t op_errno, struct iatt *preparent, +                     struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno, +                                preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_rmdir (call_frame_t *frame, xlator_t *this, +                 loc_t *loc, int xflags, dict_t *xdata) +{ +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, +                                NULL, loc->inode->gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_rmdir_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, +                    loc, xflags, xdata); +        return 0; +} + +/* unlink */ + +int32_t +changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, struct iatt *preparent, +                      struct 
iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno, +                                preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_unlink (call_frame_t *frame, xlator_t *this, +                  loc_t *loc, int xflags, dict_t *xdata) +{ +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, loc->inode->gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_unlink_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, +                    loc, xflags, xdata); +        return 0; +} + +/* rename */ + +int32_t +changelog_rename_cbk (call_frame_t *frame, +                      void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, +                      struct iatt *buf, struct iatt *preoldparent, +                      struct iatt *postoldparent, struct iatt *prenewparent, +                      struct iatt *postnewparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t 
*local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno, +                                buf, preoldparent, postoldparent, +                                prenewparent, postnewparent, xdata); +        return 0; +} + + +int32_t +changelog_rename (call_frame_t *frame, xlator_t *this, +                  loc_t *oldloc, loc_t *newloc, dict_t *xdata) +{ +        size_t            xtra_len  = 0; +        uuid_t            null_uuid = {0,}; +        changelog_priv_t *priv      = NULL; +        changelog_opt_t  *co        = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        /* 3 == fop + oldloc + newloc */ +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, null_uuid, 3); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        co++; +        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 3); + + wind: +        STACK_WIND (frame, changelog_rename_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, +                    oldloc, newloc, xdata); +        return 0; +} + +/* link */ + +int32_t +changelog_link_cbk (call_frame_t *frame, +                    void *cookie, xlator_t *this, int32_t op_ret, +                    int32_t op_errno, inode_t *inode, +                    struct iatt *buf, struct iatt 
*preparent, +                    struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_link (call_frame_t *frame, +                xlator_t *this, loc_t *oldloc, +                loc_t *newloc, dict_t *xdata) +{ +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, oldloc->gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_link_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->link, +                    oldloc, newloc, xdata); +        return 0; +} + +/* mkdir */ + +int32_t +changelog_mkdir_cbk (call_frame_t *frame, +                     void *cookie, xlator_t *this, int32_t op_ret, +                     int32_t op_errno, inode_t *inode, +                     struct iatt *buf, struct iatt *preparent, +                     struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local 
= NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_mkdir (call_frame_t *frame, xlator_t *this, +                 loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) +{ +        int               ret      = -1; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_mkdir_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir, +                    loc, mode, umask, xdata); +        return 0; +} + +/* symlink */ + +int32_t +changelog_symlink_cbk (call_frame_t *frame, +                       void *cookie, xlator_t *this, +                       int32_t op_ret, 
int32_t op_errno, +                       inode_t *inode, struct iatt *buf, struct iatt *preparent, +                       struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_symlink (call_frame_t *frame, xlator_t *this, +                   const char *linkname, loc_t *loc, +                   mode_t umask, dict_t *xdata) +{ +        int               ret      = -1; +        size_t            xtra_len = 0; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_symlink_cbk, +                    
FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink, +                    linkname, loc, umask, xdata); +        return 0; +} + +/* mknod */ + +int32_t +changelog_mknod_cbk (call_frame_t *frame, +                     void *cookie, xlator_t *this, +                     int32_t op_ret, int32_t op_errno, inode_t *inode, +                     struct iatt *buf, struct iatt *preparent, +                     struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno, +                                inode, buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_mknod (call_frame_t *frame, +                 xlator_t *this, loc_t *loc, +                 mode_t mode, dev_t dev, mode_t umask, dict_t *xdata) +{ +        int               ret      = -1; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        size_t            xtra_len = 0; +        changelog_priv_t *priv     = NULL; +        changelog_opt_t  *co       = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); +                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        
CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_mknod_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod, +                    loc, mode, dev, umask, xdata); +        return 0; +} + +/* creat */ + +int32_t +changelog_create_cbk (call_frame_t *frame, +                      void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, +                      fd_t *fd, inode_t *inode, struct iatt *buf, +                      struct iatt *preparent, +                      struct iatt *postparent, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + + unwind: +        CHANGELOG_STACK_UNWIND (create, frame, +                                op_ret, op_errno, fd, inode, +                                buf, preparent, postparent, xdata); +        return 0; +} + +int32_t +changelog_create (call_frame_t *frame, xlator_t *this, +                  loc_t *loc, int32_t flags, mode_t mode, +                  mode_t umask, fd_t *fd, dict_t *xdata) +{ +        int               ret      = -1; +        uuid_t            gfid     = {0,}; +        void             *uuid_req = NULL; +        changelog_opt_t  *co       = NULL; +        changelog_priv_t *priv     = NULL; +        size_t            xtra_len = 0; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "failed to get gfid from dict"); 
+                goto wind; +        } +        uuid_copy (gfid, uuid_req); + +        /* init with two extra records */ +        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); +        if (!frame->local) +                goto wind; + +        co = changelog_get_usable_buffer (frame->local); +        if (!co) +                goto wind; + +        CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + +        co++; +        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, +                              entry_fn, entry_free_fn, xtra_len, wind); + +        changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + + wind: +        STACK_WIND (frame, changelog_create_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, +                    loc, flags, mode, umask, fd, xdata); +        return 0; +} + +/* }}} */ + + +/* Metadata modification fops - TYPE II */ + +/* {{{ */ + +/* {f}setattr */ + +int32_t +changelog_fsetattr_cbk (call_frame_t *frame, +                        void *cookie, xlator_t *this, int32_t op_ret, +                        int32_t op_errno, struct iatt *preop_stbuf, +                        struct iatt *postop_stbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, +                                preop_stbuf, postop_stbuf, xdata); + +        return 0; + + +} + +int32_t +changelog_fsetattr (call_frame_t *frame, +                    xlator_t *this, fd_t *fd, +                    struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, 
wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_fsetattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, +                    fd, stbuf, valid, xdata); +        return 0; + + +} + +int32_t +changelog_setattr_cbk (call_frame_t *frame, +                       void *cookie, xlator_t *this, int32_t op_ret, +                       int32_t op_errno, struct iatt *preop_stbuf, +                       struct iatt *postop_stbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno, +                                preop_stbuf, postop_stbuf, xdata); + +        return 0; +} + +int32_t +changelog_setattr (call_frame_t *frame, +                   xlator_t *this, loc_t *loc, +                   struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_setattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, +                    loc, stbuf, valid, xdata); +        return 0; +} + +/* {f}removexattr */ + +int32_t +changelog_fremovexattr_cbk (call_frame_t *frame, +                            void *cookie, xlator_t *this, +                            int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = 
this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_fremovexattr (call_frame_t *frame, xlator_t *this, +                        fd_t *fd, const char *name, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_fremovexattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr, +                    fd, name, xdata); +        return 0; +} + +int32_t +changelog_removexattr_cbk (call_frame_t *frame, +                           void *cookie, xlator_t *this, +                           int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_removexattr (call_frame_t *frame, xlator_t *this, +                       loc_t *loc, const char *name, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_removexattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD 
(this)->fops->removexattr, +                    loc, name, xdata); +        return 0; +} + +/* {f}setxattr */ + +int32_t +changelog_setxattr_cbk (call_frame_t *frame, +                        void *cookie, xlator_t *this, +                        int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_setxattr (call_frame_t *frame, +                    xlator_t *this, loc_t *loc, +                    dict_t *dict, int32_t flags, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_setxattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr, +                    loc, dict, flags, xdata); +        return 0; +} + +int32_t +changelog_fsetxattr_cbk (call_frame_t *frame, +                         void *cookie, xlator_t *this, int32_t op_ret, +                         int32_t op_errno, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + + unwind: +        CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + +int32_t +changelog_fsetxattr (call_frame_t *frame, +                     
xlator_t *this, fd_t *fd, dict_t *dict, +                     int32_t flags, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_fsetxattr_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, +                    fd, dict, flags, xdata); +        return 0; +} + +/* }}} */ + + +/* Data modification fops - TYPE I */ + +/* {{{ */ + +/* {f}truncate() */ + +int32_t +changelog_truncate_cbk (call_frame_t *frame, +                        void *cookie, xlator_t *this, int32_t op_ret, +                        int32_t op_errno, struct iatt *prebuf, +                        struct iatt *postbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + unwind: +        CHANGELOG_STACK_UNWIND (truncate, frame, +                                op_ret, op_errno, prebuf, postbuf, xdata); +        return 0; +} + +int32_t +changelog_truncate (call_frame_t *frame, +                    xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        loc->inode, loc->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_truncate_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate, +                    loc, offset, xdata); +        return 0; +} + +int32_t +changelog_ftruncate_cbk (call_frame_t *frame, +                         void 
*cookie, xlator_t *this, int32_t op_ret, +                         int32_t op_errno, struct iatt *prebuf, +                         struct iatt *postbuf, dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + unwind: +        CHANGELOG_STACK_UNWIND (ftruncate, frame, +                                op_ret, op_errno, prebuf, postbuf, xdata); +        return 0; +} + +int32_t +changelog_ftruncate (call_frame_t *frame, +                     xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_ftruncate_cbk, +                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate, +                    fd, offset, xdata); +        return 0; +} + +/* writev() */ + +int32_t +changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf, +                    struct iatt *postbuf, +                    dict_t *xdata) +{ +        changelog_priv_t  *priv  = NULL; +        changelog_local_t *local = NULL; + +        priv  = this->private; +        local = frame->local; + +        CHANGELOG_COND_GOTO (priv, ((op_ret <= 0) || !local), unwind); + +        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + + unwind: +        CHANGELOG_STACK_UNWIND (writev, frame, +                                op_ret, op_errno, prebuf, postbuf, xdata); +        return 0; +} + +int32_t +changelog_writev (call_frame_t *frame, +                  xlator_t *this, fd_t 
*fd, struct iovec *vector, +                  int32_t count, off_t offset, uint32_t flags, +                  struct iobref *iobref, dict_t *xdata) +{ +        changelog_priv_t *priv = NULL; + +        priv = this->private; +        CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind); + +        CHANGELOG_INIT (this, frame->local, +                        fd->inode, fd->inode->gfid, 0); + + wind: +        STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->writev, fd, vector, +                    count, offset, flags, iobref, xdata); +        return 0; +} + +/* }}} */ + +/** + * The + *   - @init () + *   - @fini () + *   - @reconfigure () + *   ... and helper routines + */ + +/** + * needed if there are more operation modes in the future. + */ +static void +changelog_assign_opmode (changelog_priv_t *priv, char *mode) +{ +        if ( strncmp (mode, "realtime", 8) == 0 ) { +                priv->op_mode = CHANGELOG_MODE_RT; +        } +} + +static void +changelog_assign_encoding (changelog_priv_t *priv, char *enc) +{ +        if ( strncmp (enc, "binary", 6) == 0 ) { +                priv->encode_mode = CHANGELOG_ENCODE_BINARY; +        } else if ( strncmp (enc, "ascii", 5) == 0 ) { +                priv->encode_mode = CHANGELOG_ENCODE_ASCII; +        } +} + +/* cleanup any helper threads that are running */ +static void +changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) +{ +        if (priv->cr.rollover_th) { +                changelog_thread_cleanup (this, priv->cr.rollover_th); +                priv->cr.rollover_th = 0; +        } + +        if (priv->cf.fsync_th) { +                changelog_thread_cleanup (this, priv->cf.fsync_th); +                priv->cf.fsync_th = 0; +        } +} + +/* spawn helper thread; cleaning up in case of errors */ +static int +changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        priv->cr.this = this; +      
  ret = pthread_create (&priv->cr.rollover_th, +                              NULL, changelog_rollover, priv); +        if (ret) +                goto out; + +        if (priv->fsync_interval) { +                priv->cf.this = this; +                ret = pthread_create (&priv->cf.fsync_th, +                                      NULL, changelog_fsync_thread, priv); +        } + +        if (ret) +                changelog_cleanup_helper_threads (this, priv); + + out: +        return ret; +} + +/* cleanup the notifier thread */ +static int +changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv) +{ +        int ret = 0; + +        if (priv->cn.notify_th) { +                changelog_thread_cleanup (this, priv->cn.notify_th); +                priv->cn.notify_th = 0; + +                ret = close (priv->wfd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error closing writer end of notifier pipe" +                                " (reason: %s)", strerror (errno)); +        } + +        return ret; +} + +/* spawn the notifier thread - nop if already running */ +static int +changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) +{ +        int ret        = 0; +        int flags      = 0; +        int pipe_fd[2] = {0, 0}; + +        if (priv->cn.notify_th) +                goto out; /* notifier thread already running */ + +        ret = pipe (pipe_fd); +        if (ret == -1) { +                gf_log (this->name, GF_LOG_ERROR, +                        "Cannot create pipe (reason: %s)", strerror (errno)); +                goto out; +        } + +        /* writer is non-blocking */ +        flags = fcntl (pipe_fd[1], F_GETFL); +        flags |= O_NONBLOCK; + +        ret = fcntl (pipe_fd[1], F_SETFL, flags); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to set O_NONBLOCK flag"); +                goto out; +        } + 
+        priv->wfd = pipe_fd[1]; + +        priv->cn.this = this; +        priv->cn.rfd  = pipe_fd[0]; + +        ret = pthread_create (&priv->cn.notify_th, +                              NULL, changelog_notifier, priv); + + out: +        return ret; +} + +int32_t +mem_acct_init (xlator_t *this) +{ +        int     ret = -1; + +        if (!this) +                return ret; + +        ret = xlator_mem_acct_init (this, gf_changelog_mt_end + 1); + +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, "Memory accounting" +                        " init failed"); +                return ret; +        } + +        return ret; +} + +static int +changelog_init (xlator_t *this, changelog_priv_t *priv) +{ +        int                  i   = 0; +        int                  ret = -1; +        struct timeval       tv  = {0,}; +        changelog_log_data_t cld = {0,}; + +        ret = gettimeofday (&tv, NULL); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "gettimeofday() failure"); +                goto out; +        } + +        priv->slice.tv_start = tv; + +        priv->maps[CHANGELOG_TYPE_DATA]     = "D "; +        priv->maps[CHANGELOG_TYPE_METADATA] = "M "; +        priv->maps[CHANGELOG_TYPE_ENTRY]    = "E "; + +        /* spawn the notifier thread */ +        ret = changelog_spawn_notifier (this, priv); +        if (ret) +                goto out; + +        /** +         * start with a fresh changelog file every time. this is done +         * in case there was an encoding change. so... things are kept +         * simple here. 
+         */ +        ret = changelog_fill_rollover_data (&cld, _gf_false); +        if (ret) +                goto out; + +        LOCK (&priv->lock); +        { +                for (; i < CHANGELOG_MAX_TYPE; i++) { +                        /* start with version 1 */ +                        priv->slice.changelog_version[i] = 1; +                } + +                ret = changelog_inject_single_event (this, priv, &cld); +        } +        UNLOCK (&priv->lock); + +        /* ... and finally spawn the helpers threads */ +        ret = changelog_spawn_helper_threads (this, priv); + + out: +        return ret; +} + +int +reconfigure (xlator_t *this, dict_t *options) +{ +        int                     ret            = 0; +        char                   *tmp            = NULL; +        changelog_priv_t       *priv           = NULL; +        gf_boolean_t            active_earlier = _gf_true; +        gf_boolean_t            active_now     = _gf_true; +        changelog_time_slice_t *slice          = NULL; +        changelog_log_data_t    cld            = {0,}; + +        priv = this->private; +        if (!priv) +                goto out; + +        ret = -1; +        active_earlier = priv->active; + +        /* first stop the rollover and the fsync thread */ +        changelog_cleanup_helper_threads (this, priv); + +        GF_OPTION_RECONF ("changelog-dir", tmp, options, str, out); +        if (!tmp) { +                gf_log (this->name, GF_LOG_ERROR, +                        "\"changelog-dir\" option is not set"); +                goto out; +        } + +        GF_FREE (priv->changelog_dir); +        priv->changelog_dir = gf_strdup (tmp); +        if (!priv->changelog_dir) +                goto out; + +        ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); +        if (ret) +                goto out; + +        GF_OPTION_RECONF ("changelog", active_now, options, bool, out); + +        /** +         * changelog_handle_change() handles changes that could 
possibly +         * have been submit changes before changelog deactivation. +         */ +        if (!active_now) +                priv->active = _gf_false; + +        GF_OPTION_RECONF ("op-mode", tmp, options, str, out); +        changelog_assign_opmode (priv, tmp); + +        GF_OPTION_RECONF ("encoding", tmp, options, str, out); +        changelog_assign_encoding (priv, tmp); + +        GF_OPTION_RECONF ("rollover-time", +                          priv->rollover_time, options, int32, out); +        GF_OPTION_RECONF ("fsync-interval", +                          priv->fsync_interval, options, int32, out); + +        if (active_now || active_earlier) { +                ret = changelog_fill_rollover_data (&cld, !active_now); +                if (ret) +                        goto out; + +                slice = &priv->slice; + +                LOCK (&priv->lock); +                { +                        ret = changelog_inject_single_event (this, priv, &cld); +                        if (!ret && active_now) +                                SLICE_VERSION_UPDATE (slice); +                } +                UNLOCK (&priv->lock); + +                if (ret) +                        goto out; + +                if (active_now) { +                        ret = changelog_spawn_notifier (this, priv); +                        if (!ret) +                                ret = changelog_spawn_helper_threads (this, +                                                                      priv); +                } else +                        ret = changelog_cleanup_notifier (this, priv); +        } + + out: +        if (ret) { +                ret = changelog_cleanup_notifier (this, priv); +        } else { +                gf_log (this->name, GF_LOG_DEBUG, +                        "changelog reconfigured"); +                if (active_now) +                        priv->active = _gf_true; +        } + +        return ret; +} + +int32_t +init (xlator_t *this) +{ +        int   
            ret  = -1; +        char             *tmp  = NULL; +        changelog_priv_t *priv = NULL; + +        GF_VALIDATE_OR_GOTO ("changelog", this, out); + +        if (!this->children || this->children->next) { +                gf_log (this->name, GF_LOG_ERROR, +                        "translator needs a single subvolume"); +                goto out; +        } + +        if (!this->parents) { +                gf_log (this->name, GF_LOG_ERROR, +                        "dangling volume. please check volfile"); +                goto out; +        } + +        priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); +        if (!priv) +                goto out; + +        this->local_pool = mem_pool_new (changelog_local_t, 64); +        if (!this->local_pool) { +                gf_log (this->name, GF_LOG_ERROR, +                        "failed to create local memory pool"); +                goto out; +        } + +        LOCK_INIT (&priv->lock); + +        GF_OPTION_INIT ("changelog-brick", tmp, str, out); +        if (!tmp) { +                gf_log (this->name, GF_LOG_ERROR, +                        "\"changelog-brick\" option is not set"); +                goto out; +        } + +        priv->changelog_brick = gf_strdup (tmp); +        if (!priv->changelog_brick) +                goto out; +        tmp = NULL; + +        GF_OPTION_INIT ("changelog-dir", tmp, str, out); +        if (!tmp) { +                gf_log (this->name, GF_LOG_ERROR, +                        "\"changelog-dir\" option is not set"); +                goto out; +        } + +        priv->changelog_dir = gf_strdup (tmp); +        if (!priv->changelog_dir) +                goto out; +        tmp = NULL; + +        /** +         * create the directory even if change-logging would be inactive +         * so that consumers can _look_ into it (finding nothing...) 
+         */ +        ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); +        if (ret) +                goto out; + +        GF_OPTION_INIT ("changelog", priv->active, bool, out); +        if (!priv->active) { +                ret = 0; +                goto out; +        } + +        GF_OPTION_INIT ("op-mode", tmp, str, out); +        changelog_assign_opmode (priv, tmp); + +        tmp = NULL; + +        GF_OPTION_INIT ("encoding", tmp, str, out); +        changelog_assign_encoding (priv, tmp); + +        GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out); + +        GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); + +        GF_ASSERT (cb_encoder[priv->encode_mode].encoder == priv->encode_mode); +        priv->ce = &cb_encoder[priv->encode_mode]; + +        GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode); +        priv->cb = &cb_bootstrap[priv->op_mode]; + +        /* ... now bootstrap the logger */ +        ret = priv->cb->ctor (this, &priv->cd); +        if (ret) +                goto out; + +        priv->changelog_fd = -1; +        if (priv->active) +                ret = changelog_init (this, priv); +        if (ret) +                goto out; + +        gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded"); + + out: +        if (ret) { +                if (this->local_pool) +                        mem_pool_destroy (this->local_pool); +                ret = priv->cb->dtor (this, &priv->cd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error in cleanup during init()"); +                GF_FREE (priv->changelog_brick); +                GF_FREE (priv->changelog_dir); +                GF_FREE (priv); +                this->private = NULL; +        } else +                this->private = priv; + +        return ret; +} + +void +fini (xlator_t *this) +{ +        int               ret  = -1; +        changelog_priv_t *priv = 
NULL; + +        priv = this->private; + +        if (priv) { +                ret = priv->cb->dtor (this, &priv->cd); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "error in fini"); +                mem_pool_destroy (this->local_pool); +                GF_FREE (priv->changelog_brick); +                GF_FREE (priv->changelog_dir); +                GF_FREE (priv); +        } + +        this->private = NULL; + +        return; +} + +struct xlator_fops fops = { +        .mknod        = changelog_mknod, +        .mkdir        = changelog_mkdir, +        .create       = changelog_create, +        .symlink      = changelog_symlink, +        .writev       = changelog_writev, +        .truncate     = changelog_truncate, +        .ftruncate    = changelog_ftruncate, +        .link         = changelog_link, +        .rename       = changelog_rename, +        .unlink       = changelog_unlink, +        .rmdir        = changelog_rmdir, +        .setattr      = changelog_setattr, +        .fsetattr     = changelog_fsetattr, +        .setxattr     = changelog_setxattr, +        .fsetxattr    = changelog_fsetxattr, +        .removexattr  = changelog_removexattr, +        .fremovexattr = changelog_fremovexattr, +}; + +struct xlator_cbks cbks = { +        .forget = changelog_forget, +}; + +struct volume_options options[] = { +        {.key = {"changelog"}, +         .type = GF_OPTION_TYPE_BOOL, +         .default_value = "off", +         .description = "enable/disable change-logging" +        }, +        {.key = {"changelog-brick"}, +         .type = GF_OPTION_TYPE_PATH, +         .description = "brick path to generate unique socket file name." +                       " should be the export directory of the volume strictly." 
+        }, +        {.key = {"changelog-dir"}, +         .type = GF_OPTION_TYPE_PATH, +         .description = "directory for the changelog files" +        }, +        {.key = {"op-mode"}, +         .type = GF_OPTION_TYPE_STR, +         .default_value = "realtime", +         .value = {"realtime"}, +         .description = "operation mode - futuristic operation modes" +        }, +        {.key = {"encoding"}, +         .type = GF_OPTION_TYPE_STR, +         .default_value = "ascii", +         .value = {"binary", "ascii"}, +         .description = "encoding type for changelogs" +        }, +        {.key = {"rollover-time"}, +         .default_value = "60", +         .type = GF_OPTION_TYPE_TIME, +         .description = "time to switch to a new changelog file (in seconds)" +        }, +        {.key = {"fsync-interval"}, +         .type = GF_OPTION_TYPE_TIME, +         .default_value = "0", +         .description = "do not open CHANGELOG file with O_SYNC mode." +                        " instead perform fsync() at specified intervals" +        }, +        {.key = {NULL} +        }, +}; diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am index cc7cee10..c19f6b45 100644 --- a/xlators/features/marker/utils/syncdaemon/Makefile.am +++ b/xlators/features/marker/utils/syncdaemon/Makefile.am @@ -1,6 +1,7 @@  syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon -syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ -                    $(top_builddir)/contrib/ipaddr-py/ipaddr.py +syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ +	resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ +	$(top_builddir)/contrib/ipaddr-py/ipaddr.py  CLEANFILES = diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 
fea67e77..be594183 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -11,7 +11,6 @@  #define _CONFIG_H  #include "config.h"  #endif -#include <openssl/md5.h>  #include <inttypes.h>  #include "globals.h" @@ -88,16 +87,6 @@ gd_peer_uuid_str (glusterd_peerinfo_t *peerinfo)          return peerinfo->uuid_str;  } -static void -md5_wrapper(const unsigned char *data, size_t len, char *md5) -{ -        unsigned short i = 0; -        unsigned short lim = MD5_DIGEST_LENGTH*2+1; -        unsigned char scratch[MD5_DIGEST_LENGTH] = {0,}; -        MD5(data, len, scratch); -        for (; i < MD5_DIGEST_LENGTH; i++) -                snprintf(md5 + i * 2, lim-i*2, "%02x", scratch[i]); -}  int32_t  glusterd_get_lock_owner (uuid_t *uuid) diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index 4162274e..127e4c46 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1422,6 +1422,7 @@ server_graph_builder (volgen_graph_t *graph, glusterd_volinfo_t *volinfo,          char     *vgname                  = NULL;          char     *vg                      = NULL;          glusterd_brickinfo_t *brickinfo   = NULL; +        char changelog_basepath[PATH_MAX] = {0,};          brickinfo = param;          path      = brickinfo->path; @@ -1481,6 +1482,25 @@ server_graph_builder (volgen_graph_t *graph, glusterd_volinfo_t *volinfo,                  if (ret)                          return -1;          } + +        xl = volgen_graph_add (graph, "features/changelog", volname); +        if (!xl) +                return -1; + +        ret = xlator_set_option (xl, "changelog-brick", path); +        if (ret) +                return -1; + +        snprintf (changelog_basepath, sizeof (changelog_basepath), +                  "%s/%s", path, ".glusterfs/changelogs"); +        ret = xlator_set_option (xl, "changelog-dir", changelog_basepath); +   
     if (ret) +                return -1; + +        ret = check_and_add_debug_xl (graph, set_dict, volname, "changelog"); +        if (ret) +                return -1; +          xl = volgen_graph_add (graph, "features/access-control", volname);          if (!xl)                  return -1; diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 40949f01..6575e252 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -1240,6 +1240,32 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .value       = "0",            .op_version  = 2          }, +        /* changelog translator - global tunables */ +        { .key         = "changelog.changelog", +          .voltype     = "features/changelog", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "changelog.changelog-dir", +          .voltype     = "features/changelog", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "changelog.encoding", +          .voltype     = "features/changelog", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "changelog.rollover-time", +          .voltype     = "features/changelog", +          .type        = NO_DOC, +          .op_version  = 2 +        }, +        { .key         = "changelog.fsync-interval", +          .voltype     = "features/changelog", +          .type        = NO_DOC, +          .op_version  = 2 +        },          { .key         = NULL          }  };  | 
