diff options
Diffstat (limited to 'xlators/features/changelog/lib')
18 files changed, 4521 insertions, 0 deletions
diff --git a/xlators/features/changelog/lib/Makefile.am b/xlators/features/changelog/lib/Makefile.am new file mode 100644 index 00000000000..a985f42a877 --- /dev/null +++ b/xlators/features/changelog/lib/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c new file mode 100644 index 00000000000..5ea5bbb6630 --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c @@ -0,0 +1,90 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/** + * Compile it using: + * gcc -o getchanges-multi `pkg-config --cflags libgfchangelog` \ + * get-changes-multi.c `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +void * +brick_init(void *xl, struct gf_brick_spec *brick) +{ + return brick; +} + +void +brick_fini(void *xl, char *brick, void *data) +{ + return; +} + +void +brick_callback(void *xl, char *brick, void *data, changelog_event_t *ev) +{ + printf("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type); +} + +void +fill_brick_spec(struct gf_brick_spec *brick, char *path) +{ + brick->brick_path = strdup(path); + brick->filter = CHANGELOG_OP_TYPE_BR_RELEASE; + + brick->init = brick_init; + brick->fini = brick_fini; + brick->callback = brick_callback; + brick->connected = NULL; + brick->disconnected = NULL; +} + +int +main(int argc, char **argv) +{ + int ret = 0; + void *bricks = NULL; + struct gf_brick_spec 
*brick = NULL; + + bricks = calloc(2, sizeof(struct gf_brick_spec)); + if (!bricks) + goto error_return; + + brick = (struct gf_brick_spec *)bricks; + fill_brick_spec(brick, "/export/z1/zwoop"); + + brick++; + fill_brick_spec(brick, "/export/z2/zwoop"); + + ret = gf_changelog_init(NULL); + if (ret) + goto error_return; + + ret = gf_changelog_register_generic((struct gf_brick_spec *)bricks, 2, 0, + "/tmp/multi-changes.log", 9, NULL); + if (ret) + goto error_return; + + /* let callbacks do the job */ + select(0, NULL, NULL, NULL, NULL); + +error_return: + return -1; +} diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c new file mode 100644 index 00000000000..8bc651c24a4 --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-changes.c @@ -0,0 +1,93 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. 
+*/ + +/** + * get set of new changes every 10 seconds (just print the file names) + * + * Compile it using: + * gcc -o getchanges `pkg-config --cflags libgfchangelog` get-changes.c \ + * `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +#define handle_error(fn) printf("%s (reason: %s)\n", fn, strerror(errno)) + +int +main(int argc, char **argv) +{ + int i = 0; + int ret = 0; + ssize_t nr_changes = 0; + ssize_t changes = 0; + char fbuf[PATH_MAX] = { + 0, + }; + + ret = gf_changelog_init(NULL); + if (ret) { + handle_error("Init failed"); + goto out; + } + + /* get changes for brick "/home/vshankar/export/yow/yow-1" */ + ret = gf_changelog_register("/export/z1/zwoop", "/tmp/scratch", + "/tmp/change.log", 9, 5); + if (ret) { + handle_error("register failed"); + goto out; + } + + while (1) { + i = 0; + nr_changes = gf_changelog_scan(); + if (nr_changes < 0) { + handle_error("scan(): "); + break; + } + + if (nr_changes == 0) + goto next; + + printf("Got %ld changelog files\n", nr_changes); + + while ((changes = gf_changelog_next_change(fbuf, PATH_MAX)) > 0) { + printf("changelog file [%d]: %s\n", ++i, fbuf); + + /* process changelog */ + /* ... */ + /* ... */ + /* ... */ + /* done processing */ + + ret = gf_changelog_done(fbuf); + if (ret) + handle_error("gf_changelog_done"); + } + + if (changes == -1) + handle_error("gf_changelog_next_change"); + + next: + sleep(10); + } + +out: + return ret; +} diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c new file mode 100644 index 00000000000..3e888d75ca6 --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-history.c @@ -0,0 +1,116 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. 
+ + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/** + * get set of new changes every 10 seconds (just print the file names) + * + * Compile it using: + * gcc -o gethistory `pkg-config --cflags libgfchangelog` get-history.c \ + * `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +#define handle_error(fn) printf("%s (reason: %s)\n", fn, strerror(errno)) + +int +main(int argc, char **argv) +{ + int i = 0; + int ret = 0; + ssize_t nr_changes = 0; + ssize_t changes = 0; + char fbuf[PATH_MAX] = { + 0, + }; + unsigned long end_ts = 0; + + ret = gf_changelog_init(NULL); + if (ret) { + handle_error("init failed"); + goto out; + } + + ret = gf_changelog_register("/export/z1/zwoop", "/tmp/scratch_v1", + "/tmp/changes.log", 9, 5); + if (ret) { + handle_error("register failed"); + goto out; + } + + int a, b; + printf("give the two numbers start and end\t"); + scanf("%d%d", &a, &b); + ret = gf_history_changelog("/export/z1/zwoop/.glusterfs/changelogs", a, b, + 3, &end_ts); + if (ret == -1) { + printf("history failed"); + goto out; + } + + printf("end time till when changelog available : %d , ret(%d) \t", end_ts, + ret); + fflush(stdout); + + while (1) { + nr_changes = gf_history_changelog_scan(); + printf("scanned, nr_changes : %d\n", nr_changes); + if (nr_changes < 0) { + handle_error("scan(): "); + break; + } + + if (nr_changes == 0) { + printf("done scanning \n"); + goto out; + } + + printf("Got %ld changelog files\n", nr_changes); + + while ((changes = gf_history_changelog_next_change(fbuf, PATH_MAX)) > + 0) { + printf("changelog file [%d]: %s\n", ++i, fbuf); + + /* 
process changelog */ + /* ... */ + /* ... */ + /* ... */ + /* done processing */ + + ret = gf_history_changelog_done(fbuf); + if (ret) + handle_error("gf_changelog_done"); + } + /* + if (changes == -1) + handle_error ("gf_changelog_next_change"); + if (nr_changes ==1){ + printf("continue scanning\n"); + } + + if(nr_changes == 0){ + printf("done scanning \n"); + goto out; + } + */ + } + +out: + return ret; +} diff --git a/xlators/features/changelog/lib/examples/python/changes.py b/xlators/features/changelog/lib/examples/python/changes.py new file mode 100755 index 00000000000..c410d3b000d --- /dev/null +++ b/xlators/features/changelog/lib/examples/python/changes.py @@ -0,0 +1,34 @@ +#!/usr/bin/python3 + +from __future__ import print_function +import os +import sys +import time +import libgfchangelog + +cl = libgfchangelog.Changes() + +def get_changes(brick, scratch_dir, log_file, log_level, interval): + change_list = [] + try: + cl.cl_init() + cl.cl_register(brick, scratch_dir, log_file, log_level) + while True: + cl.cl_scan() + change_list = cl.cl_getchanges() + if change_list: + print(change_list) + for change in change_list: + print(('done with %s' % (change))) + cl.cl_done(change) + time.sleep(interval) + except OSError: + ex = sys.exc_info()[1] + print(ex) + +if __name__ == '__main__': + if len(sys.argv) != 6: + print(("usage: %s <brick> <scratch-dir> <log-file> <fetch-interval>" + % (sys.argv[0]))) + sys.exit(1) + get_changes(sys.argv[1], sys.argv[2], sys.argv[3], 9, int(sys.argv[4])) diff --git a/xlators/features/changelog/lib/examples/python/libgfchangelog.py b/xlators/features/changelog/lib/examples/python/libgfchangelog.py new file mode 100644 index 00000000000..2da9f2d2a8c --- /dev/null +++ b/xlators/features/changelog/lib/examples/python/libgfchangelog.py @@ -0,0 +1,71 @@ +import os +from ctypes import * +from ctypes.util import find_library + +class Changes(object): + libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, + use_errno=True) + + 
@classmethod
def geterrno(cls):
    """Return the errno value ctypes saved for the last library call."""
    return get_errno()

@classmethod
def raise_oserr(cls):
    """Raise OSError carrying the saved errno and its message text."""
    errn = cls.geterrno()
    raise OSError(errn, os.strerror(errn))

@classmethod
def _get_api(cls, call):
    """Look up the libgfchangelog symbol named `call`."""
    return getattr(cls.libgfc, call)

@classmethod
def cl_init(cls):
    """Initialise the library; raise OSError when it reports -1."""
    ret = cls._get_api('gf_changelog_init')(None)
    if ret == -1:
        # The original called a non-existent raise_changelog_err(), which
        # turned every init failure into an AttributeError.
        cls.raise_oserr()

@classmethod
def cl_register(cls, brick, path, log_file, log_level, retries=0):
    """Register `brick` with scratch directory `path`; raise OSError on -1.

    NOTE(review): ctypes passes Python 3 `str` as wchar_t*, not char*, so
    callers presumably need to pass `bytes` for the three path arguments
    -- confirm against the callers.
    """
    ret = cls._get_api('gf_changelog_register')(brick, path,
                                                log_file, log_level, retries)
    if ret == -1:
        cls.raise_oserr()

@classmethod
def cl_scan(cls):
    """Ask the library to scan for new changelogs; raise OSError on -1."""
    ret = cls._get_api('gf_changelog_scan')()
    if ret == -1:
        cls.raise_oserr()

@classmethod
def cl_startfresh(cls):
    """Reset the library's tracker; raise OSError on -1."""
    ret = cls._get_api('gf_changelog_start_fresh')()
    if ret == -1:
        cls.raise_oserr()

@classmethod
def cl_getchanges(cls):
    """Return the pending changelog paths, sorted by their last suffix.

    Paths are returned as the raw bytes the library wrote, so they can be
    handed straight back to cl_done(). Raises OSError when the library
    reports -1; otherwise the tracker is reset before returning.
    """
    # TODO: remove hardcoding for path name length
    bufsize = 4096

    def clsort(f):
        # entries are bytes, so the separator has to be bytes as well
        return f.split(b'.')[-1]

    changes = []
    # An integer argument yields a zero-filled buffer on both Python 2
    # and 3; the former ('\0', 4096) form is a TypeError on Python 3.
    buf = create_string_buffer(bufsize)
    call = cls._get_api('gf_changelog_next_change')

    while True:
        ret = call(buf, bufsize)
        if ret in (0, -1):
            break
        # ret counts the line terminator that the library stripped
        changes.append(buf.raw[:ret - 1])
    if ret == -1:
        cls.raise_oserr()
    # cleanup tracker
    cls.cl_startfresh()
    return sorted(changes, key=clsort)

@classmethod
def cl_done(cls, clfile):
    """Tell the library that `clfile` is processed; raise OSError on -1."""
    ret = cls._get_api('gf_changelog_done')(clfile)
    if ret == -1:
        cls.raise_oserr()
#ifndef _CHANGELOG_LIB_MESSAGES_H_
#define _CHANGELOG_LIB_MESSAGES_H_

#include <glusterfs/glfs-message-id.h>

/* To add new message IDs, append new identifiers at the end of the list.
 *
 * Never remove a message ID. If it's not used anymore, you can rename it or
 * leave it as it is, but not delete it. This is to prevent reutilization of
 * IDs by other messages.
 *
 * The component name must match one of the entries defined in
 * glfs-message-id.h.
 */

/* Message IDs of the CHANGELOG_LIB component (libgfchangelog). */
GLFS_MSGID(
    CHANGELOG_LIB, CHANGELOG_LIB_MSG_OPEN_FAILED,
    CHANGELOG_LIB_MSG_FAILED_TO_RMDIR,
    CHANGELOG_LIB_MSG_SCRATCH_DIR_ENTRIES_CREATION_ERROR,
    CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, CHANGELOG_LIB_MSG_OPENDIR_ERROR,
    CHANGELOG_LIB_MSG_RENAME_FAILED, CHANGELOG_LIB_MSG_READ_ERROR,
    CHANGELOG_LIB_MSG_HTIME_ERROR, CHANGELOG_LIB_MSG_GET_TIME_ERROR,
    CHANGELOG_LIB_MSG_WRITE_FAILED, CHANGELOG_LIB_MSG_PTHREAD_ERROR,
    CHANGELOG_LIB_MSG_MMAP_FAILED, CHANGELOG_LIB_MSG_MUNMAP_FAILED,
    CHANGELOG_LIB_MSG_ASCII_ERROR, CHANGELOG_LIB_MSG_STAT_FAILED,
    CHANGELOG_LIB_MSG_GET_XATTR_FAILED, CHANGELOG_LIB_MSG_PUBLISH_ERROR,
    CHANGELOG_LIB_MSG_PARSE_ERROR, CHANGELOG_LIB_MSG_MIN_MAX_INFO,
    CHANGELOG_LIB_MSG_CLEANUP_ERROR, CHANGELOG_LIB_MSG_UNLINK_FAILED,
    CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED,
    CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO,
    CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO,
    CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, CHANGELOG_LIB_MSG_XDR_DECODING_FAILED,
    CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO,
    CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING,
    CHANGELOG_LIB_MSG_COPY_FROM_BUFFER_FAILED,
    CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, CHANGELOG_LIB_MSG_HIST_FAILED,
    CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO, CHANGELOG_LIB_MSG_PARSE_ERROR_CEASED,
    CHANGELOG_LIB_MSG_REQUESTING_INFO, CHANGELOG_LIB_MSG_FINAL_INFO);

