From d2db585ce7e26851178104433fa9422482d8719e Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Wed, 19 Feb 2014 20:47:46 +0530 Subject: features/changelog : historical journal consumption. Gives Glusterfs the ability to detect file operations that happened in the past by scanning the back-end (brick-level) glusterfs journal (changelog). Design: * The list of changelogs produced in one perfectly running session is stored in an htime file, which also holds the necessary information about the session start and end time. * Involves fixed-size seeks to identify the N'th changelog in the list. * Requires O(log n) time (where n is the number of changelogs in the list) to identify the end changelog for the given start-end time interval. Currently the background processing of changelogs is suboptimal. BZ 1097041 tracks the development effort. For the complete design, refer to the link below: http://lists.nongnu.org/archive/html/gluster-devel/2014-02/msg00206.html Change-Id: I27e49f75e492e843084d0ecaf9130224d08462a0 BUG: 1091961 Signed-off-by: Ajeet Jha Signed-off-by: Venky Shankar Signed-off-by: Ajeet Jha Reviewed-on: http://review.gluster.org/6930 Reviewed-by: Kotresh HR Tested-by: Gluster Build System --- .../changelog/lib/examples/c/get-history.c | 109 ++++ .../changelog/lib/src/gf-changelog-helpers.h | 45 ++ .../changelog/lib/src/gf-changelog-process.c | 57 +- .../changelog/lib/src/gf-history-changelog.c | 676 ++++++++++++++++++++- xlators/features/changelog/src/changelog-helpers.c | 103 +++- xlators/features/changelog/src/changelog-helpers.h | 17 +- .../features/changelog/src/changelog-mem-types.h | 1 + xlators/features/changelog/src/changelog-misc.h | 7 + xlators/features/changelog/src/changelog.c | 37 +- 9 files changed, 1032 insertions(+), 20 deletions(-) create mode 100644 xlators/features/changelog/lib/examples/c/get-history.c (limited to 'xlators/features/changelog') diff --git a/xlators/features/changelog/lib/examples/c/get-history.c 
b/xlators/features/changelog/lib/examples/c/get-history.c new file mode 100644 index 00000000000..33eb8c32d4d --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-history.c @@ -0,0 +1,109 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/** + * get set of new changes every 10 seconds (just print the file names) + * + * Compile it using: + * gcc -o gethistory `pkg-config --cflags libgfchangelog` get-history.c \ + * `pkg-config --libs libgfchangelog` + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "changelog.h" + +#define handle_error(fn) \ + printf ("%s (reason: %s)\n", fn, strerror (errno)) + +int +main (int argc, char ** argv) +{ + int i = 0; + int ret = 0; + ssize_t nr_changes = 0; + ssize_t changes = 0; + char fbuf[PATH_MAX] = {0,}; + unsigned long end_ts = 0; + + ret = gf_changelog_register ("/export1/v1/b1", + "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log", + 9, 5); + if (ret) { + handle_error ("register failed"); + goto out; + } + + int a, b; + printf ("give the two numbers start and end\t"); + scanf ("%d%d", &a, &b); + ret = gf_history_changelog ("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts); + if (ret == -1) { + printf ("history failed"); + goto out; + } + + printf ("end time till when changelog available : %d , ret(%d) \t", end_ts, ret); + fflush(stdout); + + while (1) { + nr_changes = gf_history_changelog_scan (); + printf ("scanned, nr_changes : %d\n",nr_changes); + if (nr_changes < 0) { + handle_error ("scan(): "); + break; + } + + if (nr_changes == 0) { + printf ("done scanning \n"); + goto out; + } + + printf ("Got %ld changelog files\n", nr_changes); + + while ( (changes = + 
gf_history_changelog_next_change (fbuf, PATH_MAX)) > 0) { + printf ("changelog file [%d]: %s\n", ++i, fbuf); + + /* process changelog */ + /* ... */ + /* ... */ + /* ... */ + /* done processing */ + + ret = gf_history_changelog_done (fbuf); + if (ret) + handle_error ("gf_changelog_done"); + } + /* + if (changes == -1) + handle_error ("gf_changelog_next_change"); + if (nr_changes ==1){ + printf("continue scanning\n"); + } + + if(nr_changes == 0){ + printf("done scanning \n"); + goto out; + } + */ + } + + +out: + return ret; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 2d545da9e82..218896b86e7 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -24,6 +24,7 @@ #define GF_CHANGELOG_PROCESSED_DIR ".processed" #define GF_CHANGELOG_PROCESSING_DIR ".processing" #define GF_CHANGELOG_HISTORY_DIR ".history" +#define TIMESTAMP_LENGTH 10 #ifndef MAXLINE #define MAXLINE 4096 @@ -72,8 +73,45 @@ typedef struct gf_changelog { /* Holds gfc for History API */ struct gf_changelog *hist_gfc; + + /* holds 0 done scanning, 1 keep scanning and -1 error */ + int hist_done; } gf_changelog_t; +typedef struct gf_changelog_history_data { + int len; + + int htime_fd; + + /* parallelism count */ + int n_parallel; + + /* history from, to indexes */ + unsigned long from; + unsigned long to; +} gf_changelog_history_data_t; + +typedef struct gf_changelog_consume_data { + /** set of inputs */ + + /* fd to read from */ + int fd; + + /* from @offset */ + off_t offset; + + xlator_t *this; + gf_changelog_t *gfc; + + /** set of outputs */ + + /* return value */ + int retval; + + /* journal processed */ + char changelog[PATH_MAX]; +} gf_changelog_consume_data_t; + int gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); @@ -98,4 +136,11 @@ gf_ftruncate (int fd, off_t length); off_t gf_lseek (int fd, off_t 
offset, int whence); +int +gf_changelog_consume (xlator_t *this, + gf_changelog_t *gfc, + char *from_path, gf_boolean_t no_publish); +int +gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); + #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-process.c index 3ea2700c62b..3b8d2683672 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-process.c +++ b/xlators/features/changelog/lib/src/gf-changelog-process.c @@ -459,8 +459,49 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, return ret; } -static int -gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) +int +gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path) +{ + int ret = 0; + char dest[PATH_MAX] = {0,}; + char to_path[PATH_MAX] = {0,}; + struct stat stbuf = {0,}; + + (void) snprintf (to_path, PATH_MAX, "%s%s", + gfc->gfc_current_dir, basename (from_path)); + + /* handle zerob file that wont exist in current */ + ret = stat (from_path, &stbuf); + if (ret) + goto out; + + if (stbuf.st_size == 0) { + ret = unlink (from_path); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "could not unlink %s (reason %s)", + from_path, strerror (errno)); + goto out; + } + + (void) snprintf (dest, PATH_MAX, "%s%s", + gfc->gfc_processing_dir, basename (from_path)); + + ret = rename (to_path, dest); + if (ret){ + gf_log (this->name, GF_LOG_ERROR, + "error moving %s to processing dir" + " (reason: %s)", to_path, strerror (errno)); + } + +out: + return ret; +} + +int +gf_changelog_consume (xlator_t *this, + gf_changelog_t *gfc, + char *from_path, gf_boolean_t no_publish) { int ret = -1; int fd1 = 0; @@ -472,6 +513,7 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) ret = stat (from_path, &stbuf); if (ret || !S_ISREG(stbuf.st_mode)) { + ret = -1; gf_log (this->name, GF_LOG_ERROR, "stat failed on changelog file: %s", from_path); 
goto out; @@ -506,6 +548,8 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) if (!ret) { /* move it to processing on a successfull decode */ + if (no_publish == _gf_true) + goto close_fd; ret = rename (to_path, dest); if (ret) gf_log (this->name, GF_LOG_ERROR, @@ -516,10 +560,15 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) /* remove it from .current if it's an empty file */ if (zerob) { + if (no_publish == _gf_true) { + ret = 0; + goto close_fd; + } + ret = unlink (to_path); if (ret) gf_log (this->name, GF_LOG_ERROR, - "could not unlink %s (reason: %s", + "could not unlink %s (reason: %s)", to_path, strerror (errno)); } } @@ -546,7 +595,7 @@ gf_changelog_ext_change (xlator_t *this, alo = 1; gf_log (this->name, GF_LOG_DEBUG, "processing changelog: %s", path); - ret = gf_changelog_consume (this, gfc, path); + ret = gf_changelog_consume (this, gfc, path, _gf_false); } if (ret) diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index bfc4cd37dc3..a895037eeca 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -11,6 +11,7 @@ #include "globals.h" #include "glusterfs.h" #include "logging.h" +#include "syscall.h" #include "gf-changelog-helpers.h" @@ -18,7 +19,8 @@ #include "changelog-misc.h" #include "changelog-mem-types.h" -/*@API +/** + * @API * gf_history_changelog_done: * Move processed history changelog file from .processing * to .processed @@ -86,6 +88,7 @@ gf_history_changelog_done (char *file) free (buffer); /* allocated by realpath() */ return ret; } + /** * @API * gf_history_changelog_start_fresh: @@ -126,7 +129,7 @@ gf_history_changelog_start_fresh () return -1; } -/* +/** * @API * gf_history_changelog_next_change: * Return the next history changelog file entry. 
Zero means all @@ -182,7 +185,7 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen) return -1; } -/* +/** * @API * gf_history_changelog_scan: * Scan and generate a list of change entries. @@ -191,8 +194,13 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen) * This call also acts as a cancellation point for the consumer. * * RETURN VALUES: - * nr_entries: On success. - * -1 : On error. + * +ve integer : success and keep scanning.(count of changelogs) + * 0 : success and done scanning. + * -1 : error. + * + * NOTE: After first 0 return call_get_next change for once more time + * to empty the tracker + * */ ssize_t gf_history_changelog_scan () @@ -204,10 +212,11 @@ gf_history_changelog_scan () xlator_t *this = NULL; size_t nr_entries = 0; gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; + gf_changelog_t *hist_gfc = NULL; struct dirent *entryp = NULL; struct dirent *result = NULL; char buffer[PATH_MAX] = {0,}; + static int is_last_scan = 0; this = THIS; if (!this) @@ -221,15 +230,23 @@ gf_history_changelog_scan () if (!hist_gfc) goto out; + retry: + if (is_last_scan == 1) + return 0; + if (hist_gfc->hist_done == 0) + is_last_scan = 1; + errno = EINVAL; + if (hist_gfc->hist_done == -1) + goto out; tracker_fd = hist_gfc->gfc_fd; if (gf_ftruncate (tracker_fd, 0)) goto out; - len = offsetof(struct dirent, d_name) - + pathconf(hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; + len = offsetof (struct dirent, d_name) + + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; entryp = GF_CALLOC (1, len, gf_changelog_mt_libgfchangelog_dirent_t); if (!entryp) @@ -265,10 +282,649 @@ gf_history_changelog_scan () GF_FREE (entryp); + gf_log (this->name, GF_LOG_DEBUG, + "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan); + if (!result) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) - return nr_entries; + if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { + if (nr_entries > 0) + return nr_entries; + else { + sleep(1); + 
goto retry; + } + } } out: return -1; } + +/* + * Gets timestamp value at the changelog path at index. + * Returns 0 on success(updates given time-stamp), -1 on failure. + */ +int +gf_history_get_timestamp (int fd, int index, int len, + unsigned long *ts) +{ + xlator_t *this = NULL; + int n_read = -1; + char path_buf[PATH_MAX]= {0,}; + char *iter = path_buf; + size_t offset = index * (len+1); + unsigned long value = 0; + int ret = 0; + + this = THIS; + if (!this) { + return -1; + } + + n_read = pread (fd, path_buf, len, offset); + if (n_read < 0 ) { + ret = -1; + gf_log ( this->name, GF_LOG_ERROR, + "could not read from htime file"); + goto out; + } + iter+= len - TIMESTAMP_LENGTH; + sscanf (iter, "%lu",&value); +out: + if(ret == 0) + *ts = value; + return ret; +} + +/* + * Function to ensure correctness of search + * Checks whether @value is there next to @target_index or not + */ +int +gf_history_check ( int fd, int target_index, unsigned long value, int len) +{ + int ret = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + + if (target_index == 0) { + ret = gf_history_get_timestamp (fd, target_index, len, &ts1); + if (ret == -1) + goto out; + if (value <= ts1) + goto out; + else { + ret = -1; + goto out; + } + } + + ret = gf_history_get_timestamp (fd, target_index, len, &ts1); + if (ret ==-1) + goto out; + ret = gf_history_get_timestamp (fd, target_index -1, len, &ts2); + if (ret ==-1) + goto out; + + if ( (value <= ts1) && (value > ts2) ) { + goto out; + } + else + ret = -1; +out: + return ret; +} + +/* + * This is a "binary search" based search function which checks neighbours + * for in-range availability of the value to be searched and provides the + * index at which the changelog file nearest to the requested timestamp(value) + * can be read from. + * + * Actual offset can be calculated as (index* (len+1) ). + * "1" is because the changelog paths are null terminated. 
/*
 * Binary search over the fixed size records of an htime file. Looks at the
 * neighbours of the probed record to decide whether @value falls in range
 * and yields the index of the changelog nearest to the requested timestamp.
 *
 * The byte offset of a record is (index * (len + 1)); the extra byte is the
 * separator that follows every changelog path.
 *
 * @fd    : open htime file to search in
 * @value : time stamp to search
 * @from  : first index of the search window
 * @to    : last index of the search window
 * @len   : length of the fixed length path strings
 *
 * Returns the index found, or -1 when a timestamp could not be read.
 */