/* Message text for some of the IDs above; each macro is named after its ID
 * with an `_STR` suffix. Not every ID has a text entry here. */
#define CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO_STR "Registering brick"
#define CHANGELOG_LIB_MSG_RENAME_FAILED_STR "error moving changelog file"
#define CHANGELOG_LIB_MSG_OPEN_FAILED_STR "cannot open changelog file"
#define CHANGELOG_LIB_MSG_UNLINK_FAILED_STR "failed to unlink"
#define CHANGELOG_LIB_MSG_FAILED_TO_RMDIR_STR "failed to rmdir"
#define CHANGELOG_LIB_MSG_STAT_FAILED_STR "stat failed on changelog file"
#define CHANGELOG_LIB_MSG_PARSE_ERROR_STR "could not parse changelog"
#define CHANGELOG_LIB_MSG_PARSE_ERROR_CEASED_STR                               \
    "parsing error, ceased publishing..."
#define CHANGELOG_LIB_MSG_HTIME_ERROR_STR "fop failed on htime file"
#define CHANGELOG_LIB_MSG_GET_XATTR_FAILED_STR                                 \
    "error extracting max timstamp from htime file"
#define CHANGELOG_LIB_MSG_MIN_MAX_INFO_STR "changelogs min max"
#define CHANGELOG_LIB_MSG_REQUESTING_INFO_STR "Requesting historical changelogs"
#define CHANGELOG_LIB_MSG_FINAL_INFO_STR "FINAL"
#define CHANGELOG_LIB_MSG_HIST_FAILED_STR                                      \
    "Requested changelog range is not available"
#define CHANGELOG_LIB_MSG_GET_TIME_ERROR_STR "wrong result"
#define CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO_STR                        \
    "Cleaning brick entry for brick"
#define CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO_STR "Draining event"
#define CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO_STR "Drained event"
#define CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO_STR "freeing entry"

#endif /* !_CHANGELOG_LIB_MESSAGES_H_ */
+*/ + +#include <glusterfs/compat-uuid.h> +#include <glusterfs/globals.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/syscall.h> + +#include "gf-changelog-helpers.h" +#include "gf-changelog-journal.h" +#include "changelog-mem-types.h" +#include "changelog-lib-messages.h" + +int +gf_changelog_done(char *file) +{ + int ret = -1; + char *buffer = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + char to_path[PATH_MAX] = { + 0, + }; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + if (!file || !strlen(file)) + goto out; + + /* make sure 'file' is inside ->jnl_working_dir */ + buffer = realpath(file, NULL); + if (!buffer) + goto out; + + if (strncmp(jnl->jnl_working_dir, buffer, strlen(jnl->jnl_working_dir))) + goto out; + + (void)snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_processed_dir, + basename(buffer)); + gf_msg_debug(this->name, 0, "moving %s to processed directory", file); + ret = sys_rename(buffer, to_path); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", file, "to=%s", + to_path, NULL); + goto out; + } + + ret = 0; + +out: + if (buffer) + free(buffer); /* allocated by realpath() */ + return ret; +} + +/** + * @API + * for a set of changelogs, start from the beginning + */ +int +gf_changelog_start_fresh() +{ + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = THIS; + if (!this) + goto out; + + errno = EINVAL; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + if (gf_ftruncate(jnl->jnl_fd, 0)) + goto out; + + return 0; + +out: + return -1; +} + +/** + * @API + * return the next changelog file entry. zero means all chanelogs + * consumed. 
+ */ +ssize_t +gf_changelog_next_change(char *bufptr, size_t maxlen) +{ + ssize_t size = -1; + int tracker_fd = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + char buffer[PATH_MAX] = { + 0, + }; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + tracker_fd = jnl->jnl_fd; + + size = gf_readline(tracker_fd, buffer, maxlen); + if (size < 0) { + size = -1; + goto out; + } + + if (size == 0) + goto out; + + memcpy(bufptr, buffer, size - 1); + bufptr[size - 1] = '\0'; + +out: + return size; +} + +/** + * @API + * gf_changelog_scan() - scan and generate a list of change entries + * + * calling this api multiple times (without calling gf_changlog_done()) + * would result new changelogs(s) being refreshed in the tracker file. + * This call also acts as a cancellation point for the consumer. + */ +ssize_t +gf_changelog_scan() +{ + int tracker_fd = 0; + size_t off = 0; + xlator_t *this = NULL; + size_t nr_entries = 0; + gf_changelog_journal_t *jnl = NULL; + struct dirent *entry = NULL; + struct dirent scratch[2] = { + { + 0, + }, + }; + char buffer[PATH_MAX] = { + 0, + }; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + if (JNL_IS_API_DISCONNECTED(jnl)) { + errno = ENOTCONN; + goto out; + } + + errno = EINVAL; + + tracker_fd = jnl->jnl_fd; + if (gf_ftruncate(tracker_fd, 0)) + goto out; + + rewinddir(jnl->jnl_dir); + + for (;;) { + errno = 0; + entry = sys_readdir(jnl->jnl_dir, scratch); + if (!entry || errno != 0) + break; + + if (!strcmp(basename(entry->d_name), ".") || + !strcmp(basename(entry->d_name), "..")) + continue; + + nr_entries++; + + GF_CHANGELOG_FILL_BUFFER(jnl->jnl_processing_dir, buffer, off, + strlen(jnl->jnl_processing_dir)); + GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off, + strlen(entry->d_name)); + GF_CHANGELOG_FILL_BUFFER("\n", buffer, 
off, 1); + + if (gf_changelog_write(tracker_fd, buffer, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED, + "error writing changelog filename" + " to tracker file"); + break; + } + off = 0; + } + + if (!entry) { + if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) + return nr_entries; + } +out: + return -1; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c new file mode 100644 index 00000000000..75f8a6dfc08 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c @@ -0,0 +1,170 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-mem-types.h" +#include "gf-changelog-helpers.h" +#include "changelog-lib-messages.h" +#include <glusterfs/syscall.h> + +size_t +gf_changelog_write(int fd, char *buffer, size_t len) +{ + ssize_t size = 0; + size_t written = 0; + + while (written < len) { + size = sys_write(fd, buffer + written, len - written); + if (size <= 0) + break; + + written += size; + } + + return written; +} + +void +gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr) +{ + for (; *s; s++) { + if (estr[*s]) + sprintf(enc, "%c", estr[*s]); + else + sprintf(enc, "%%%02X", *s); + while (*++enc) + ; + } +} + +/** + * thread safe version of readline with buffering + * (taken from Unix Network Programming Volume I, W.R. Stevens) + * + * This is favoured over fgets() as we'd need to ftruncate() + * (see gf_changelog_scan() API) to record new changelog files. 
+ * stream open functions does have a truncate like api (although + * that can be done via @fflush(fp), @ftruncate(fd) and @fseek(fp), + * but this involves mixing POSIX file descriptors and stream FILE *). + * + * NOTE: This implementation still does work with more than one fd's + * used to perform gf_readline(). For this very reason it's not + * made a part of libglusterfs. + */ + +static __thread read_line_t thread_tsd = {}; + +static ssize_t +my_read(read_line_t *tsd, int fd, char *ptr) +{ + if (tsd->rl_cnt <= 0) { + tsd->rl_cnt = sys_read(fd, tsd->rl_buf, MAXLINE); + + if (tsd->rl_cnt < 0) + return -1; + else if (tsd->rl_cnt == 0) + return 0; + tsd->rl_bufptr = tsd->rl_buf; + } + + tsd->rl_cnt--; + *ptr = *tsd->rl_bufptr++; + return 1; +} + +ssize_t +gf_readline(int fd, void *vptr, size_t maxlen) +{ + size_t n = 0; + size_t rc = 0; + char c = ' '; + char *ptr = NULL; + read_line_t *tsd = &thread_tsd; + + ptr = vptr; + for (n = 1; n < maxlen; n++) { + if ((rc = my_read(tsd, fd, &c)) == 1) { + *ptr++ = c; + if (c == '\n') + break; + } else if (rc == 0) { + *ptr = '\0'; + return (n - 1); + } else + return -1; + } + + *ptr = '\0'; + return n; +} + +off_t +gf_lseek(int fd, off_t offset, int whence) +{ + off_t off = 0; + read_line_t *tsd = &thread_tsd; + + off = sys_lseek(fd, offset, whence); + if (off == -1) + return -1; + + tsd->rl_cnt = 0; + tsd->rl_bufptr = tsd->rl_buf; + + return off; +} + +int +gf_ftruncate(int fd, off_t length) +{ + read_line_t *tsd = &thread_tsd; + + if (sys_ftruncate(fd, 0)) + return -1; + + tsd->rl_cnt = 0; + tsd->rl_bufptr = tsd->rl_buf; + + return 0; +} + +int +gf_thread_cleanup(xlator_t *this, pthread_t thread) +{ + int ret = 0; + void *res = NULL; + + ret = pthread_cancel(thread); + if (ret != 0) { + gf_msg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + "Failed to send cancellation to thread"); + goto error_return; + } + + ret = pthread_join(thread, &res); + if (ret != 0) { + gf_msg(this->name, 
GF_LOG_WARNING, 0, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + "failed to join thread"); + goto error_return; + } + + if (res != PTHREAD_CANCELED) { + gf_msg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + "Thread could not be cleaned up"); + goto error_return; + } + + return 0; + +error_return: + return -1; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h new file mode 100644 index 00000000000..9c609d33172 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -0,0 +1,255 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _GF_CHANGELOG_HELPERS_H +#define _GF_CHANGELOG_HELPERS_H + +#include <unistd.h> +#include <dirent.h> +#include <limits.h> +#include <glusterfs/locking.h> + +#include <glusterfs/xlator.h> + +#include "changelog.h" + +#include "changelog-rpc-common.h" +#include "gf-changelog-journal.h" + +#define GF_CHANGELOG_TRACKER "tracker" + +#define GF_CHANGELOG_CURRENT_DIR ".current" +#define GF_CHANGELOG_PROCESSED_DIR ".processed" +#define GF_CHANGELOG_PROCESSING_DIR ".processing" +#define GF_CHANGELOG_HISTORY_DIR ".history" +#define TIMESTAMP_LENGTH 10 + +#ifndef MAXLINE +#define MAXLINE 4096 +#endif + +#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) \ + do { \ + memcpy(ascii + off, ptr, len); \ + off += len; \ + } while (0) + +typedef struct read_line { + int rl_cnt; + char *rl_bufptr; + char rl_buf[MAXLINE]; +} read_line_t; + +struct gf_changelog; +struct gf_event; + +/** + * Event list for ordered event notification + * + * ->next_seq holds the next _expected_ sequence number. 
/**
 * Event list for ordered event notification
 *
 * ->next_seq holds the next _expected_ sequence number.
 */
struct gf_event_list {
    pthread_mutex_t lock; /* protects this structure */
    pthread_cond_t cond;

    pthread_t invoker;

    unsigned long next_seq; /* next sequence number expected:
                               zero during bootstrap */

    struct gf_changelog *entry; /* backpointer to its brick
                                   encapsulator (entry) */
    struct list_head events;    /* list of events */
};

/**
 * include a refcount if it's of use by additional layers
 */
struct gf_event {
    int count;

    unsigned long seq;

    struct list_head list;

    struct iovec iov[0];
};
/* bytes for one gf_event followed by @cnt iovecs and @len payload bytes */
#define GF_EVENT_CALLOC_SIZE(cnt, len)                                         \
    (sizeof(struct gf_event) + (cnt * sizeof(struct iovec)) + len)

/**
 * assign the base address of the IO vector to the correct memory
 * area and set its addressable length.
 *
 * The base is placed @pos bytes past the event's iovec array and @pos
 * is then advanced by @len.
 */
#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos)                            \
    do {                                                                       \
        vec->iov_base = ((char *)event) + sizeof(struct gf_event) +            \
                        (event->count * sizeof(struct iovec)) + pos;           \
        vec->iov_len = len;                                                    \
        pos += len;                                                            \
    } while (0)

typedef enum gf_changelog_conn_state {
    GF_CHANGELOG_CONN_STATE_PENDING = 0,
    GF_CHANGELOG_CONN_STATE_ACCEPTED,
    GF_CHANGELOG_CONN_STATE_DISCONNECTED,
} gf_changelog_conn_state_t;

/**
 * An instance of this structure is allocated for each brick for which
 * notifications are streamed.
 */
typedef struct gf_changelog {
    gf_lock_t statelock;
    gf_changelog_conn_state_t connstate;

    xlator_t *this;

    struct list_head list; /* list of instances */

    char brick[PATH_MAX]; /* brick path for this end-point */

    changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
/* shorthand accessors for the members of ->grpc */
#define RPC_PROBER(ent) ent->grpc.rpc
#define RPC_REBORP(ent) ent->grpc.svc
#define RPC_SOCK(ent) ent->grpc.sock

    unsigned int notify; /* notification flag(s) */

    FINI *fini;               /* destructor callback */
    CALLBACK *callback;       /* event callback dispatcher */
    CONNECT *connected;       /* connect callback */
    DISCONNECT *disconnected; /* disconnection callback */

    void *ptr;           /* owner specific private data */
    xlator_t *invokerxl; /* consumers _this_, if valid,
                            assigned to THIS before cbk is
                            invoked */

    gf_boolean_t ordered;

    void (*queueevent)(struct gf_event_list *, struct gf_event *);
    void (*pickevent)(struct gf_event_list *, struct gf_event **);

    struct gf_event_list event;
} gf_changelog_t;

/* Returns 1 when @event's type has a bit in common with the entry's
 * notification flags, 0 otherwise. */
static inline int
gf_changelog_filter_check(gf_changelog_t *entry, changelog_event_t *event)
{
    if (event->ev_type & entry->notify)
        return 1;
    return 0;
}

#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)

/** private structure */
typedef struct gf_private {
    pthread_mutex_t lock; /* protects ->connections, cleanups */
    pthread_cond_t cond;

    void *api; /* pointer for API access */

    pthread_t poller;            /* event poller thread */
    pthread_t connectionjanitor; /* connection cleaner */

    struct list_head connections; /* list of connections */
    struct list_head cleanups;    /* list of connection to be
                                     cleaned up */
} gf_private_t;

/* ->api of the gf_private_t stored in this->private */
#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *)this->private)->api)

/**
 * upcall: invoke callback with _correct_ THIS
 *
 * NOTE: expands a reference to a variable named `entry` that is not a
 * macro parameter; it has to be in scope at the point of use.
 */
#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args...)                     \
    do {                                                                       \
        xlator_t *old_this = NULL;                                             \
        xlator_t *invokerxl = NULL;                                            \
                                                                               \
        invokerxl = entry->invokerxl;                                          \
        old_this = this;                                                       \
                                                                               \
        if (invokerxl) {                                                       \
            THIS = invokerxl;                                                  \
        }                                                                      \
                                                                               \
        cbk(invokerxl, brick, args);                                           \
        THIS = old_this;                                                       \
                                                                               \
    } while (0)

/* Remember @xl in `old_this` and switch THIS to `master`; both names
 * have to exist in the enclosing scope. */
#define SAVE_THIS(xl)                                                          \
    do {                                                                       \
        old_this = xl;                                                         \
        THIS = master;                                                         \
    } while (0)

/* Undo SAVE_THIS(): switch THIS back to `old_this` when it is set. */
#define RESTORE_THIS()                                                         \
    do {                                                                       \
        if (old_this)                                                          \
            THIS = old_this;                                                   \
    } while (0)

/** APIs and the rest */

void *
gf_changelog_process(void *data);

void
gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr);

size_t
gf_changelog_write(int fd, char *buffer, size_t len);

ssize_t
gf_readline(int fd, void *vptr, size_t maxlen);

int
gf_ftruncate(int fd, off_t length);

off_t
gf_lseek(int fd, off_t offset, int whence);

int
gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl,
                     char *from_path, gf_boolean_t no_publish);
int
gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl,
                     char *from_path);
int
gf_thread_cleanup(xlator_t *this, pthread_t thread);
void *
gf_changelog_callback_invoker(void *arg);

int
gf_cleanup_event(xlator_t *, struct gf_event_list *);

/* (un)ordered event queueing */
void
queue_ordered_event(struct gf_event_list *, struct gf_event *);

void
queue_unordered_event(struct gf_event_list *, struct gf_event *);

/* (un)ordered event picking */
void
pick_event_ordered(struct gf_event_list *, struct gf_event **);

void
pick_event_unordered(struct gf_event_list *, struct gf_event **);

/* connection janitor thread */
void *
gf_changelog_connection_janitor(void *);

#endif
extern int byebye;

/* NOTE(review): presumably the first index of the two tables below --
 * confirm against their users, which are not visible in this file section. */
enum changelog_versions { VERSION_1_1 = 0, VERSION_1_2 = 1 };

/**
 * number of gfid records after fop number
 *
 * The two rows differ only for UNLINK and RMDIR (1 in the first row,
 * 2 in the second); fops not listed are zero.
 */
int nr_gfids[2][GF_FOP_MAXVALUE] = {{
                                        [GF_FOP_MKNOD] = 1,
                                        [GF_FOP_MKDIR] = 1,
                                        [GF_FOP_UNLINK] = 1,
                                        [GF_FOP_RMDIR] = 1,
                                        [GF_FOP_SYMLINK] = 1,
                                        [GF_FOP_RENAME] = 2,
                                        [GF_FOP_LINK] = 1,
                                        [GF_FOP_CREATE] = 1,
                                    },
                                    {
                                        [GF_FOP_MKNOD] = 1,
                                        [GF_FOP_MKDIR] = 1,
                                        [GF_FOP_UNLINK] = 2,
                                        [GF_FOP_RMDIR] = 2,
                                        [GF_FOP_SYMLINK] = 1,
                                        [GF_FOP_RENAME] = 2,
                                        [GF_FOP_LINK] = 1,
                                        [GF_FOP_CREATE] = 1,
                                    }};

/* Per-fop count of extra records; both rows are identical (3 for MKNOD,
 * MKDIR and CREATE, 0 for the other listed fops). */
int nr_extra_recs[2][GF_FOP_MAXVALUE] = {{
                                             [GF_FOP_MKNOD] = 3,
                                             [GF_FOP_MKDIR] = 3,
                                             [GF_FOP_UNLINK] = 0,
                                             [GF_FOP_RMDIR] = 0,
                                             [GF_FOP_SYMLINK] = 0,
                                             [GF_FOP_RENAME] = 0,
                                             [GF_FOP_LINK] = 0,
                                             [GF_FOP_CREATE] = 3,
                                         },
                                         {
                                             [GF_FOP_MKNOD] = 3,
                                             [GF_FOP_MKDIR] = 3,
                                             [GF_FOP_UNLINK] = 0,
                                             [GF_FOP_RMDIR] = 0,
                                             [GF_FOP_SYMLINK] = 0,
                                             [GF_FOP_RENAME] = 0,
                                             [GF_FOP_LINK] = 0,
                                             [GF_FOP_CREATE] = 3,
                                         }};

/* Text form of a binary uuid, as produced by uuid_utoa(). */
static char *
binary_to_ascii(uuid_t uuid)
{
    return uuid_utoa(uuid);
}

/* Identity conversion: returns @ptr unchanged. */
static char *
conv_noop(char *ptr)
{
    return ptr;
}

/*
 * The parsing macros below contain a bare `break`: they are only usable
 * directly inside a loop or switch, which they leave after setting the
 * caller's error flag (@perr) to 1.
 */

/* fail unless the byte at @ptr + @plen is a NUL */
#define VERIFY_SEPARATOR(ptr, plen, perr)                                      \
    {                                                                          \
        if (*(ptr + plen) != '\0') {                                           \
            perr = 1;                                                          \
            break;                                                             \
        }                                                                      \
    }

/* advance the cursor @mover by @bytes and shrink @nleft accordingly */
#define MOVER_MOVE(mover, nleft, bytes)                                        \
    {                                                                          \
        mover += bytes;                                                        \
        nleft -= bytes;                                                        \
    }

/* check the separator after @le bytes, then convert @mov with @fn;
 * a NULL result from @fn is an error */
#define PARSE_GFID(mov, ptr, le, fn, perr)                                     \
    {                                                                          \
        VERIFY_SEPARATOR(mov, le, perr);                                       \
        ptr = fn(mov);                                                         \
        if (!ptr) {                                                            \
            perr = 1;                                                          \
            break;                                                             \
        }                                                                      \
    }

/* append the string @pt to @buf at offset @of, then advance the cursor
 * @mo by @le bytes */
#define FILL_AND_MOVE(pt, buf, of, mo, nl, le)                                 \
    {                                                                          \
        GF_CHANGELOG_FILL_BUFFER(pt, buf, of, strlen(pt));                     \
        MOVER_MOVE(mo, nl, le);                                                \
    }

/* copy sizeof(uuid_t) bytes at the cursor into @uuid, convert them to
 * text in @ptr and step the cursor past them */
#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr)                         \
    {                                                                          \
        memcpy(uuid, mover, sizeof(uuid_t));                                   \
        ptr = binary_to_ascii(uuid);                                           \
        if (!ptr) {                                                            \
            perr = 1;                                                          \
            break;                                                             \
        }                                                                      \
        MOVER_MOVE(mover, nleft, sizeof(uuid_t));                              \
    }