int
gf_history_b_search (int fd, unsigned long value,
                     unsigned long from, unsigned long to, int len)
{
        int           mid      = -1;
        unsigned long probe_ts = 0;

        for (;;) {
                mid = (from + to)/2;

                /* either one or 2 changelogs left */
                if ( (to - from) <= 1 ) {
                        if (to == from)
                                return to;

                        if (gf_history_get_timestamp (fd, from, len,
                                                      &probe_ts) == -1)
                                return -1;

                        /* strictly the comparison should be "==", but ">="
                         * also covers an htime file that holds only two
                         * changelogs */
                        return (probe_ts >= value) ? from : to;
                }

                if (gf_history_get_timestamp (fd, mid, len, &probe_ts) == -1)
                        return -1;

                if (probe_ts == value)
                        return mid;

                if (value > probe_ts) {
                        /* is @value between mid and its right neighbour? */
                        if (gf_history_get_timestamp (fd, mid + 1, len,
                                                      &probe_ts) == -1)
                                return -1;

                        if (value < probe_ts)
                                return mid + 1;

                        /* no: keep searching the upper half */
                        from = mid + 1;
                        continue;
                }

                /* @value is below the probed timestamp; we are sure that
                 * the value exists in this htime file, so the first record
                 * is the answer when there is nothing to its left */
                if (mid == 0)
                        return 0;

                if (gf_history_get_timestamp (fd, mid - 1, len,
                                              &probe_ts) == -1)
                        return -1;

                if (value > probe_ts)
                        return mid;

                /* keep searching the lower half */
                to = mid - 1;
        }
}
gf_log (this->name, GF_LOG_ERROR, + "cannot read from history metadata file (reason %s)", + strerror (errno)); + goto out; + } + + /* TODO: handle short reads and EOF. */ + + ret = gf_changelog_consume (ccd->this, + ccd->gfc, ccd->changelog, _gf_true); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not parse changelog: %s", ccd->changelog); + goto out; + } + + ccd->retval = 0; + + out: + return NULL; +} + +/** + * "gf_history_consume" is a worker function for history. + * parses and moves changelogs files from index "from" + * to index "to" in open htime file whose fd is "fd". + */ + +#define MAX_PARALLELS 10 + +void * +gf_history_consume (void * data) +{ + xlator_t *this = NULL; + gf_changelog_t *gfc = NULL; + gf_changelog_t *hist_gfc = NULL; + int ret = 0; + int iter = 0; + int fd = -1; + int from = -1; + int to = -1; + int len = -1; + int n_parallel = 0; + int n_envoked = 0; + gf_boolean_t publish = _gf_true; + pthread_t th_id[MAX_PARALLELS] = {0,}; + gf_changelog_history_data_t *hist_data = NULL; + gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {{0},}; + gf_changelog_consume_data_t *curr = NULL; + + hist_data = (gf_changelog_history_data_t *) data; + if (hist_data == NULL) { + ret = -1; + goto out; + } + + fd = hist_data->htime_fd; + from = hist_data->from; + to = hist_data->to; + len = hist_data->len; + n_parallel = hist_data->n_parallel; + + this = THIS; + if (!this) { + ret = -1; + goto out; + } + + gfc = (gf_changelog_t *) this->private; + if (!gfc) { + ret = -1; + goto out; + } + + hist_gfc = gfc->hist_gfc; + if (!hist_gfc) { + ret = -1; + goto out; + } + + while (from <= to) { + n_envoked = 0; + + for (iter = 0 ; (iter < n_parallel) && (from <= to); iter++) { + curr = &ccd[iter]; + + curr->this = this; + curr->gfc = hist_gfc; + curr->fd = fd; + curr->offset = from * (len + 1); + + curr->retval = 0; + memset (curr->changelog, '\0', PATH_MAX); + + ret = pthread_create (&th_id[iter], NULL, + gf_changelog_consume_wrap, curr); + if (ret) { + 
gf_log ( this->name, GF_LOG_ERROR, + "could not create consume-thread" + " reason (%s)", strerror (ret)); + ret = -1; + goto sync; + } else + n_envoked++; + + from++; + } + + sync: + for (iter = 0; iter < n_envoked; iter++) { + ret = pthread_join (th_id[iter], NULL); + if (ret) { + publish = _gf_false; + gf_log (this->name, GF_LOG_ERROR, + "pthread_join() error %s", + strerror (ret)); + /* try to join the rest */ + continue; + } + + if (publish == _gf_false) + continue; + + curr = &ccd[iter]; + if (ccd->retval) { + publish = _gf_false; + gf_log (this->name, GF_LOG_ERROR, + "parsing error, ceased publishing..."); + continue; + } + + ret = gf_changelog_publish (curr->this, + curr->gfc, curr->changelog); + if (ret) { + publish = _gf_false; + gf_log (this->name, GF_LOG_ERROR, + "publish error, ceased publishing..."); + } + } + } + + /* informing "parsing done". */ + hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1; + +out: + if (fd != -1) + close (fd); + GF_FREE (hist_data); + return NULL; +} + +/** + * @API + * gf_history_changelog() : Get/parse historical changelogs and get them ready + * for consumption. + * + * Arguments: + * @changelog_dir : Directory location from where history changelogs are + * supposed to be consumed. + * @start: Unix timestamp FROM where changelogs should be consumed. + * @end: Unix timestamp TO where changelogsshould be consumed. + * @n_parallel : degree of parallelism while changelog parsing. + * @actual_end : the end time till where changelogs are available. + * + * Return: + * Returns on success, the last time till where changelogs are + * available. + * Returns -1 on failure(error). 
+ */ + +#define MAKE_HTIME_FILE_PATH(htime_file, htime_dir, htime_bname) do { \ + strcpy (htime_file, htime_dir); \ + strcat (htime_file, "/"); \ + strcat (htime_file, htime_bname); \ + } while (0) + +/** + * Extract timestamp range from a historical metadata file + * Returns: + * 0 : Success ({min,max}_ts with the appropriate values) + * -1 : Failure + * -2 : Ignore this metadata file and process next + */ +int +gf_changelog_extract_min_max (const char *dname, const char *htime_dir, + int *fd, unsigned long *total, + unsigned long *min_ts, unsigned long *max_ts) +{ + int ret = -1; + xlator_t *this = NULL; + char htime_file[PATH_MAX] = {0,}; + struct stat stbuf = {0,}; + char *iter = NULL; + char x_value[30] = {0,}; + + this = THIS; + + MAKE_HTIME_FILE_PATH (htime_file, htime_dir, dname); + + iter = (htime_file + strlen (htime_file) - TIMESTAMP_LENGTH); + sscanf (iter ,"%lu",min_ts); + + ret = stat (htime_file, &stbuf); + if (ret) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "stat() failed on htime file %s (reason %s)", + htime_file, strerror (errno)); + goto out; + } + + /* ignore everything except regular files */ + if (!S_ISREG (stbuf.st_mode)) { + ret = -2; + goto out; + } + + *fd = open (htime_file, O_RDONLY); + if (*fd < 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "open() failed for htime %s (reasong %s)", + htime_file, strerror (errno)); + goto out; + } + + /* Looks good, extract max timestamp */ + ret = sys_fgetxattr (*fd, HTIME_KEY, x_value, sizeof (x_value)); + if (ret < 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "error extracting max timstamp from htime file" + " %s (reason %s)", htime_file, strerror (errno)); + goto out; + } + + sscanf (x_value, "%lu:%lu", max_ts, total); + gf_log (this->name, GF_LOG_INFO, + "MIN: %lu, MAX: %lu, TOTAL CHANGELOGS: %lu", + *min_ts, *max_ts, *total); + + ret = 0; + + out: + return ret; +} + +int +gf_history_changelog (char* changelog_dir, unsigned long start, + unsigned long end, int 
n_parallel, + unsigned long *actual_end) +{ + int ret = 0; + int len = -1; + int fd = -1; + int n_read = -1; + unsigned long min_ts = 0; + unsigned long max_ts = 0; + unsigned long end2 = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + unsigned long to = 0; + unsigned long from = 0; + unsigned long total_changelog = 0; + xlator_t *this = NULL; + gf_changelog_t *gfc = NULL; + gf_changelog_t *hist_gfc = NULL; + gf_changelog_history_data_t *hist_data = NULL; + DIR *dirp = NULL; + struct dirent *dp = NULL; + pthread_t consume_th = 0; + char htime_dir[PATH_MAX] = {0,}; + char buffer[PATH_MAX] = {0,}; + + pthread_attr_t attr; + + this = THIS; + if (!this) { + ret = -1; + goto out; + } + + gfc = (gf_changelog_t *) this->private; + if (!gfc) { + ret = -1; + goto out; + } + + hist_gfc = (gf_changelog_t *) gfc->hist_gfc; + if (!hist_gfc) { + ret = -1; + goto out; + } + + /* basic sanity check */ + if (start > end || n_parallel <= 0) { + ret = -1; + goto out; + } + + /* cap parallelism count */ + if (n_parallel > MAX_PARALLELS) + n_parallel = MAX_PARALLELS; + + CHANGELOG_FILL_HTIME_DIR (changelog_dir, htime_dir); + + dirp = opendir (htime_dir); + if (dirp == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "open dir on htime failed : %s (reason: %s)", + htime_dir, strerror (errno)); + ret = -1; + goto out; + } + + while ((dp = readdir (dirp)) != NULL) { + ret = gf_changelog_extract_min_max (dp->d_name, htime_dir, + &fd, &total_changelog, + &min_ts, &max_ts); + if (ret) { + if (-2 == ret) + continue; + goto out; + } + + if (start >= min_ts && start < max_ts) { + /** + * TODO: handle short reads later... 
+ */ + n_read = read (fd, buffer, PATH_MAX); + if (n_read < 0) { + ret = -1; + gf_log ( this->name, GF_LOG_ERROR, + "unable to read htime file"); + goto out; + } + + len = strlen (buffer); + + /** + * search @start in the htime file returning it's index + * (@from) + */ + from = gf_history_b_search (fd, start, 0, + total_changelog - 1, len); + + /* ensuring correctness of gf_b_search */ + if (gf_history_check (fd, from, start, len) != 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "wrong result for start: %lu idx: %lu", + start, from); + goto out; + } + + end2 = (end <= max_ts) ? end : max_ts; + + /** + * search @end2 in htime file returning it's index (@to) + */ + to = gf_history_b_search (fd, end2, + 0, total_changelog - 1, len); + + if (gf_history_check (fd, to, end2, len) != 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "wrong result for start: %lu idx: %lu", + end2, to); + goto out; + } + + ret = gf_history_get_timestamp (fd, from, len, &ts1); + if (ret == -1) + goto out; + + ret = gf_history_get_timestamp (fd, to, len, &ts2); + if (ret == -1) + goto out; + + gf_log (this->name, GF_LOG_INFO, + "FINAL: from: %lu, to: %lu, changes: %lu", + ts1, ts2, (to - from + 1)); + + hist_data = GF_CALLOC (1, + sizeof (gf_changelog_history_data_t), + gf_changelog_mt_history_data_t); + + hist_data->htime_fd = fd; + hist_data->from = from; + hist_data->to = to; + hist_data->len = len; + hist_data->n_parallel = n_parallel; + + ret = pthread_attr_init (&attr); + if (ret != 0) { + ret = -1; + goto out; + } + + ret = pthread_attr_setdetachstate + (&attr, PTHREAD_CREATE_DETACHED); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "unable to sets the detach" + " state attribute, reason(%s)", + strerror (ret)); + ret = -1; + goto out; + } + + /* spawn a thread for background parsing & publishing */ + ret = pthread_create (&consume_th, &attr, + gf_history_consume, hist_data); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "creation of consume parent-thread" 
+ " failed. reason(%s)", strerror (ret)); + ret = -1; + goto out; + } + + goto out; + + } /* end of range check */ + + } /* end of readdir() */ + + if (!from || !to) + ret = -1; + +out: + if (dirp != NULL) + closedir (dirp); + + if (ret < 0) { + if (fd != -1) + close (fd); + GF_FREE (hist_data); + (void) pthread_attr_destroy (&attr); + + return ret; + } + + hist_gfc->hist_done = 1; + *actual_end = ts2; + + return ret; +} diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 71f2f7a25ad..984106b75e6 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -17,6 +17,7 @@ #include "defaults.h" #include "logging.h" #include "iobuf.h" +#include "syscall.h" #include "changelog-helpers.h" #include "changelog-mem-types.h" @@ -132,6 +133,49 @@ changelog_write (int fd, char *buffer, size_t len) return (writen != len); } +int +htime_update (xlator_t *this, + changelog_priv_t *priv, unsigned long ts, + char * buffer) +{ + char changelog_path[PATH_MAX+1] = {0,}; + int len = -1; + char x_value[25] = {0,}; + /* time stamp(10) + : (1) + rolltime (12 ) + buffer (2) */ + int ret = 0; + + if (priv->htime_fd ==-1) { + gf_log (this->name, GF_LOG_ERROR, + "Htime fd not available for updation"); + ret = -1; + goto out; + } + strcpy (changelog_path, buffer); + len = strlen (changelog_path); + changelog_path[len] = '\0'; /* redundant */ + + if (changelog_write (priv->htime_fd, (void*) changelog_path, len+1 ) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Htime file content write failed"); + ret =-1; + goto out; + } + + sprintf (x_value,"%lu:%d",ts, priv->rollover_count); + + if (sys_fsetxattr (priv->htime_fd, HTIME_KEY, x_value, + strlen (x_value), XATTR_REPLACE)) { + gf_log (this->name, GF_LOG_ERROR, + "Htime xattr updation failed"); + goto out; + } + + priv->rollover_count +=1; + +out: + return ret; +} + static int changelog_rollover_changelog (xlator_t 
*this, changelog_priv_t *priv, unsigned long ts) @@ -173,6 +217,15 @@ changelog_rollover_changelog (xlator_t *this, ofile, nfile, strerror (errno)); } + if (!ret) { + ret = htime_update (this, priv, ts, nfile); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "could not update htime file"); + goto out; + } + } + if (notify) { bname = basename (nfile); gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); @@ -212,6 +265,54 @@ changelog_rollover_changelog (xlator_t *this, return ret; } +/* Returns 0 on successful creation of htime file + * returns -1 on failure or error + */ +int +htime_open (xlator_t *this, + changelog_priv_t * priv, unsigned long ts) +{ + int fd = -1; + int ret = 0; + char ht_dir_path[PATH_MAX] = {0,}; + char ht_file_path[PATH_MAX] = {0,}; + int flags = 0; + + CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, ht_dir_path); + + /* get the htime file name in ht_file_path */ + (void) snprintf (ht_file_path,PATH_MAX,"%s/%s.%lu",ht_dir_path, + HTIME_FILE_NAME, ts); + + flags |= (O_CREAT | O_RDWR | O_SYNC); + fd = open (ht_file_path, flags, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd < 0) { + gf_log (this->name, GF_LOG_ERROR, + "unable to open/create htime file: %s" + "(reason: %s)", ht_file_path, strerror (errno)); + ret = -1; + goto out; + + } + + if (sys_fsetxattr (fd, HTIME_KEY, HTIME_INITIAL_VALUE, + sizeof (HTIME_INITIAL_VALUE)-1, 0)) { + gf_log (this->name, GF_LOG_ERROR, + "Htime xattr initialization failed"); + ret = -1; + goto out; + } + + /* save this htime_fd in priv->htime_fd */ + priv->htime_fd = fd; + /* initialize rollover-number in priv to 1 */ + priv->rollover_count = 1; + +out: + return ret; +} + int changelog_open (xlator_t *this, changelog_priv_t *priv) @@ -311,7 +412,7 @@ changelog_handle_change (xlator_t *this, int ret = 0; if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) { - changelog_encode_change(priv); + changelog_encode_change (priv); ret = changelog_start_next_change (this, priv, cld->cld_roll_time, 
cld->cld_finale); diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index ec90f8a13d7..987af190b9c 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -203,9 +203,18 @@ struct changelog_priv { /* logging directory */ char *changelog_dir; + /* htime directory */ + char *htime_dir; + /* one file for all changelog types */ int changelog_fd; + /* htime fd for current changelog session */ + int htime_fd; + + /* rollover_count used by htime */ + int rollover_count; + gf_lock_t lock; /* writen end of the pipe */ @@ -393,6 +402,11 @@ void * changelog_fsync_thread (void *data); int changelog_forget (xlator_t *this, inode_t *inode); +int +htime_update (xlator_t *this, changelog_priv_t *priv, + unsigned long ts, char * buffer); +int +htime_open (xlator_t *this, changelog_priv_t * priv, unsigned long ts); /* Geo-Rep snapshot dependency changes */ void @@ -549,5 +563,6 @@ call_stub_t *__chlog_barrier_dequeue (xlator_t *this, struct list_head *queue); goto label; \ } \ } while (0) +/* End: Geo-Rep snapshot dependency changes */ + #endif /* _CHANGELOG_HELPERS_H */ -/* End: Geo-Rep snapshot dependency changes */ diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h index d72464eab70..e1fa319a715 100644 --- a/xlators/features/changelog/src/changelog-mem-types.h +++ b/xlators/features/changelog/src/changelog-mem-types.h @@ -23,6 +23,7 @@ enum gf_changelog_mem_types { gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 7, gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8, gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 9, + gf_changelog_mt_history_data_t = gf_common_mt_end + 10, gf_changelog_mt_end }; diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h index 127b03e2e1b..257c1f34218 100644 --- 
a/xlators/features/changelog/src/changelog-misc.h +++ b/xlators/features/changelog/src/changelog-misc.h @@ -16,6 +16,9 @@ #define CHANGELOG_MAX_TYPE 3 #define CHANGELOG_FILE_NAME "CHANGELOG" +#define HTIME_FILE_NAME "HTIME" +#define HTIME_KEY "trusted.glusterfs.htime" +#define HTIME_INITIAL_VALUE "0:0" #define CHANGELOG_VERSION_MAJOR 1 #define CHANGELOG_VERSION_MINOR 1 @@ -64,6 +67,10 @@ } \ } while (0) +#define CHANGELOG_FILL_HTIME_DIR(changelog_dir, path) do { \ + strcpy (path, changelog_dir); \ + strcat (path, "/htime"); \ + } while(0) /** * everything after 'CHANGELOG_TYPE_ENTRY' are internal types * (ie. none of the fops trigger this type of event), hence diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 5eb2cd93fd3..0b982148f44 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -1661,6 +1661,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) * simple here. */ ret = changelog_fill_rollover_data (&cld, _gf_false); + if(ret) + goto out; + + ret = htime_open (this, priv, cld.cld_roll_time); + /* call htime open with cld's rollover_time */ if (ret) goto out; @@ -1779,6 +1784,8 @@ reconfigure (xlator_t *this, dict_t *options) gf_boolean_t active_now = _gf_true; changelog_time_slice_t *slice = NULL; changelog_log_data_t cld = {0,}; + char htime_dir[PATH_MAX] = {0,}; + struct timeval tv = {0,}; priv = this->private; if (!priv) @@ -1803,6 +1810,12 @@ reconfigure (xlator_t *this, dict_t *options) goto out; ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); + + if (ret) + goto out; + CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir); + ret = mkdir_p (htime_dir, 0600, _gf_true); + if (ret) goto out; @@ -1847,6 +1860,15 @@ reconfigure (xlator_t *this, dict_t *options) goto out; if (active_now) { + if (!active_earlier) { + if (gettimeofday(&tv, NULL) ) { + gf_log (this->name, GF_LOG_ERROR, + "unable to fetch htime"); + ret = -1; + goto out; 
+ } + htime_open(this, priv, tv.tv_sec); + } ret = changelog_spawn_notifier (this, priv); if (!ret) ret = changelog_spawn_helper_threads (this, @@ -1871,10 +1893,11 @@ reconfigure (xlator_t *this, dict_t *options) int32_t init (xlator_t *this) { - int ret = -1; - char *tmp = NULL; - changelog_priv_t *priv = NULL; - gf_boolean_t cond_lock_init = _gf_false; + int ret = -1; + char *tmp = NULL; + changelog_priv_t *priv = NULL; + gf_boolean_t cond_lock_init = _gf_false; + char htime_dir[PATH_MAX] = {0,}; GF_VALIDATE_OR_GOTO ("changelog", this, out); @@ -1932,6 +1955,12 @@ init (xlator_t *this) * so that consumers can _look_ into it (finding nothing...) */ ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); + + if (ret) + goto out; + + CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir); + ret = mkdir_p (htime_dir, 0600, _gf_true); if (ret) goto out; -- cgit