#define LINE_BUFSIZE (3 * PATH_MAX) /* enough buffer for extra chars too */
+ */ + +static int +gf_changelog_parse_binary(xlator_t *this, gf_changelog_journal_t *jnl, + int from_fd, int to_fd, size_t start_offset, + struct stat *stbuf, int version_idx) + +{ + int ret = -1; + off_t off = 0; + off_t nleft = 0; + uuid_t uuid = { + 0, + }; + char *ptr = NULL; + char *bname_start = NULL; + char *bname_end = NULL; + char *mover = NULL; + void *start = NULL; + char current_mover = ' '; + size_t blen = 0; + int parse_err = 0; + char *ascii = NULL; + + ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char); + + nleft = stbuf->st_size; + + start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); + if (start == MAP_FAILED) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED, + "mmap() error"); + goto out; + } + + mover = start; + + MOVER_MOVE(mover, nleft, start_offset); + + while (nleft > 0) { + off = blen = 0; + ptr = bname_start = bname_end = NULL; + + current_mover = *mover; + + switch (current_mover) { + case 'D': + case 'M': + MOVER_MOVE(mover, nleft, 1); + PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err); + + break; + + case 'E': + MOVER_MOVE(mover, nleft, 1); + PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err); + + bname_start = mover; + bname_end = strchr(mover, '\n'); + if (bname_end == NULL) { + parse_err = 1; + break; + } + + blen = bname_end - bname_start; + MOVER_MOVE(mover, nleft, blen); + + break; + + default: + parse_err = 1; + } + + if (parse_err) + break; + + GF_CHANGELOG_FILL_BUFFER(¤t_mover, ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, strlen(ptr)); + if (blen) + GF_CHANGELOG_FILL_BUFFER(bname_start, ascii, off, blen); + GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1); + + if (gf_changelog_write(to_fd, ascii, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_ASCII_ERROR, + "processing binary changelog failed due to " + " error in writing ascii change"); + break; + } + + MOVER_MOVE(mover, nleft, 1); + 
} + + if ((nleft == 0) && (!parse_err)) + ret = 0; + + if (munmap(start, stbuf->st_size)) + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED, + "munmap() error"); +out: + if (ascii) + GF_FREE(ascii); + return ret; +} + +/** + * ascii decoder: + * - separate out one entry from another + * - use fop name rather than fop number + */ +static int +gf_changelog_parse_ascii(xlator_t *this, gf_changelog_journal_t *jnl, + int from_fd, int to_fd, size_t start_offset, + struct stat *stbuf, int version_idx) +{ + int ng = 0; + int ret = -1; + int fop = 0; + int len = 0; + off_t off = 0; + off_t nleft = 0; + char *ptr = NULL; + char *eptr = NULL; + void *start = NULL; + char *mover = NULL; + int parse_err = 0; + char current_mover = ' '; + char *ascii = NULL; + const char *fopname = NULL; + + ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char); + + nleft = stbuf->st_size; + + start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); + if (start == MAP_FAILED) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED, + "mmap() error"); + goto out; + } + + mover = start; + + MOVER_MOVE(mover, nleft, start_offset); + + while (nleft > 0) { + off = 0; + current_mover = *mover; + + GF_CHANGELOG_FILL_BUFFER(¤t_mover, ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + + switch (current_mover) { + case 'D': + MOVER_MOVE(mover, nleft, 1); + + /* target gfid */ + PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, + parse_err); + FILL_AND_MOVE(ptr, ascii, off, mover, nleft, + UUID_CANONICAL_FORM_LEN); + break; + case 'M': + MOVER_MOVE(mover, nleft, 1); + + /* target gfid */ + PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, + parse_err); + FILL_AND_MOVE(ptr, ascii, off, mover, nleft, + UUID_CANONICAL_FORM_LEN); + FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1); + + /* fop */ + len = strlen(mover); + VERIFY_SEPARATOR(mover, len, parse_err); + + fop = atoi(mover); + fopname = gf_fop_list[fop]; + if 
(fopname == NULL) { + parse_err = 1; + break; + } + + MOVER_MOVE(mover, nleft, len); + + len = strlen(fopname); + GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len); + + break; + + case 'E': + MOVER_MOVE(mover, nleft, 1); + + /* target gfid */ + PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, + parse_err); + FILL_AND_MOVE(ptr, ascii, off, mover, nleft, + UUID_CANONICAL_FORM_LEN); + FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1); + + /* fop */ + len = strlen(mover); + VERIFY_SEPARATOR(mover, len, parse_err); + + fop = atoi(mover); + fopname = gf_fop_list[fop]; + if (fopname == NULL) { + parse_err = 1; + break; + } + + MOVER_MOVE(mover, nleft, len); + + len = strlen(fopname); + GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len); + + ng = nr_extra_recs[version_idx][fop]; + for (; ng > 0; ng--) { + MOVER_MOVE(mover, nleft, 1); + len = strlen(mover); + VERIFY_SEPARATOR(mover, len, parse_err); + + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + FILL_AND_MOVE(mover, ascii, off, mover, nleft, len); + } + + /* pargfid + bname */ + ng = nr_gfids[version_idx][fop]; + while (ng-- > 0) { + MOVER_MOVE(mover, nleft, 1); + len = strlen(mover); + if (!len) { + MOVER_MOVE(mover, nleft, 1); + continue; + } + + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + + PARSE_GFID(mover, ptr, len, conv_noop, parse_err); + eptr = calloc(3, strlen(ptr)); + if (!eptr) { + parse_err = 1; + break; + } + + gf_rfc3986_encode_space_newline((unsigned char *)ptr, eptr, + jnl->rfc3986_space_newline); + FILL_AND_MOVE(eptr, ascii, off, mover, nleft, len); + free(eptr); + } + + break; + default: + parse_err = 1; + } + + if (parse_err) + break; + + GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1); + + if (gf_changelog_write(to_fd, ascii, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_ASCII_ERROR, + "processing ascii changelog failed due to " + " error in writing change"); + break; + } + + MOVER_MOVE(mover, nleft, 1); + } + + if ((nleft == 0) && (!parse_err)) + ret = 0; + 
+ if (munmap(start, stbuf->st_size)) + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED, + "munmap() error"); + +out: + if (ascii) + GF_FREE(ascii); + + return ret; +} + +static int +gf_changelog_decode(xlator_t *this, gf_changelog_journal_t *jnl, int from_fd, + int to_fd, struct stat *stbuf, int *zerob) +{ + int ret = -1; + int encoding = -1; + int major_version = -1; + int minor_version = -1; + int version_idx = -1; + size_t elen = 0; + char buffer[1024] = { + 0, + }; + + CHANGELOG_GET_HEADER_INFO(from_fd, buffer, sizeof(buffer), encoding, + major_version, minor_version, elen); + if (encoding == -1) /* unknown encoding */ + goto out; + + if (major_version == -1) /* unknown major version */ + goto out; + + if (minor_version == -1) /* unknown minor version */ + goto out; + + if (!CHANGELOG_VALID_ENCODING(encoding)) + goto out; + + if (elen == stbuf->st_size) { + *zerob = 1; + goto out; + } + + if (major_version == 1 && minor_version == 1) { + version_idx = VERSION_1_1; + } else if (major_version == 1 && minor_version == 2) { + version_idx = VERSION_1_2; + } + + if (version_idx == -1) /* unknown version number */ + goto out; + + /** + * start processing after the header + */ + if (sys_lseek(from_fd, elen, SEEK_SET) < 0) { + goto out; + } + switch (encoding) { + case CHANGELOG_ENCODE_BINARY: + /** + * this ideally should have been a part of changelog-encoders.c + * (ie. part of the changelog translator). 
+ */ + ret = gf_changelog_parse_binary(this, jnl, from_fd, to_fd, elen, + stbuf, version_idx); + break; + + case CHANGELOG_ENCODE_ASCII: + ret = gf_changelog_parse_ascii(this, jnl, from_fd, to_fd, elen, + stbuf, version_idx); + break; + } + +out: + return ret; +} + +int +gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl, + char *from_path) +{ + int ret = 0; + char dest[PATH_MAX] = { + 0, + }; + char to_path[PATH_MAX] = { + 0, + }; + struct stat stbuf = { + 0, + }; + + if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, + basename(from_path)) >= PATH_MAX) + return -1; + + /* handle zerob file that won't exist in current */ + ret = sys_stat(to_path, &stbuf); + if (ret) { + if (errno == ENOENT) + ret = 0; + goto out; + } + + if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, + basename(from_path)) >= PATH_MAX) + return -1; + + ret = sys_rename(to_path, dest); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", to_path, "to=%s", + dest, NULL); + } + +out: + return ret; +} + +int +gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl, + char *from_path, gf_boolean_t no_publish) +{ + int ret = -1; + int fd1 = 0; + int fd2 = 0; + int zerob = 0; + struct stat stbuf = { + 0, + }; + char dest[PATH_MAX] = { + 0, + }; + char to_path[PATH_MAX] = { + 0, + }; + + if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, + basename(from_path)) >= PATH_MAX) + goto out; + if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, + basename(from_path)) >= PATH_MAX) + goto out; + + ret = sys_stat(from_path, &stbuf); + if (ret || !S_ISREG(stbuf.st_mode)) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_STAT_FAILED, + "path=%s", from_path, NULL); + goto out; + } + + fd1 = open(from_path, O_RDONLY); + if (fd1 < 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED, + "path=%s", from_path, NULL); + goto out; + } + + fd2 = open(to_path, 
O_CREAT | O_TRUNC | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd2 < 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED, + "path=%s", to_path, NULL); + goto close_fd; + } else { + ret = gf_changelog_decode(this, jnl, fd1, fd2, &stbuf, &zerob); + + sys_close(fd2); + + if (!ret) { + /* move it to processing on a successful + decode */ + if (no_publish == _gf_true) + goto close_fd; + ret = sys_rename(to_path, dest); + if (ret) + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", to_path, + "to=%s", dest, NULL); + } + + /* remove it from .current if it's an empty file */ + if (zerob) { + /* zerob changelogs must be unlinked */ + ret = sys_unlink(to_path); + if (ret) + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_UNLINK_FAILED, "name=empty changelog", + "path=%s", to_path, NULL); + } + } + +close_fd: + sys_close(fd1); + +out: + return ret; +} + +void * +gf_changelog_process(void *data) +{ + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_entry_t *entry = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl = data; + jnl_proc = jnl->jnl_proc; + THIS = jnl->this; + this = jnl->this; + + while (1) { + pthread_mutex_lock(&jnl_proc->lock); + { + while (list_empty(&jnl_proc->entries)) { + jnl_proc->waiting = _gf_true; + pthread_cond_wait(&jnl_proc->cond, &jnl_proc->lock); + } + + entry = list_first_entry(&jnl_proc->entries, gf_changelog_entry_t, + list); + if (entry) + list_del(&entry->list); + + jnl_proc->waiting = _gf_false; + } + pthread_mutex_unlock(&jnl_proc->lock); + + if (entry) { + (void)gf_changelog_consume(this, jnl, entry->path, _gf_false); + GF_FREE(entry); + } + } + + return NULL; +} + +void +gf_changelog_queue_journal(gf_changelog_processor_t *jnl_proc, + changelog_event_t *event) +{ + size_t len = 0; + gf_changelog_entry_t *entry = NULL; + + entry = GF_CALLOC(1, sizeof(gf_changelog_entry_t), + gf_changelog_mt_libgfchangelog_entry_t); + 
if (!entry) + return; + INIT_LIST_HEAD(&entry->list); + + len = strlen(event->u.journal.path); + (void)memcpy(entry->path, event->u.journal.path, len + 1); + entry->path[len] = '\0'; + + pthread_mutex_lock(&jnl_proc->lock); + { + list_add_tail(&entry->list, &jnl_proc->entries); + if (jnl_proc->waiting) + pthread_cond_signal(&jnl_proc->cond); + } + pthread_mutex_unlock(&jnl_proc->lock); + + return; +} + +void +gf_changelog_handle_journal(void *xl, char *brick, void *cbkdata, + changelog_event_t *event) +{ + gf_changelog_journal_t *jnl = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl = cbkdata; + jnl_proc = jnl->jnl_proc; + + gf_changelog_queue_journal(jnl_proc, event); +} + +void +gf_changelog_journal_disconnect(void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + pthread_spin_lock(&jnl->lock); + { + JNL_SET_API_STATE(jnl, JNL_API_DISCONNECTED); + }; + pthread_spin_unlock(&jnl->lock); +} + +void +gf_changelog_journal_connect(void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + pthread_spin_lock(&jnl->lock); + { + JNL_SET_API_STATE(jnl, JNL_API_CONNECTED); + }; + pthread_spin_unlock(&jnl->lock); + + return; +} + +void +gf_changelog_cleanup_processor(gf_changelog_journal_t *jnl) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + this = THIS; + if (!this || !jnl || !jnl->jnl_proc) + goto error_return; + + jnl_proc = jnl->jnl_proc; + + ret = gf_thread_cleanup(this, jnl_proc->processor); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_CLEANUP_ERROR, + "failed to cleanup processor thread"); + goto error_return; + } + + (void)pthread_mutex_destroy(&jnl_proc->lock); + (void)pthread_cond_destroy(&jnl_proc->cond); + + GF_FREE(jnl_proc); + +error_return: + return; +} + +int +gf_changelog_init_processor(gf_changelog_journal_t *jnl) +{ + int ret = -1; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl_proc = 
GF_CALLOC(1, sizeof(gf_changelog_processor_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl_proc) + goto error_return; + + ret = pthread_mutex_init(&jnl_proc->lock, NULL); + if (ret != 0) + goto free_jnl_proc; + ret = pthread_cond_init(&jnl_proc->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + INIT_LIST_HEAD(&jnl_proc->entries); + jnl_proc->waiting = _gf_false; + jnl->jnl_proc = jnl_proc; + + ret = gf_thread_create(&jnl_proc->processor, NULL, gf_changelog_process, + jnl, "clogproc"); + if (ret != 0) { + jnl->jnl_proc = NULL; + goto cleanup_cond; + } + + return 0; + +cleanup_cond: + (void)pthread_cond_destroy(&jnl_proc->cond); +cleanup_mutex: + (void)pthread_mutex_destroy(&jnl_proc->lock); +free_jnl_proc: + GF_FREE(jnl_proc); +error_return: + return -1; +} + +static void +gf_changelog_cleanup_fds(gf_changelog_journal_t *jnl) +{ + /* tracker fd */ + if (jnl->jnl_fd != -1) + sys_close(jnl->jnl_fd); + /* processing dir */ + if (jnl->jnl_dir) + sys_closedir(jnl->jnl_dir); + + if (jnl->jnl_working_dir) + free(jnl->jnl_working_dir); /* allocated by realpath */ +} + +static int +gf_changelog_open_dirs(xlator_t *this, gf_changelog_journal_t *jnl) +{ + int ret = -1; + DIR *dir = NULL; + int tracker_fd = 0; + char tracker_path[PATH_MAX] = { + 0, + }; + + /* .current */ + (void)snprintf(jnl->jnl_current_dir, PATH_MAX, + "%s/" GF_CHANGELOG_CURRENT_DIR "/", jnl->jnl_working_dir); + ret = recursive_rmdir(jnl->jnl_current_dir); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "path=%s", + jnl->jnl_current_dir, NULL); + goto out; + } + ret = mkdir_p(jnl->jnl_current_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processed */ + (void)snprintf(jnl->jnl_processed_dir, PATH_MAX, + "%s/" GF_CHANGELOG_PROCESSED_DIR "/", jnl->jnl_working_dir); + ret = mkdir_p(jnl->jnl_processed_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processing */ + (void)snprintf(jnl->jnl_processing_dir, PATH_MAX, + "%s/" 
GF_CHANGELOG_PROCESSING_DIR "/", jnl->jnl_working_dir); + ret = recursive_rmdir(jnl->jnl_processing_dir); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "path=%s", + jnl->jnl_processing_dir, NULL); + goto out; + } + + ret = mkdir_p(jnl->jnl_processing_dir, 0600, _gf_false); + if (ret) + goto out; + + dir = sys_opendir(jnl->jnl_processing_dir); + if (!dir) { + gf_msg("", GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + "opendir() error"); + goto out; + } + + jnl->jnl_dir = dir; + + (void)snprintf(tracker_path, PATH_MAX, "%s/" GF_CHANGELOG_TRACKER, + jnl->jnl_working_dir); + + tracker_fd = open(tracker_path, O_CREAT | O_APPEND | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (tracker_fd < 0) { + sys_closedir(jnl->jnl_dir); + ret = -1; + goto out; + } + + jnl->jnl_fd = tracker_fd; + ret = 0; +out: + return ret; +} + +int +gf_changelog_init_history(xlator_t *this, gf_changelog_journal_t *jnl, + char *brick_path) +{ + int i = 0; + int ret = 0; + char hist_scratch_dir[PATH_MAX] = { + 0, + }; + + jnl->hist_jnl = GF_CALLOC(1, sizeof(*jnl), + gf_changelog_mt_libgfchangelog_t); + if (!jnl->hist_jnl) + goto error_return; + + jnl->hist_jnl->jnl_dir = NULL; + jnl->hist_jnl->jnl_fd = -1; + + (void)snprintf(hist_scratch_dir, PATH_MAX, + "%s/" GF_CHANGELOG_HISTORY_DIR "/", jnl->jnl_working_dir); + + ret = mkdir_p(hist_scratch_dir, 0600, _gf_false); + if (ret) + goto dealloc_hist; + + jnl->hist_jnl->jnl_working_dir = realpath(hist_scratch_dir, NULL); + if (!jnl->hist_jnl->jnl_working_dir) + goto dealloc_hist; + + ret = gf_changelog_open_dirs(this, jnl->hist_jnl); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + "could not create entries in history scratch dir"); + goto dealloc_hist; + } + + if (snprintf(jnl->hist_jnl->jnl_brickpath, PATH_MAX, "%s", brick_path) >= + PATH_MAX) + goto dealloc_hist; + + for (i = 0; i < 256; i++) { + jnl->hist_jnl->rfc3986_space_newline[i] = (i == ' ' || i 
== '\n' || + i == '%') + ? 0 + : i; + } + + return 0; + +dealloc_hist: + GF_FREE(jnl->hist_jnl); + jnl->hist_jnl = NULL; +error_return: + return -1; +} + +void +gf_changelog_journal_fini(void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + gf_changelog_cleanup_processor(jnl); + + gf_changelog_cleanup_fds(jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds(jnl->hist_jnl); + + GF_FREE(jnl); +} + +void * +gf_changelog_journal_init(void *xl, struct gf_brick_spec *brick) +{ + int i = 0; + int ret = 0; + xlator_t *this = NULL; + struct stat buf = { + 0, + }; + char *scratch_dir = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = xl; + scratch_dir = (char *)brick->ptr; + + jnl = GF_CALLOC(1, sizeof(gf_changelog_journal_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl) + goto error_return; + + if (snprintf(jnl->jnl_brickpath, PATH_MAX, "%s", brick->brick_path) >= + PATH_MAX) + goto dealloc_private; + + if (sys_stat(scratch_dir, &buf) && errno == ENOENT) { + ret = mkdir_p(scratch_dir, 0600, _gf_true); + if (ret) + goto dealloc_private; + } + + jnl->jnl_working_dir = realpath(scratch_dir, NULL); + if (!jnl->jnl_working_dir) + goto dealloc_private; + + ret = gf_changelog_open_dirs(this, jnl); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + "could not create entries in scratch dir"); + goto dealloc_private; + } + + /* RFC 3986 {de,en}coding */ + for (i = 0; i < 256; i++) { + jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' || i == '%') ? 
0 + : i; + } + + ret = gf_changelog_init_history(this, jnl, brick->brick_path); + if (ret) + goto cleanup_fds; + + /* initialize journal processor */ + jnl->this = this; + ret = gf_changelog_init_processor(jnl); + if (ret) + goto cleanup_fds; + + JNL_SET_API_STATE(jnl, JNL_API_CONN_INPROGESS); + ret = pthread_spin_init(&jnl->lock, 0); + if (ret != 0) + goto cleanup_processor; + return jnl; + +cleanup_processor: + gf_changelog_cleanup_processor(jnl); +cleanup_fds: + gf_changelog_cleanup_fds(jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds(jnl->hist_jnl); +dealloc_private: + GF_FREE(jnl); +error_return: + return NULL; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h new file mode 100644 index 00000000000..ba5b9bf827e --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h @@ -0,0 +1,116 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. 
+*/ + +#ifndef __GF_CHANGELOG_JOURNAL_H +#define __GF_CHANGELOG_JOURNAL_H + +#include <unistd.h> +#include <pthread.h> + +#include "changelog.h" + +enum api_conn { + JNL_API_CONNECTED, + JNL_API_CONN_INPROGESS, + JNL_API_DISCONNECTED, +}; + +typedef struct gf_changelog_entry { + char path[PATH_MAX]; + + struct list_head list; +} gf_changelog_entry_t; + +typedef struct gf_changelog_processor { + pthread_mutex_t lock; /* protects ->entries */ + pthread_cond_t cond; /* waiter during empty list */ + gf_boolean_t waiting; + + pthread_t processor; /* thread-id of journal processing thread */ + + struct list_head entries; +} gf_changelog_processor_t; + +typedef struct gf_changelog_journal { + DIR *jnl_dir; /* 'processing' directory stream */ + + int jnl_fd; /* fd to the tracker file */ + + char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */ + + gf_changelog_processor_t *jnl_proc; + + char *jnl_working_dir; /* scratch directory */ + + char jnl_current_dir[PATH_MAX]; + char jnl_processed_dir[PATH_MAX]; + char jnl_processing_dir[PATH_MAX]; + + char rfc3986_space_newline[256]; /* RFC 3986 string encoding */ + + struct gf_changelog_journal *hist_jnl; + int hist_done; /* holds 0 done scanning, + 1 keep scanning and -1 error */ + + pthread_spinlock_t lock; + int connected; + xlator_t *this; +} gf_changelog_journal_t; + +#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state) +#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED) + +/* History API */ +typedef struct gf_changelog_history_data { + int len; + + int htime_fd; + + /* parallelism count */ + int n_parallel; + + /* history from, to indexes */ + unsigned long from; + unsigned long to; + xlator_t *this; +} gf_changelog_history_data_t; + +typedef struct gf_changelog_consume_data { + /** set of inputs */ + + /* fd to read from */ + int fd; + + /* from @offset */ + off_t offset; + + xlator_t *this; + + gf_changelog_journal_t *jnl; + + /** set of outputs */ + + /* return value */ + 
int retval; + + /* journal processed */ + char changelog[PATH_MAX]; +} gf_changelog_consume_data_t; + +/* event handler */ +CALLBACK gf_changelog_handle_journal; + +/* init, connect & disconnect handler */ +INIT gf_changelog_journal_init; +FINI gf_changelog_journal_fini; +CONNECT gf_changelog_journal_connect; +DISCONNECT gf_changelog_journal_disconnect; + +#endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c new file mode 100644 index 00000000000..56b11cbb705 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -0,0 +1,413 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +#include "gf-changelog-helpers.h" +#include "changelog-rpc-common.h" +#include "changelog-lib-messages.h" + +#include <glusterfs/syscall.h> + +/** + * Reverse socket: actual data transfer handler. Connection + * initiator is PROBER, data transfer is REBORP. 
+ */ + +static struct rpcsvc_program *gf_changelog_reborp_programs[]; + +void * +gf_changelog_connection_janitor(void *arg) +{ + int32_t ret = 0; + xlator_t *this = NULL; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + unsigned long drained = 0; + + this = arg; + THIS = this; + + priv = this->private; + + while (1) { + pthread_mutex_lock(&priv->lock); + { + while (list_empty(&priv->cleanups)) + pthread_cond_wait(&priv->cond, &priv->lock); + + entry = list_first_entry(&priv->cleanups, gf_changelog_t, list); + list_del_init(&entry->list); + } + pthread_mutex_unlock(&priv->lock); + + drained = 0; + ev = &entry->event; + + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, "brick=%s", + entry->brick, NULL); + + /* 0x0: disable rpc-clnt */ + rpc_clnt_disable(RPC_PROBER(entry)); + + /* 0x1: cleanup callback invoker thread */ + ret = gf_cleanup_event(this, ev); + if (ret) + continue; + + /* 0x2: drain pending events */ + while (!list_empty(&ev->events)) { + event = list_first_entry(&ev->events, struct gf_event, list); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "seq=%lu", + event->seq, "payload=%d", event->count, NULL); + + GF_FREE(event); + drained++; + } + + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO, "num=%lu", drained, NULL); + + /* 0x3: freeup brick entry */ + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, "entry=%p", entry, NULL); + LOCK_DESTROY(&entry->statelock); + GF_FREE(entry); + } + + return NULL; +} + +int +gf_changelog_reborp_rpcsvc_notify(rpcsvc_t *rpc, void *mydata, + rpcsvc_event_t event, void *data) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_t *entry = NULL; + + if (!(event == RPCSVC_EVENT_ACCEPT || event == RPCSVC_EVENT_DISCONNECT)) + return 0; + + entry = mydata; + this = entry->this; + + switch (event) { + case 
RPCSVC_EVENT_ACCEPT: + ret = sys_unlink(RPC_SOCK(entry)); + if (ret != 0) + gf_smsg(this->name, GF_LOG_WARNING, errno, + CHANGELOG_LIB_MSG_UNLINK_FAILED, "name=reverse socket", + "path=%s", RPC_SOCK(entry), NULL); + if (entry->connected) + GF_CHANGELOG_INVOKE_CBK(this, entry->connected, entry->brick, + entry->ptr); + break; + case RPCSVC_EVENT_DISCONNECT: + if (entry->disconnected) + GF_CHANGELOG_INVOKE_CBK(this, entry->disconnected, entry->brick, + entry->ptr); + /* passthrough */ + default: + break; + } + + return 0; +} + +rpcsvc_t * +gf_changelog_reborp_init_rpc_listner(xlator_t *this, char *path, char *sock, + void *cbkdata) +{ + CHANGELOG_MAKE_TMP_SOCKET_PATH(path, sock, UNIX_PATH_MAX); + return changelog_rpc_server_init(this, sock, cbkdata, + gf_changelog_reborp_rpcsvc_notify, + gf_changelog_reborp_programs); +} + +/** + * This is dirty and painful as of now until there is event filtering in the + * server. The entire event buffer is scanned and interested events are picked, + * whereas we _should_ be notified with the events we were interested in + * (selected at the time of probe). As of now this is complete BS and needs + * fixture ASAP. I just made it work, it needs to be better. + * + * @FIXME: cleanup this bugger once server filters events. + */ +void +gf_changelog_invoke_callback(gf_changelog_t *entry, struct iovec **vec, + int payloadcnt) +{ + int i = 0; + int evsize = 0; + xlator_t *this = NULL; + changelog_event_t *event = NULL; + + this = entry->this; + + for (; i < payloadcnt; i++) { + event = (changelog_event_t *)vec[i]->iov_base; + evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; + + for (; evsize > 0; evsize--, event++) { + if (gf_changelog_filter_check(entry, event)) { + GF_CHANGELOG_INVOKE_CBK(this, entry->callback, entry->brick, + entry->ptr, event); + } + } + } +} + +/** + * Ordered event handler is self-adaptive.. if the event sequence number + * is what's expected (->next_seq) there is no ordering list that's + * maintained. 
On out-of-order event notifications, event buffers are + * dynamically allocated and ordered. + */ + +int +__is_expected_sequence(struct gf_event_list *ev, struct gf_event *event) +{ + return (ev->next_seq == event->seq); +} + +int +__can_process_event(struct gf_event_list *ev, struct gf_event **event) +{ + *event = list_first_entry(&ev->events, struct gf_event, list); + + if (__is_expected_sequence(ev, *event)) { + list_del(&(*event)->list); + ev->next_seq++; + return 1; + } + + return 0; +} + +void +pick_event_ordered(struct gf_event_list *ev, struct gf_event **event) +{ + pthread_mutex_lock(&ev->lock); + { + while (list_empty(&ev->events) || !__can_process_event(ev, event)) + pthread_cond_wait(&ev->cond, &ev->lock); + } + pthread_mutex_unlock(&ev->lock); +} + +void +pick_event_unordered(struct gf_event_list *ev, struct gf_event **event) +{ + pthread_mutex_lock(&ev->lock); + { + while (list_empty(&ev->events)) + pthread_cond_wait(&ev->cond, &ev->lock); + *event = list_first_entry(&ev->events, struct gf_event, list); + list_del(&(*event)->list); + } + pthread_mutex_unlock(&ev->lock); +} + +void * +gf_changelog_callback_invoker(void *arg) +{ + xlator_t *this = NULL; + gf_changelog_t *entry = NULL; + struct iovec *vec = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + + ev = arg; + entry = ev->entry; + THIS = this = entry->this; + + while (1) { + entry->pickevent(ev, &event); + + vec = (struct iovec *)&event->iov; + gf_changelog_invoke_callback(entry, &vec, event->count); + + GF_FREE(event); + } + + return NULL; +} + +static int +orderfn(struct list_head *pos1, struct list_head *pos2) +{ + struct gf_event *event1 = NULL; + struct gf_event *event2 = NULL; + + event1 = list_entry(pos1, struct gf_event, list); + event2 = list_entry(pos2, struct gf_event, list); + + if (event1->seq > event2->seq) + return 1; + return -1; +} + +void +queue_ordered_event(struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the ordered event 
list and wake up listener(s) */ + pthread_mutex_lock(&ev->lock); + { + list_add_order(&event->list, &ev->events, orderfn); + if (!ev->next_seq) + ev->next_seq = event->seq; + if (ev->next_seq == event->seq) + pthread_cond_signal(&ev->cond); + } + pthread_mutex_unlock(&ev->lock); +} + +void +queue_unordered_event(struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the tail of the queue and wake up listener(s) */ + pthread_mutex_lock(&ev->lock); + { + list_add_tail(&event->list, &ev->events); + pthread_cond_signal(&ev->cond); + } + pthread_mutex_unlock(&ev->lock); +} + +int +gf_changelog_event_handler(rpcsvc_request_t *req, xlator_t *this, + gf_changelog_t *entry) +{ + int i = 0; + size_t payloadlen = 0; + ssize_t len = 0; + int payloadcnt = 0; + changelog_event_req rpc_req = { + 0, + }; + changelog_event_rsp rpc_rsp = { + 0, + }; + struct iovec *vec = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + + len = xdr_to_generic(req->msg[0], &rpc_req, + (xdrproc_t)xdr_changelog_event_req); + if (len < 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, "xdr decoding failed"); + req->rpc_err = GARBAGE_ARGS; + goto handle_xdr_error; + } + + if (len < req->msg[0].iov_len) { + payloadcnt = 1; + payloadlen = (req->msg[0].iov_len - len); + } + for (i = 1; i < req->count; i++) { + payloadcnt++; + payloadlen += req->msg[i].iov_len; + } + + event = GF_CALLOC(1, GF_EVENT_CALLOC_SIZE(payloadcnt, payloadlen), + gf_changelog_mt_libgfchangelog_event_t); + if (!event) + goto handle_xdr_error; + INIT_LIST_HEAD(&event->list); + + payloadlen = 0; + event->seq = rpc_req.seq; + event->count = payloadcnt; + + /* deep copy IO vectors */ + vec = &event->iov[0]; + GF_EVENT_ASSIGN_IOVEC(vec, event, (req->msg[0].iov_len - len), payloadlen); + (void)memcpy(vec->iov_base, req->msg[0].iov_base + len, vec->iov_len); + + for (i = 1; i < req->count; i++) { + vec = &event->iov[i]; + 
GF_EVENT_ASSIGN_IOVEC(vec, event, req->msg[i].iov_len, payloadlen); + (void)memcpy(event->iov[i].iov_base, req->msg[i].iov_base, + req->msg[i].iov_len); + } + + gf_msg_debug(this->name, 0, + "seq: %" PRIu64 " [%s] (time: %" PRIu64 ".%" PRIu64 + "), " + "(vec: %d, len: %zd)", + rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec, + payloadcnt, payloadlen); + + /* dispatch event */ + entry->queueevent(ev, event); + + /* ack sequence number */ + rpc_rsp.op_ret = 0; + rpc_rsp.seq = rpc_req.seq; + + goto submit_rpc; + +handle_xdr_error: + rpc_rsp.op_ret = -1; + rpc_rsp.seq = 0; /* invalid */ +submit_rpc: + return changelog_rpc_sumbit_reply(req, &rpc_rsp, NULL, 0, NULL, + (xdrproc_t)xdr_changelog_event_rsp); +} + +int +gf_changelog_reborp_handle_event(rpcsvc_request_t *req) +{ + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + gf_changelog_t *entry = NULL; + + svc = rpcsvc_request_service(req); + entry = svc->mydata; + + this = THIS = entry->this; + + return gf_changelog_event_handler(req, this, entry); +} + +static rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { + [CHANGELOG_REV_PROC_EVENT] = {"CHANGELOG EVENT HANDLER", + gf_changelog_reborp_handle_event, NULL, + CHANGELOG_REV_PROC_EVENT, DRC_NA, 0}, +}; + +/** + * Do not use synctask as the RPC layer dereferences ->mydata as THIS. + * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t, + * and that's required to invoke the callback with the appropriate + * brick path and it's private data. 
+ */ +static struct rpcsvc_program gf_changelog_reborp_prog = { + .progname = "LIBGFCHANGELOG REBORP", + .prognum = CHANGELOG_REV_RPC_PROCNUM, + .progver = CHANGELOG_REV_RPC_PROCVER, + .numactors = CHANGELOG_REV_PROC_MAX, + .actors = gf_changelog_reborp_actors, + .synctask = _gf_false, +}; + +static struct rpcsvc_program *gf_changelog_reborp_programs[] = { + &gf_changelog_reborp_prog, + NULL, +}; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c new file mode 100644 index 00000000000..8ec6ffbcebc --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c @@ -0,0 +1,98 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "gf-changelog-rpc.h" +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +struct rpc_clnt_program gf_changelog_clnt; + +/* TODO: piggyback reconnect to called (upcall) */ +int +gf_changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, + rpc_clnt_event_t event, void *data) +{ + switch (event) { + case RPC_CLNT_CONNECT: + break; + case RPC_CLNT_DISCONNECT: + case RPC_CLNT_MSG: + case RPC_CLNT_DESTROY: + case RPC_CLNT_PING: + break; + } + + return 0; +} + +struct rpc_clnt * +gf_changelog_rpc_init(xlator_t *this, gf_changelog_t *entry) +{ + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + + CHANGELOG_MAKE_SOCKET_PATH(entry->brick, sockfile, UNIX_PATH_MAX); + return changelog_rpc_client_init(this, entry, sockfile, + gf_changelog_rpc_notify); +} + +/** + * remote procedure calls declarations. 
+ */ + +int +gf_probe_changelog_cbk(struct rpc_req *req, struct iovec *iovec, int count, + void *myframe) +{ + return 0; +} + +int +gf_probe_changelog_filter(call_frame_t *frame, xlator_t *this, void *data) +{ + char *sock = NULL; + gf_changelog_t *entry = NULL; + changelog_probe_req req = { + 0, + }; + + entry = data; + sock = RPC_SOCK(entry); + + (void)memcpy(&req.sock, sock, strlen(sock)); + req.filter = entry->notify; + + /* invoke RPC */ + return changelog_rpc_sumbit_req( + RPC_PROBER(entry), (void *)&req, frame, &gf_changelog_clnt, + CHANGELOG_RPC_PROBE_FILTER, NULL, 0, NULL, this, gf_probe_changelog_cbk, + (xdrproc_t)xdr_changelog_probe_req); +} + +int +gf_changelog_invoke_rpc(xlator_t *this, gf_changelog_t *entry, int procidx) +{ + return changelog_invoke_rpc(this, RPC_PROBER(entry), &gf_changelog_clnt, + procidx, entry); +} + +struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = { + [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, + [CHANGELOG_RPC_PROBE_FILTER] = {"PROBE FILTER", gf_probe_changelog_filter}, +}; + +struct rpc_clnt_program gf_changelog_clnt = { + .progname = "LIBGFCHANGELOG", + .prognum = CHANGELOG_RPC_PROGNUM, + .progver = CHANGELOG_RPC_PROGVER, + .numproc = CHANGELOG_RPC_PROC_MAX, + .proctable = gf_changelog_procs, +}; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h new file mode 100644 index 00000000000..5c82d6f1c08 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h @@ -0,0 +1,28 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. 
+*/ + +#ifndef __GF_CHANGELOG_RPC_H +#define __GF_CHANGELOG_RPC_H + +#include <glusterfs/xlator.h> + +#include "gf-changelog-helpers.h" +#include "changelog-rpc-common.h" + +struct rpc_clnt * +gf_changelog_rpc_init(xlator_t *, gf_changelog_t *); + +int +gf_changelog_invoke_rpc(xlator_t *, gf_changelog_t *, int); + +rpcsvc_t * +gf_changelog_reborp_init_rpc_listner(xlator_t *, char *, char *, void *); + +#endif diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c new file mode 100644 index 00000000000..57c3d39ef76 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -0,0 +1,652 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <errno.h> +#include <dirent.h> +#include <stddef.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/time.h> +#include <sys/resource.h> + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <string.h> + +#include <glusterfs/globals.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/defaults.h> +#include <glusterfs/syncop.h> + +#include "gf-changelog-rpc.h" +#include "gf-changelog-helpers.h" + +/* from the changelog translator */ +#include "changelog-misc.h" +#include "changelog-mem-types.h" +#include "changelog-lib-messages.h" + +/** + * Global singleton xlator pointer for the library, initialized + * during library load. This should probably be hidden inside + * an initialized object which is an handle for the consumer. + * + * TODO: do away with the global.. 
+ */ +xlator_t *master = NULL; + +static inline gf_private_t * +gf_changelog_alloc_priv() +{ + int ret = 0; + gf_private_t *priv = NULL; + + priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); + if (!priv) + goto error_return; + INIT_LIST_HEAD(&priv->connections); + INIT_LIST_HEAD(&priv->cleanups); + + ret = pthread_mutex_init(&priv->lock, NULL); + if (ret != 0) + goto free_priv; + ret = pthread_cond_init(&priv->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + priv->api = NULL; + return priv; + +cleanup_mutex: + (void)pthread_mutex_destroy(&priv->lock); +free_priv: + GF_FREE(priv); +error_return: + return NULL; +} + +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 +#define GF_CHANGELOG_EVENT_THREAD_COUNT 4 + +static int +gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx) +{ + cmd_args_t *cmd_args = NULL; + struct rlimit lim = { + 0, + }; + call_pool_t *pool = NULL; + int ret = -1; + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + return -1; + + ctx->process_uuid = generate_glusterfs_ctx_id(); + if (!ctx->process_uuid) + return -1; + + ctx->page_size = 128 * GF_UNIT_KB; + + ctx->iobuf_pool = iobuf_pool_new(); + if (!ctx->iobuf_pool) + goto free_pool; + + ctx->event_pool = gf_event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, + GF_CHANGELOG_EVENT_THREAD_COUNT); + if (!ctx->event_pool) + goto free_pool; + + pool = GF_CALLOC(1, sizeof(call_pool_t), + gf_changelog_mt_libgfchangelog_call_pool_t); + if (!pool) + goto free_pool; + + /* frame_mem_pool size 112 * 64 */ + pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); + if (!pool->frame_mem_pool) + goto free_pool; + + /* stack_mem_pool size 256 * 128 */ + pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); + + if (!pool->stack_mem_pool) + goto free_pool; + + ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); + if (!ctx->stub_mem_pool) + goto free_pool; + + ctx->dict_pool = mem_pool_new(dict_t, 32); + if (!ctx->dict_pool) + goto free_pool; + + ctx->dict_pair_pool = 
mem_pool_new(data_pair_t, 512); + if (!ctx->dict_pair_pool) + goto free_pool; + + ctx->dict_data_pool = mem_pool_new(data_t, 512); + if (!ctx->dict_data_pool) + goto free_pool; + + ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); + if (!ctx->logbuf_pool) + goto free_pool; + + INIT_LIST_HEAD(&pool->all_frames); + LOCK_INIT(&pool->lock); + ctx->pool = pool; + + LOCK_INIT(&ctx->lock); + + cmd_args = &ctx->cmd_args; + + INIT_LIST_HEAD(&cmd_args->xlator_options); + + lim.rlim_cur = RLIM_INFINITY; + lim.rlim_max = RLIM_INFINITY; + setrlimit(RLIMIT_CORE, &lim); + + return 0; + +free_pool: + if (pool) { + GF_FREE(pool->frame_mem_pool); + + GF_FREE(pool->stack_mem_pool); + + GF_FREE(pool); + } + + GF_FREE(ctx->stub_mem_pool); + + GF_FREE(ctx->dict_pool); + + GF_FREE(ctx->dict_pair_pool); + + GF_FREE(ctx->dict_data_pool); + + GF_FREE(ctx->logbuf_pool); + + GF_FREE(ctx->iobuf_pool); + + GF_FREE(ctx->event_pool); + + return -1; +} + +/* TODO: cleanup ctx defaults */ +void +gf_changelog_cleanup_this(xlator_t *this) +{ + glusterfs_ctx_t *ctx = NULL; + + if (!this) + return; + + ctx = this->ctx; + syncenv_destroy(ctx->env); + free(ctx); + + this->private = NULL; + this->ctx = NULL; + + mem_pools_fini(); +} + +static int +gf_changelog_init_context() +{ + glusterfs_ctx_t *ctx = NULL; + + ctx = glusterfs_ctx_new(); + if (!ctx) + goto error_return; + + if (glusterfs_globals_init(ctx)) + goto free_ctx; + + THIS->ctx = ctx; + if (gf_changelog_ctx_defaults_init(ctx)) + goto free_ctx; + + ctx->env = syncenv_new(0, 0, 0); + if (!ctx->env) + goto free_ctx; + return 0; + +free_ctx: + free(ctx); + THIS->ctx = NULL; +error_return: + return -1; +} + +static int +gf_changelog_init_master() +{ + int ret = 0; + + ret = gf_changelog_init_context(); + mem_pools_init(); + + return ret; +} + +/* TODO: cleanup clnt/svc on failure */ +int +gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc) +{ + int ret = 0; + rpcsvc_t *svc = NULL; + struct rpc_clnt *rpc = NULL; + + /** + * 
Initialize a connect back socket. A probe() RPC call to the server + * triggers a reverse connect. + */ + svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, + RPC_SOCK(entry), entry); + if (!svc) + goto error_return; + RPC_REBORP(entry) = svc; + + /* Initialize an RPC client */ + rpc = gf_changelog_rpc_init(this, entry); + if (!rpc) + goto error_return; + RPC_PROBER(entry) = rpc; + + /** + * @FIXME + * till we have connection state machine, let's delay the RPC call + * for now.. + */ + sleep(2); + + /** + * Probe changelog translator for reverse connection. After a successful + * call, there's less use of the client and can be disconnected, but + * let's leave the connection active for any future RPC calls. + */ + ret = gf_changelog_invoke_rpc(this, entry, proc); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, + "Could not initiate probe RPC, bailing out!!!"); + goto error_return; + } + + return 0; + +error_return: + return -1; +} + +int +gf_cleanup_event(xlator_t *this, struct gf_event_list *ev) +{ + int ret = 0; + + ret = gf_thread_cleanup(this, ev->invoker); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + CHANGELOG_LIB_MSG_CLEANUP_ERROR, + "cannot cleanup callback invoker thread." 
+ " Not freeing resources"); + return -1; + } + + ev->entry = NULL; + + return 0; +} + +static int +gf_init_event(gf_changelog_t *entry) +{ + int ret = 0; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + ev->entry = entry; + + ret = pthread_mutex_init(&ev->lock, NULL); + if (ret != 0) + goto error_return; + ret = pthread_cond_init(&ev->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + INIT_LIST_HEAD(&ev->events); + + ev->next_seq = 0; /* bootstrap sequencing */ + + if (GF_NEED_ORDERED_EVENTS(entry)) { + entry->pickevent = pick_event_ordered; + entry->queueevent = queue_ordered_event; + } else { + entry->pickevent = pick_event_unordered; + entry->queueevent = queue_unordered_event; + } + + ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, + ev, "clogcbki"); + if (ret != 0) { + entry->pickevent = NULL; + entry->queueevent = NULL; + goto cleanup_cond; + } + + return 0; + +cleanup_cond: + (void)pthread_cond_destroy(&ev->cond); +cleanup_mutex: + (void)pthread_mutex_destroy(&ev->lock); +error_return: + return -1; +} + +/** + * TODO: + * - cleanup invoker thread + * - cleanup event list + * - destroy rpc{-clnt, svc} + */ +int +gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry) +{ + return 0; +} + +int +gf_cleanup_connections(xlator_t *this) +{ + return 0; +} + +static int +gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + + priv = this->private; + + if (!brick->callback || !brick->init || !brick->fini) + goto error_return; + + entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); + if (!entry) + goto error_return; + INIT_LIST_HEAD(&entry->list); + + LOCK_INIT(&entry->statelock); + entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + + entry->notify = brick->filter; + if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) + goto free_entry; + + 
entry->this = this; + entry->invokerxl = xl; + + entry->ordered = ordered; + ret = gf_init_event(entry); + if (ret) + goto free_entry; + + entry->fini = brick->fini; + entry->callback = brick->callback; + entry->connected = brick->connected; + entry->disconnected = brick->disconnected; + + entry->ptr = brick->init(this, brick); + if (!entry->ptr) + goto cleanup_event; + priv->api = entry->ptr; /* pointer to API, if required */ + + pthread_mutex_lock(&priv->lock); + { + list_add_tail(&entry->list, &priv->connections); + } + pthread_mutex_unlock(&priv->lock); + + ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); + if (ret) + goto cleanup_event; + return 0; + +cleanup_event: + (void)gf_cleanup_event(this, &entry->event); +free_entry: + gf_msg_debug(this->name, 0, "freeing entry %p", entry); + list_del(&entry->list); /* FIXME: kludge for now */ + GF_FREE(entry); +error_return: + return -1; +} + +int +gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + return gf_setup_brick_connection(this, brick, ordered, xl); +} + +static int +gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel) +{ + /* passing ident as NULL means to use default ident for syslog */ + if (gf_log_init(this->ctx, logfile, NULL)) + return -1; + + gf_log_set_loglevel(this->ctx, (loglevel == -1) ? 
GF_LOG_INFO : loglevel); + return 0; +} + +static int +gf_changelog_set_master(xlator_t *master, void *xl) +{ + int32_t ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + gf_private_t *priv = NULL; + + this = xl; + if (!this || !this->ctx) { + ret = gf_changelog_init_master(); + if (ret) + return -1; + this = THIS; + } + + master->ctx = this->ctx; + + INIT_LIST_HEAD(&master->volume_options); + SAVE_THIS(THIS); + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + goto restore_this; + + priv = gf_changelog_alloc_priv(); + if (!priv) { + ret = -1; + goto restore_this; + } + + if (!xl) { + /* poller thread */ + ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, + "clogpoll"); + if (ret != 0) { + GF_FREE(priv); + gf_msg(master->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "failed to spawn poller thread"); + goto restore_this; + } + } + + master->private = priv; + +restore_this: + RESTORE_THIS(); + + return ret; +} + +int +gf_changelog_init(void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + + if (master) + return 0; + + master = calloc(1, sizeof(*master)); + if (!master) + goto error_return; + + master->name = strdup("gfchangelog"); + if (!master->name) + goto dealloc_master; + + ret = gf_changelog_set_master(master, xl); + if (ret) + goto dealloc_name; + + priv = master->private; + ret = gf_thread_create(&priv->connectionjanitor, NULL, + gf_changelog_connection_janitor, master, "clogjan"); + if (ret != 0) { + /* TODO: cleanup priv, mutex (poller thread for !xl) */ + goto dealloc_name; + } + + return 0; + +dealloc_name: + free(master->name); +dealloc_master: + free(master); + master = NULL; +error_return: + return -1; +} + +int +gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl) +{ + int ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + struct gf_brick_spec *brick = NULL; + gf_boolean_t need_order = 
_gf_false; + + SAVE_THIS(xl); + + this = THIS; + if (!this) + goto error_return; + + ret = gf_changelog_setup_logging(this, logfile, lvl); + if (ret) + goto error_return; + + need_order = (ordered) ? _gf_true : _gf_false; + + brick = bricks; + while (count--) { + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "brick=%s", + brick->brick_path, "notify_filter=%d", brick->filter, NULL); + + ret = gf_changelog_register_brick(this, brick, need_order, xl); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, + "Error registering with changelog xlator"); + break; + } + + brick++; + } + + if (ret != 0) + goto cleanup_inited_bricks; + + RESTORE_THIS(); + return 0; + +cleanup_inited_bricks: + gf_cleanup_connections(this); +error_return: + RESTORE_THIS(); + return -1; +} + +/** + * @API + * gf_changelog_register() + * + * This is _NOT_ a generic register API. It's a special API to handle + * updates at a journal granulality. This is used by consumers wanting + * to process persistent journal such as geo-replication via a set of + * APIs. All of this is required to maintain backward compatibility. + * Owner specific private data is stored in ->api (in gf_private_t), + * which is used by APIs to access it's private data. This limits + * the library access to a single brick, but that's how it used to + * be anyway. Furthermore, this API solely _owns_ "this", therefore + * callers already having a notion of "this" are expected to use the + * newer API. + * + * Newer applications wanting to use this library need not face this + * limitation and reply of the much more feature rich generic register + * API, which is purely callback based. + * + * NOTE: @max_reconnects is not used but required for backward compat. + * + * For generic API, refer gf_changelog_register_generic(). 
+ */ +int +gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, + int log_level, int max_reconnects) +{ + struct gf_brick_spec brick = { + 0, + }; + + if (master) + THIS = master; + else + return -1; + + brick.brick_path = brick_path; + brick.filter = CHANGELOG_OP_TYPE_JOURNAL; + + brick.init = gf_changelog_journal_init; + brick.fini = gf_changelog_journal_fini; + brick.callback = gf_changelog_handle_journal; + brick.connected = gf_changelog_journal_connect; + brick.disconnected = gf_changelog_journal_disconnect; + + brick.ptr = scratch_dir; + + return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, + NULL); +} diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c new file mode 100644 index 00000000000..a16219f3664 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -0,0 +1,1020 @@ +#include <errno.h> +#include <dirent.h> +#include <stddef.h> +#include <sys/types.h> + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <string.h> + +#include <glusterfs/globals.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/syscall.h> + +#include "gf-changelog-helpers.h" +#include "gf-changelog-journal.h" + +/* from the changelog translator */ +#include "changelog-misc.h" +#include "changelog-lib-messages.h" +#include "changelog-mem-types.h" + +/** + * @API + * gf_history_changelog_done: + * Move processed history changelog file from .processing + * to .processed + * + * ARGUMENTS: + * file(IN): path to processed history changelog file in + * .processing directory. + * + * RETURN VALUE: + * 0: On success. + * -1: On error. 
+ */ +int +gf_history_changelog_done(char *file) +{ + int ret = -1; + char *buffer = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + char to_path[PATH_MAX] = { + 0, + }; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; + + if (!file || !strlen(file)) + goto out; + + /* make sure 'file' is inside ->jnl_working_dir */ + buffer = realpath(file, NULL); + if (!buffer) + goto out; + + if (strncmp(hist_jnl->jnl_working_dir, buffer, + strlen(hist_jnl->jnl_working_dir))) + goto out; + + (void)snprintf(to_path, PATH_MAX, "%s%s", hist_jnl->jnl_processed_dir, + basename(buffer)); + gf_msg_debug(this->name, 0, "moving %s to processed directory", file); + ret = sys_rename(buffer, to_path); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", file, "to=%s", + to_path, NULL); + goto out; + } + + ret = 0; + +out: + if (buffer) + free(buffer); /* allocated by realpath() */ + return ret; +} + +/** + * @API + * gf_history_changelog_start_fresh: + * For a set of changelogs, start from the beginning. + * It will truncates the history tracker fd. + * + * RETURN VALUES: + * 0: On success. + * -1: On error. + */ +int +gf_history_changelog_start_fresh() +{ + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + + this = THIS; + if (!this) + goto out; + + errno = EINVAL; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; + + if (gf_ftruncate(hist_jnl->jnl_fd, 0)) + goto out; + + return 0; + +out: + return -1; +} + +/** + * @API + * gf_history_changelog_next_change: + * Return the next history changelog file entry. Zero means all + * history chanelogs are consumed. 
+ * + * ARGUMENTS: + * bufptr(OUT): Path to unprocessed history changelog file + * from tracker file. + * maxlen(IN): Usually PATH_MAX. + * + * RETURN VALUES: + * size: On success. + * -1 : On error. + */ +ssize_t +gf_history_changelog_next_change(char *bufptr, size_t maxlen) +{ + ssize_t size = -1; + int tracker_fd = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + char buffer[PATH_MAX] = { + 0, + }; + + if (maxlen > PATH_MAX) { + errno = ENAMETOOLONG; + goto out; + } + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; + + tracker_fd = hist_jnl->jnl_fd; + + size = gf_readline(tracker_fd, buffer, maxlen); + if (size < 0) { + size = -1; + goto out; + } + + if (size == 0) + goto out; + + memcpy(bufptr, buffer, size - 1); + bufptr[size - 1] = '\0'; + +out: + return size; +} + +/** + * @API + * gf_history_changelog_scan: + * Scan and generate a list of change entries. + * Calling this api multiple times (without calling gf_changlog_done()) + * would result new changelogs(s) being refreshed in the tracker file. + * This call also acts as a cancellation point for the consumer. + * + * RETURN VALUES: + * +ve integer : success and keep scanning.(count of changelogs) + * 0 : success and done scanning. + * -1 : error. 
+ * + * NOTE: After first 0 return call_get_next change for once more time + * to empty the tracker + * + */ +ssize_t +gf_history_changelog_scan() +{ + int tracker_fd = 0; + size_t off = 0; + xlator_t *this = NULL; + size_t nr_entries = 0; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + struct dirent *entry = NULL; + struct dirent scratch[2] = { + { + 0, + }, + }; + char buffer[PATH_MAX] = { + 0, + }; + static int is_last_scan; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + if (JNL_IS_API_DISCONNECTED(jnl)) { + errno = ENOTCONN; + goto out; + } + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; + +retry: + if (is_last_scan == 1) + return 0; + if (hist_jnl->hist_done == 0) + is_last_scan = 1; + + errno = EINVAL; + if (hist_jnl->hist_done == -1) + goto out; + + tracker_fd = hist_jnl->jnl_fd; + + if (gf_ftruncate(tracker_fd, 0)) + goto out; + + rewinddir(hist_jnl->jnl_dir); + + for (;;) { + errno = 0; + entry = sys_readdir(hist_jnl->jnl_dir, scratch); + if (!entry || errno != 0) + break; + + if (strcmp(basename(entry->d_name), ".") == 0 || + strcmp(basename(entry->d_name), "..") == 0) + continue; + + nr_entries++; + + GF_CHANGELOG_FILL_BUFFER(hist_jnl->jnl_processing_dir, buffer, off, + strlen(hist_jnl->jnl_processing_dir)); + GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off, + strlen(entry->d_name)); + GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1); + + if (gf_changelog_write(tracker_fd, buffer, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED, + "error writing changelog filename" + " to tracker file"); + break; + } + off = 0; + } + + gf_msg_debug(this->name, 0, "hist_done %d, is_last_scan: %d", + hist_jnl->hist_done, is_last_scan); + + if (!entry) { + if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) { + if (nr_entries > 0) + return nr_entries; + else { + sleep(1); + goto retry; + } + } + } +out: + 
return -1; +} + +/* + * Gets timestamp value at the changelog path at index. + * Returns 0 on success(updates given time-stamp), -1 on failure. + */ +int +gf_history_get_timestamp(int fd, int index, int len, unsigned long *ts) +{ + xlator_t *this = NULL; + int n_read = -1; + char path_buf[PATH_MAX] = { + 0, + }; + char *iter = path_buf; + size_t offset = index * (len + 1); + unsigned long value = 0; + int ret = 0; + + this = THIS; + if (!this) { + return -1; + } + + n_read = sys_pread(fd, path_buf, len, offset); + if (n_read < 0) { + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR, + "could not read from htime file"); + goto out; + } + iter += len - TIMESTAMP_LENGTH; + sscanf(iter, "%lu", &value); +out: + if (ret == 0) + *ts = value; + return ret; +} + +/* + * Function to ensure correctness of search + * Checks whether @value is there next to @target_index or not + */ +int +gf_history_check(int fd, int target_index, unsigned long value, int len) +{ + int ret = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + + if (target_index == 0) { + ret = gf_history_get_timestamp(fd, target_index, len, &ts1); + if (ret == -1) + goto out; + if (value <= ts1) + goto out; + else { + ret = -1; + goto out; + } + } + + ret = gf_history_get_timestamp(fd, target_index, len, &ts1); + if (ret == -1) + goto out; + ret = gf_history_get_timestamp(fd, target_index - 1, len, &ts2); + if (ret == -1) + goto out; + + if ((value <= ts1) && (value > ts2)) { + goto out; + } else + ret = -1; +out: + return ret; +} + +/* + * This is a "binary search" based search function which checks neighbours + * for in-range availability of the value to be searched and provides the + * index at which the changelog file nearest to the requested timestamp(value) + * can be read from. + * + * Actual offset can be calculated as (index* (len+1) ). + * "1" is because the changelog paths are null terminated. 
+ * + * @path : Htime file to search in + * @value : time stamp to search + * @from : start index to search + * @to : end index to search + * @len : length of fixes length strings separated by null + */ + +int +gf_history_b_search(int fd, unsigned long value, unsigned long from, + unsigned long to, int len) +{ + int m_index = -1; + unsigned long cur_value = 0; + unsigned long ts1 = 0; + int ret = 0; + + m_index = (from + to) / 2; + + if ((to - from) <= 1) { + /* either one or 2 changelogs left */ + if (to != from) { + /* check if value is less or greater than to + * return accordingly + */ + ret = gf_history_get_timestamp(fd, from, len, &ts1); + if (ret == -1) + goto out; + if (ts1 >= value) { + /* actually compatision should be + * exactly == but considering + * + * case of only 2 changelogs in htime file + */ + return from; + } else + return to; + } else + return to; + } + + ret = gf_history_get_timestamp(fd, m_index, len, &cur_value); + if (ret == -1) + goto out; + if (cur_value == value) { + return m_index; + } else if (value > cur_value) { + ret = gf_history_get_timestamp(fd, m_index + 1, len, &cur_value); + if (ret == -1) + goto out; + if (value < cur_value) + return m_index + 1; + else + return gf_history_b_search(fd, value, m_index + 1, to, len); + } else { + if (m_index == 0) { + /* we are sure that values exists + * in this htime file + */ + return 0; + } else { + ret = gf_history_get_timestamp(fd, m_index - 1, len, &cur_value); + if (ret == -1) + goto out; + if (value > cur_value) { + return m_index; + } else + return gf_history_b_search(fd, value, from, m_index - 1, len); + } + } +out: + return -1; +} + +/* + * Description: Checks if the changelog path is usable or not, + * which is differentiated by checking for "changelog" + * in the path and not "CHANGELOG". 
+ * + * Returns: + * 1 : Yes, usable ( contains "CHANGELOG" ) + * 0 : No, Not usable ( contains, "changelog") + */ +int +gf_is_changelog_usable(char *cl_path) +{ + int ret = -1; + const char low_c[] = "changelog"; + char *str_ret = NULL; + char *bname = NULL; + + bname = basename(cl_path); + + str_ret = strstr(bname, low_c); + + if (str_ret != NULL) + ret = 0; + else + ret = 1; + + return ret; +} + +void * +gf_changelog_consume_wrap(void *data) +{ + int ret = -1; + ssize_t nread = 0; + xlator_t *this = NULL; + gf_changelog_consume_data_t *ccd = NULL; + + ccd = (gf_changelog_consume_data_t *)data; + this = ccd->this; + + ccd->retval = -1; + + nread = sys_pread(ccd->fd, ccd->changelog, PATH_MAX - 1, ccd->offset); + if (nread < 0) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR, + "cannot read from history metadata file"); + goto out; + } + + /* TODO: handle short reads and EOF. */ + if (gf_is_changelog_usable(ccd->changelog) == 1) { + ret = gf_changelog_consume(ccd->this, ccd->jnl, ccd->changelog, + _gf_true); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_PARSE_ERROR, + "name=%s", ccd->changelog, NULL); + goto out; + } + } + ccd->retval = 0; + +out: + return NULL; +} + +/** + * "gf_history_consume" is a worker function for history. + * parses and moves changelogs files from index "from" + * to index "to" in open htime file whose fd is "fd". 
+ */ + +#define MAX_PARALLELS 10 + +void * +gf_history_consume(void *data) +{ + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + int ret = 0; + int iter = 0; + int fd = -1; + int from = -1; + int to = -1; + int len = -1; + int n_parallel = 0; + int n_envoked = 0; + gf_boolean_t publish = _gf_true; + pthread_t th_id[MAX_PARALLELS] = { + 0, + }; + gf_changelog_history_data_t *hist_data = NULL; + gf_changelog_consume_data_t ccd[MAX_PARALLELS] = { + {0}, + }; + gf_changelog_consume_data_t *curr = NULL; + + hist_data = (gf_changelog_history_data_t *)data; + if (hist_data == NULL) { + ret = -1; + goto out; + } + + fd = hist_data->htime_fd; + from = hist_data->from; + to = hist_data->to; + len = hist_data->len; + n_parallel = hist_data->n_parallel; + + THIS = hist_data->this; + this = hist_data->this; + if (!this) { + ret = -1; + goto out; + } + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) { + ret = -1; + goto out; + } + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) { + ret = -1; + goto out; + } + + while (from <= to) { + n_envoked = 0; + + for (iter = 0; (iter < n_parallel) && (from <= to); iter++) { + curr = &ccd[iter]; + + curr->this = this; + curr->jnl = hist_jnl; + curr->fd = fd; + curr->offset = from * (len + 1); + + curr->retval = 0; + memset(curr->changelog, '\0', PATH_MAX); + + ret = gf_thread_create(&th_id[iter], NULL, + gf_changelog_consume_wrap, curr, + "clogc%03hx", (iter + 1) & 0x3ff); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "could not create consume-thread"); + goto sync; + } else + n_envoked++; + + from++; + } + + sync: + for (iter = 0; iter < n_envoked; iter++) { + ret = pthread_join(th_id[iter], NULL); + if (ret) { + publish = _gf_false; + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, + "pthread_join() error"); + /* try to join the rest */ + continue; + } + + if (publish == 
_gf_false) + continue; + + curr = &ccd[iter]; + if (ccd->retval) { + publish = _gf_false; + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_PARSE_ERROR_CEASED, NULL); + continue; + } + + ret = gf_changelog_publish(curr->this, curr->jnl, curr->changelog); + if (ret) { + publish = _gf_false; + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_PUBLISH_ERROR, + "publish error, ceased publishing..."); + } + } + } + + /* informing "parsing done". */ + hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1; + +out: + if (fd != -1) + (void)sys_close(fd); + GF_FREE(hist_data); + return NULL; +} + +/** + * @API + * gf_history_changelog() : Get/parse historical changelogs and get them ready + * for consumption. + * + * Arguments: + * @changelog_dir : Directory location from where history changelogs are + * supposed to be consumed. + * @start: Unix timestamp FROM where changelogs should be consumed. + * @end: Unix timestamp TO where changelogsshould be consumed. + * @n_parallel : degree of parallelism while changelog parsing. + * @actual_end : the end time till where changelogs are available. + * + * Return: + * Returns <timestamp> on success, the last time till where changelogs are + * available. + * Returns -1 on failure(error). 
+ */ + +/** + * Extract timestamp range from a historical metadata file + * Returns: + * 0 : Success ({min,max}_ts with the appropriate values) + * -1 : Failure + * -2 : Ignore this metadata file and process next + */ +int +gf_changelog_extract_min_max(const char *dname, const char *htime_dir, int *fd, + unsigned long *total, unsigned long *min_ts, + unsigned long *max_ts) +{ + int ret = -1; + xlator_t *this = NULL; + char htime_file[PATH_MAX] = { + 0, + }; + struct stat stbuf = { + 0, + }; + char *iter = NULL; + char x_value[30] = { + 0, + }; + + this = THIS; + + snprintf(htime_file, PATH_MAX, "%s/%s", htime_dir, dname); + + iter = (htime_file + strlen(htime_file) - TIMESTAMP_LENGTH); + sscanf(iter, "%lu", min_ts); + + ret = sys_stat(htime_file, &stbuf); + if (ret) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, + "op=stat", "path=%s", htime_file, NULL); + goto out; + } + + /* ignore everything except regular files */ + if (!S_ISREG(stbuf.st_mode)) { + ret = -2; + goto out; + } + + *fd = open(htime_file, O_RDONLY); + if (*fd < 0) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, + "op=open", "path=%s", htime_file, NULL); + goto out; + } + + /* Looks good, extract max timestamp */ + ret = sys_fgetxattr(*fd, HTIME_KEY, x_value, sizeof(x_value)); + if (ret < 0) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_GET_XATTR_FAILED, "path=%s", htime_file, + NULL); + goto out; + } + + sscanf(x_value, "%lu:%lu", max_ts, total); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_MIN_MAX_INFO, + "min=%lu", *min_ts, "max=%lu", *max_ts, "total_changelogs=%lu", + *total, NULL); + + ret = 0; + +out: + return ret; +} + +/* gf_history_changelog returns actual_end and spawns threads to + * parse historical changelogs. The return values are as follows. 
+ * 0 : On success + * 1 : Successful, but partial historical changelogs available, + * end time falls into different htime file or future time + * -2 : Error, requested historical changelog not available, not + * even partial + * -1 : On any error + */ +int +gf_history_changelog(char *changelog_dir, unsigned long start, + unsigned long end, int n_parallel, + unsigned long *actual_end) +{ + int ret = 0; + int len = -1; + int fd = -1; + int n_read = -1; + unsigned long min_ts = 0; + unsigned long max_ts = 0; + unsigned long end2 = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + unsigned long to = 0; + unsigned long from = 0; + unsigned long total_changelog = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + gf_changelog_history_data_t *hist_data = NULL; + DIR *dirp = NULL; + struct dirent *entry = NULL; + struct dirent scratch[2] = { + { + 0, + }, + }; + pthread_t consume_th = 0; + char htime_dir[PATH_MAX] = { + 0, + }; + char buffer[PATH_MAX] = { + 0, + }; + gf_boolean_t partial_history = _gf_false; + + pthread_attr_t attr; + + this = THIS; + if (!this) { + ret = -1; + goto out; + } + + ret = pthread_attr_init(&attr); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_PTHREAD_ERROR, + "Pthread init failed"); + return -1; + } + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) { + ret = -1; + goto out; + } + + hist_jnl = (gf_changelog_journal_t *)jnl->hist_jnl; + if (!hist_jnl) { + ret = -1; + goto out; + } + + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_REQUESTING_INFO, + "start=%lu", start, "end=%lu", end, NULL); + + /* basic sanity check */ + if (start > end || n_parallel <= 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HIST_FAILED, + "start=%lu", start, "end=%lu", end, "thread_count=%d", + n_parallel, NULL); + ret = -1; + goto out; + } + + /* cap parallelism count */ + if (n_parallel > MAX_PARALLELS) + 
n_parallel = MAX_PARALLELS; + + CHANGELOG_FILL_HTIME_DIR(changelog_dir, htime_dir); + + dirp = sys_opendir(htime_dir); + if (dirp == NULL) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, + "op=opendir", "path=%s", htime_dir, NULL); + ret = -1; + goto out; + } + + for (;;) { + errno = 0; + + entry = sys_readdir(dirp, scratch); + + if (!entry || errno != 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_HIST_FAILED, "start=%lu", start, + "end=%lu", end, NULL); + ret = -2; + break; + } + + ret = gf_changelog_extract_min_max(entry->d_name, htime_dir, &fd, + &total_changelog, &min_ts, &max_ts); + if (ret) { + if (-2 == ret) + continue; + goto out; + } + + if (start >= min_ts && start < max_ts) { + /** + * TODO: handle short reads later... + */ + n_read = sys_read(fd, buffer, PATH_MAX); + if (n_read < 0) { + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_READ_ERROR, + "unable to read htime file"); + goto out; + } + + len = strlen(buffer); + + /** + * search @start in the htime file returning it's index + * (@from) + */ + from = gf_history_b_search(fd, start, 0, total_changelog - 1, len); + + /* ensuring correctness of gf_b_search */ + if (gf_history_check(fd, from, start, len) != 0) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_GET_TIME_ERROR, "for=start", + "start=%lu", start, "idx=%lu", from, NULL); + goto out; + } + + end2 = (end <= max_ts) ? end : max_ts; + + /* Check if end falls out of same HTIME file. The end + * falling to a different htime file or changelog + * disable-enable is detected only after 20 seconds. + * This is required because, applications generally + * asks historical changelogs till current time and + * it is possible changelog is not rolled over yet. + * So, buffer time of default rollover time plus 5 + * seconds is subtracted. 
If the application requests + * the end time with in half a minute of changelog + * disable, it's not detected as changelog disable and + * it's application's responsibility to retry after + * 20 seconds before confirming it as partial history. + */ + if ((end - 20) > max_ts) { + partial_history = _gf_true; + } + + /** + * search @end2 in htime file returning it's index (@to) + */ + to = gf_history_b_search(fd, end2, 0, total_changelog - 1, len); + + if (gf_history_check(fd, to, end2, len) != 0) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_GET_TIME_ERROR, "for=end", + "start=%lu", end2, "idx=%lu", to, NULL); + goto out; + } + + ret = gf_history_get_timestamp(fd, from, len, &ts1); + if (ret == -1) + goto out; + + ret = gf_history_get_timestamp(fd, to, len, &ts2); + if (ret == -1) + goto out; + + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_FINAL_INFO, + "from=%lu", ts1, "to=%lu", ts2, "changes=%lu", + (to - from + 1), NULL); + + hist_data = GF_CALLOC(1, sizeof(gf_changelog_history_data_t), + gf_changelog_mt_history_data_t); + + hist_data->htime_fd = fd; + hist_data->from = from; + hist_data->to = to; + hist_data->len = len; + hist_data->n_parallel = n_parallel; + hist_data->this = this; + + ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_PTHREAD_ERROR, + "unable to sets the detach" + " state attribute"); + ret = -1; + goto out; + } + + /* spawn a thread for background parsing & publishing */ + ret = gf_thread_create(&consume_th, &attr, gf_history_consume, + hist_data, "cloghcon"); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "creation of consume parent-thread" + " failed."); + ret = -1; + goto out; + } + + goto out; + + } else { /* end of range check */ + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_HIST_FAILED, "start=%lu", start, + "end=%lu", end, "chlog_min=%lu", 
min_ts, "chlog_max=%lu", + max_ts, NULL); + } + } /* end of readdir() */ + +out: + if (dirp != NULL) + (void)sys_closedir(dirp); + + if (ret < 0) { + if (fd != -1) + (void)sys_close(fd); + GF_FREE(hist_data); + (void)pthread_attr_destroy(&attr); + + return ret; + } + + hist_jnl->hist_done = 1; + *actual_end = ts2; + + if (partial_history) { + ret = 1; + } + + return ret; +} |
