diff options
author | Venky Shankar <vshankar@redhat.com> | 2014-02-19 20:47:46 +0530 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2014-05-14 05:10:15 -0700 |
commit | d2db585ce7e26851178104433fa9422482d8719e (patch) | |
tree | 2e52f15cf261906debd8ec54106ffe9f84af881e /xlators/features/changelog/lib/src | |
parent | bfde478cedda8267134ee3807c8db5e042115eae (diff) |
features/changelog : historical journal consumption.
Facilitates Glusterfs with the ability to detect file-operations
happened in past by scanning the back-end(brick-level) glusterfs
journal (changelog).
Design:
* List of changelogs produces in one perfectly running session are
stored in htime file which also holds necessary information about
the session start and end time.
* Involves fixed sized seeks to identify N'th changelog in the list.
* Requires O(log n), (where n is number of changelogs in the list),
time to identify the end changelog for the given start-end time
interval.
Currently the background processing of changelogs is sub optimal. BZ
1097041 tracks the development effort.
For complete design, refer the below link:
http://lists.nongnu.org/archive/html/gluster-devel/2014-02/msg00206.html
Change-Id: I27e49f75e492e843084d0ecaf9130224d08462a0
BUG: 1091961
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Reviewed-on: http://review.gluster.org/6930
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators/features/changelog/lib/src')
3 files changed, 764 insertions, 14 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 2d545da9e82..218896b86e7 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -24,6 +24,7 @@ #define GF_CHANGELOG_PROCESSED_DIR ".processed" #define GF_CHANGELOG_PROCESSING_DIR ".processing" #define GF_CHANGELOG_HISTORY_DIR ".history" +#define TIMESTAMP_LENGTH 10 #ifndef MAXLINE #define MAXLINE 4096 @@ -72,8 +73,45 @@ typedef struct gf_changelog { /* Holds gfc for History API */ struct gf_changelog *hist_gfc; + + /* holds 0 done scanning, 1 keep scanning and -1 error */ + int hist_done; } gf_changelog_t; +typedef struct gf_changelog_history_data { + int len; + + int htime_fd; + + /* parallelism count */ + int n_parallel; + + /* history from, to indexes */ + unsigned long from; + unsigned long to; +} gf_changelog_history_data_t; + +typedef struct gf_changelog_consume_data { + /** set of inputs */ + + /* fd to read from */ + int fd; + + /* from @offset */ + off_t offset; + + xlator_t *this; + gf_changelog_t *gfc; + + /** set of outputs */ + + /* return value */ + int retval; + + /* journal processed */ + char changelog[PATH_MAX]; +} gf_changelog_consume_data_t; + int gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); @@ -98,4 +136,11 @@ gf_ftruncate (int fd, off_t length); off_t gf_lseek (int fd, off_t offset, int whence); +int +gf_changelog_consume (xlator_t *this, + gf_changelog_t *gfc, + char *from_path, gf_boolean_t no_publish); +int +gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); + #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-process.c index 3ea2700c62b..3b8d2683672 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-process.c +++ b/xlators/features/changelog/lib/src/gf-changelog-process.c @@ -459,8 +459,49 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd, return ret; } -static int -gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) +int +gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path) +{ + int ret = 0; + char dest[PATH_MAX] = {0,}; + char to_path[PATH_MAX] = {0,}; + struct stat stbuf = {0,}; + + (void) snprintf (to_path, PATH_MAX, "%s%s", + gfc->gfc_current_dir, basename (from_path)); + + /* handle zerob file that wont exist in current */ + ret = stat (from_path, &stbuf); + if (ret) + goto out; + + if (stbuf.st_size == 0) { + ret = unlink (from_path); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "could not unlink %s (reason %s)", + from_path, strerror (errno)); + goto out; + } + + (void) snprintf (dest, PATH_MAX, "%s%s", + gfc->gfc_processing_dir, basename (from_path)); + + ret = rename (to_path, dest); + if (ret){ + gf_log (this->name, GF_LOG_ERROR, + "error moving %s to processing dir" + " (reason: %s)", to_path, strerror (errno)); + } + +out: + return ret; +} + +int +gf_changelog_consume (xlator_t *this, + gf_changelog_t *gfc, + char *from_path, gf_boolean_t no_publish) { int ret = -1; int fd1 = 0; @@ -472,6 +513,7 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) ret = stat (from_path, &stbuf); if (ret || !S_ISREG(stbuf.st_mode)) { + ret = -1; gf_log (this->name, GF_LOG_ERROR, "stat failed on changelog file: %s", from_path); goto out; @@ -506,6 +548,8 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) if (!ret) { /* move it to processing on a successfull decode */ + if (no_publish == _gf_true) + goto close_fd; ret = rename (to_path, dest); if (ret) gf_log (this->name, GF_LOG_ERROR, @@ -516,10 +560,15 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) /* remove it from .current if it's an empty file */ if (zerob) { + if (no_publish == _gf_true) { + ret = 0; + goto close_fd; + } + ret = unlink (to_path); if (ret) gf_log (this->name, GF_LOG_ERROR, - "could not unlink %s (reason: %s", + "could not unlink %s (reason: %s)", to_path, strerror (errno)); } } @@ -546,7 +595,7 @@ gf_changelog_ext_change (xlator_t *this, alo = 1; gf_log (this->name, GF_LOG_DEBUG, "processing changelog: %s", path); - ret = gf_changelog_consume (this, gfc, path); + ret = gf_changelog_consume (this, gfc, path, _gf_false); } if (ret) diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index bfc4cd37dc3..a895037eeca 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -11,6 +11,7 @@ #include "globals.h" #include "glusterfs.h" #include "logging.h" +#include "syscall.h" #include "gf-changelog-helpers.h" @@ -18,7 +19,8 @@ #include "changelog-misc.h" #include "changelog-mem-types.h" -/*@API +/** + * @API * gf_history_changelog_done: * Move processed history changelog file from .processing * to .processed @@ -86,6 +88,7 @@ gf_history_changelog_done (char *file) free (buffer); /* allocated by realpath() */ return ret; } + /** * @API * gf_history_changelog_start_fresh: @@ -126,7 +129,7 @@ gf_history_changelog_start_fresh () return -1; } -/* +/** * @API * gf_history_changelog_next_change: * Return the next history changelog file entry. Zero means all @@ -182,7 +185,7 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen) return -1; } -/* +/** * @API * gf_history_changelog_scan: * Scan and generate a list of change entries. @@ -191,8 +194,13 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen) * This call also acts as a cancellation point for the consumer. * * RETURN VALUES: - * nr_entries: On success. - * -1 : On error. + * +ve integer : success and keep scanning.(count of changelogs) + * 0 : success and done scanning. + * -1 : error. + * + * NOTE: After first 0 return call_get_next change for once more time + * to empty the tracker + * */ ssize_t gf_history_changelog_scan () @@ -204,10 +212,11 @@ gf_history_changelog_scan () xlator_t *this = NULL; size_t nr_entries = 0; gf_changelog_t *gfc = NULL; - gf_changelog_t *hist_gfc = NULL; + gf_changelog_t *hist_gfc = NULL; struct dirent *entryp = NULL; struct dirent *result = NULL; char buffer[PATH_MAX] = {0,}; + static int is_last_scan = 0; this = THIS; if (!this) @@ -221,15 +230,23 @@ gf_history_changelog_scan () if (!hist_gfc) goto out; + retry: + if (is_last_scan == 1) + return 0; + if (hist_gfc->hist_done == 0) + is_last_scan = 1; + errno = EINVAL; + if (hist_gfc->hist_done == -1) + goto out; tracker_fd = hist_gfc->gfc_fd; if (gf_ftruncate (tracker_fd, 0)) goto out; - len = offsetof(struct dirent, d_name) - + pathconf(hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; + len = offsetof (struct dirent, d_name) + + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; entryp = GF_CALLOC (1, len, gf_changelog_mt_libgfchangelog_dirent_t); if (!entryp) @@ -265,10 +282,649 @@ gf_history_changelog_scan () GF_FREE (entryp); + gf_log (this->name, GF_LOG_DEBUG, + "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan); + if (!result) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) - return nr_entries; + if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { + if (nr_entries > 0) + return nr_entries; + else { + sleep(1); + goto retry; + } + } } out: return -1; } + +/* + * Gets timestamp value at the changelog path at index. + * Returns 0 on success(updates given time-stamp), -1 on failure. + */ +int +gf_history_get_timestamp (int fd, int index, int len, + unsigned long *ts) +{ + xlator_t *this = NULL; + int n_read = -1; + char path_buf[PATH_MAX]= {0,}; + char *iter = path_buf; + size_t offset = index * (len+1); + unsigned long value = 0; + int ret = 0; + + this = THIS; + if (!this) { + return -1; + } + + n_read = pread (fd, path_buf, len, offset); + if (n_read < 0 ) { + ret = -1; + gf_log ( this->name, GF_LOG_ERROR, + "could not read from htime file"); + goto out; + } + iter+= len - TIMESTAMP_LENGTH; + sscanf (iter, "%lu",&value); +out: + if(ret == 0) + *ts = value; + return ret; +} + +/* + * Function to ensure correctness of search + * Checks whether @value is there next to @target_index or not + */ +int +gf_history_check ( int fd, int target_index, unsigned long value, int len) +{ + int ret = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + + if (target_index == 0) { + ret = gf_history_get_timestamp (fd, target_index, len, &ts1); + if (ret == -1) + goto out; + if (value <= ts1) + goto out; + else { + ret = -1; + goto out; + } + } + + ret = gf_history_get_timestamp (fd, target_index, len, &ts1); + if (ret ==-1) + goto out; + ret = gf_history_get_timestamp (fd, target_index -1, len, &ts2); + if (ret ==-1) + goto out; + + if ( (value <= ts1) && (value > ts2) ) { + goto out; + } + else + ret = -1; +out: + return ret; +} + +/* + * This is a "binary search" based search function which checks neighbours + * for in-range availability of the value to be searched and provides the + * index at which the changelog file nearest to the requested timestamp(value) + * can be read from. + * + * Actual offset can be calculated as (index* (len+1) ). + * "1" is because the changelog paths are null terminated. + * + * @path : Htime file to search in + * @value : time stamp to search + * @from : start index to search + * @to : end index to search + * @len : length of fixes length strings seperated by null + */ + +int +gf_history_b_search (int fd, unsigned long value, + unsigned long from, unsigned long to, int len) +{ + int m_index = -1; + unsigned long cur_value = 0; + unsigned long ts1 = 0; + int ret = 0; + + m_index = (from + to)/2; + + if ( (to - from) <=1 ) { + /* either one or 2 changelogs left */ + if ( to != from ) { + /* check if value is less or greater than to + * return accordingly + */ + ret = gf_history_get_timestamp (fd, from, len, &ts1); + if (ret ==-1) + goto out; + if ( ts1 >= value) { + /* actually compatision should be + * exactly == but considering + * + * case of only 2 changelogs in htime file + */ + return from; + } + else + return to; + } + else + return to; + } + + ret = gf_history_get_timestamp (fd, m_index, len, &cur_value); + if (ret == -1) + goto out; + if (cur_value == value) { + return m_index; + } + else if (value > cur_value) { + ret = gf_history_get_timestamp (fd, m_index+1, len, &cur_value); + if (ret == -1) + goto out; + if (value < cur_value) + return m_index + 1; + else + return gf_history_b_search (fd, value, + m_index+1, to, len); + } + else { + if (m_index ==0) { + /* we are sure that values exists + * in this htime file + */ + return 0; + } + else { + ret = gf_history_get_timestamp (fd, m_index-1, len, + &cur_value); + if (ret == -1) + goto out; + if (value > cur_value) { + return m_index; + } + else + return gf_history_b_search (fd, value, from, + m_index-1, len); + } + } +out: + return -1; +} + +void * +gf_changelog_consume_wrap (void* data) +{ + int ret = -1; + ssize_t nread = 0; + xlator_t *this = NULL; + gf_changelog_consume_data_t *ccd = NULL; + + ccd = (gf_changelog_consume_data_t *) data; + this = ccd->this; + + ccd->retval = -1; + + nread = pread (ccd->fd, ccd->changelog, PATH_MAX, ccd->offset); + if (nread < 0) { + gf_log (this->name, GF_LOG_ERROR, + "cannot read from history metadata file (reason %s)", + strerror (errno)); + goto out; + } + + /* TODO: handle short reads and EOF. */ + + ret = gf_changelog_consume (ccd->this, + ccd->gfc, ccd->changelog, _gf_true); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not parse changelog: %s", ccd->changelog); + goto out; + } + + ccd->retval = 0; + + out: + return NULL; +} + +/** + * "gf_history_consume" is a worker function for history. + * parses and moves changelogs files from index "from" + * to index "to" in open htime file whose fd is "fd". + */ + +#define MAX_PARALLELS 10 + +void * +gf_history_consume (void * data) +{ + xlator_t *this = NULL; + gf_changelog_t *gfc = NULL; + gf_changelog_t *hist_gfc = NULL; + int ret = 0; + int iter = 0; + int fd = -1; + int from = -1; + int to = -1; + int len = -1; + int n_parallel = 0; + int n_envoked = 0; + gf_boolean_t publish = _gf_true; + pthread_t th_id[MAX_PARALLELS] = {0,}; + gf_changelog_history_data_t *hist_data = NULL; + gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {{0},}; + gf_changelog_consume_data_t *curr = NULL; + + hist_data = (gf_changelog_history_data_t *) data; + if (hist_data == NULL) { + ret = -1; + goto out; + } + + fd = hist_data->htime_fd; + from = hist_data->from; + to = hist_data->to; + len = hist_data->len; + n_parallel = hist_data->n_parallel; + + this = THIS; + if (!this) { + ret = -1; + goto out; + } + + gfc = (gf_changelog_t *) this->private; + if (!gfc) { + ret = -1; + goto out; + } + + hist_gfc = gfc->hist_gfc; + if (!hist_gfc) { + ret = -1; + goto out; + } + + while (from <= to) { + n_envoked = 0; + + for (iter = 0 ; (iter < n_parallel) && (from <= to); iter++) { + curr = &ccd[iter]; + + curr->this = this; + curr->gfc = hist_gfc; + curr->fd = fd; + curr->offset = from * (len + 1); + + curr->retval = 0; + memset (curr->changelog, '\0', PATH_MAX); + + ret = pthread_create (&th_id[iter], NULL, + gf_changelog_consume_wrap, curr); + if (ret) { + gf_log ( this->name, GF_LOG_ERROR, + "could not create consume-thread" + " reason (%s)", strerror (ret)); + ret = -1; + goto sync; + } else + n_envoked++; + + from++; + } + + sync: + for (iter = 0; iter < n_envoked; iter++) { + ret = pthread_join (th_id[iter], NULL); + if (ret) { + publish = _gf_false; + gf_log (this->name, GF_LOG_ERROR, + "pthread_join() error %s", + strerror (ret)); + /* try to join the rest */ + continue; + } + + if (publish == _gf_false) + continue; + + curr = &ccd[iter]; + if (ccd->retval) { + publish = _gf_false; + gf_log (this->name, GF_LOG_ERROR, + "parsing error, ceased publishing..."); + continue; + } + + ret = gf_changelog_publish (curr->this, + curr->gfc, curr->changelog); + if (ret) { + publish = _gf_false; + gf_log (this->name, GF_LOG_ERROR, + "publish error, ceased publishing..."); + } + } + } + + /* informing "parsing done". */ + hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1; + +out: + if (fd != -1) + close (fd); + GF_FREE (hist_data); + return NULL; +} + +/** + * @API + * gf_history_changelog() : Get/parse historical changelogs and get them ready + * for consumption. + * + * Arguments: + * @changelog_dir : Directory location from where history changelogs are + * supposed to be consumed. + * @start: Unix timestamp FROM where changelogs should be consumed. + * @end: Unix timestamp TO where changelogsshould be consumed. + * @n_parallel : degree of parallelism while changelog parsing. + * @actual_end : the end time till where changelogs are available. + * + * Return: + * Returns <timestamp> on success, the last time till where changelogs are + * available. + * Returns -1 on failure(error). + */ + +#define MAKE_HTIME_FILE_PATH(htime_file, htime_dir, htime_bname) do { \ + strcpy (htime_file, htime_dir); \ + strcat (htime_file, "/"); \ + strcat (htime_file, htime_bname); \ + } while (0) + +/** + * Extract timestamp range from a historical metadata file + * Returns: + * 0 : Success ({min,max}_ts with the appropriate values) + * -1 : Failure + * -2 : Ignore this metadata file and process next + */ +int +gf_changelog_extract_min_max (const char *dname, const char *htime_dir, + int *fd, unsigned long *total, + unsigned long *min_ts, unsigned long *max_ts) +{ + int ret = -1; + xlator_t *this = NULL; + char htime_file[PATH_MAX] = {0,}; + struct stat stbuf = {0,}; + char *iter = NULL; + char x_value[30] = {0,}; + + this = THIS; + + MAKE_HTIME_FILE_PATH (htime_file, htime_dir, dname); + + iter = (htime_file + strlen (htime_file) - TIMESTAMP_LENGTH); + sscanf (iter ,"%lu",min_ts); + + ret = stat (htime_file, &stbuf); + if (ret) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "stat() failed on htime file %s (reason %s)", + htime_file, strerror (errno)); + goto out; + } + + /* ignore everything except regular files */ + if (!S_ISREG (stbuf.st_mode)) { + ret = -2; + goto out; + } + + *fd = open (htime_file, O_RDONLY); + if (*fd < 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "open() failed for htime %s (reasong %s)", + htime_file, strerror (errno)); + goto out; + } + + /* Looks good, extract max timestamp */ + ret = sys_fgetxattr (*fd, HTIME_KEY, x_value, sizeof (x_value)); + if (ret < 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "error extracting max timstamp from htime file" + " %s (reason %s)", htime_file, strerror (errno)); + goto out; + } + + sscanf (x_value, "%lu:%lu", max_ts, total); + gf_log (this->name, GF_LOG_INFO, + "MIN: %lu, MAX: %lu, TOTAL CHANGELOGS: %lu", + *min_ts, *max_ts, *total); + + ret = 0; + + out: + return ret; +} + +int +gf_history_changelog (char* changelog_dir, unsigned long start, + unsigned long end, int n_parallel, + unsigned long *actual_end) +{ + int ret = 0; + int len = -1; + int fd = -1; + int n_read = -1; + unsigned long min_ts = 0; + unsigned long max_ts = 0; + unsigned long end2 = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + unsigned long to = 0; + unsigned long from = 0; + unsigned long total_changelog = 0; + xlator_t *this = NULL; + gf_changelog_t *gfc = NULL; + gf_changelog_t *hist_gfc = NULL; + gf_changelog_history_data_t *hist_data = NULL; + DIR *dirp = NULL; + struct dirent *dp = NULL; + pthread_t consume_th = 0; + char htime_dir[PATH_MAX] = {0,}; + char buffer[PATH_MAX] = {0,}; + + pthread_attr_t attr; + + this = THIS; + if (!this) { + ret = -1; + goto out; + } + + gfc = (gf_changelog_t *) this->private; + if (!gfc) { + ret = -1; + goto out; + } + + hist_gfc = (gf_changelog_t *) gfc->hist_gfc; + if (!hist_gfc) { + ret = -1; + goto out; + } + + /* basic sanity check */ + if (start > end || n_parallel <= 0) { + ret = -1; + goto out; + } + + /* cap parallelism count */ + if (n_parallel > MAX_PARALLELS) + n_parallel = MAX_PARALLELS; + + CHANGELOG_FILL_HTIME_DIR (changelog_dir, htime_dir); + + dirp = opendir (htime_dir); + if (dirp == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "open dir on htime failed : %s (reason: %s)", + htime_dir, strerror (errno)); + ret = -1; + goto out; + } + + while ((dp = readdir (dirp)) != NULL) { + ret = gf_changelog_extract_min_max (dp->d_name, htime_dir, + &fd, &total_changelog, + &min_ts, &max_ts); + if (ret) { + if (-2 == ret) + continue; + goto out; + } + + if (start >= min_ts && start < max_ts) { + /** + * TODO: handle short reads later... + */ + n_read = read (fd, buffer, PATH_MAX); + if (n_read < 0) { + ret = -1; + gf_log ( this->name, GF_LOG_ERROR, + "unable to read htime file"); + goto out; + } + + len = strlen (buffer); + + /** + * search @start in the htime file returning it's index + * (@from) + */ + from = gf_history_b_search (fd, start, 0, + total_changelog - 1, len); + + /* ensuring correctness of gf_b_search */ + if (gf_history_check (fd, from, start, len) != 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "wrong result for start: %lu idx: %lu", + start, from); + goto out; + } + + end2 = (end <= max_ts) ? end : max_ts; + + /** + * search @end2 in htime file returning it's index (@to) + */ + to = gf_history_b_search (fd, end2, + 0, total_changelog - 1, len); + + if (gf_history_check (fd, to, end2, len) != 0) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "wrong result for start: %lu idx: %lu", + end2, to); + goto out; + } + + ret = gf_history_get_timestamp (fd, from, len, &ts1); + if (ret == -1) + goto out; + + ret = gf_history_get_timestamp (fd, to, len, &ts2); + if (ret == -1) + goto out; + + gf_log (this->name, GF_LOG_INFO, + "FINAL: from: %lu, to: %lu, changes: %lu", + ts1, ts2, (to - from + 1)); + + hist_data = GF_CALLOC (1, + sizeof (gf_changelog_history_data_t), + gf_changelog_mt_history_data_t); + + hist_data->htime_fd = fd; + hist_data->from = from; + hist_data->to = to; + hist_data->len = len; + hist_data->n_parallel = n_parallel; + + ret = pthread_attr_init (&attr); + if (ret != 0) { + ret = -1; + goto out; + } + + ret = pthread_attr_setdetachstate + (&attr, PTHREAD_CREATE_DETACHED); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "unable to sets the detach" + " state attribute, reason(%s)", + strerror (ret)); + ret = -1; + goto out; + } + + /* spawn a thread for background parsing & publishing */ + ret = pthread_create (&consume_th, &attr, + gf_history_consume, hist_data); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "creation of consume parent-thread" + " failed. reason(%s)", strerror (ret)); + ret = -1; + goto out; + } + + goto out; + + } /* end of range check */ + + } /* end of readdir() */ + + if (!from || !to) + ret = -1; + +out: + if (dirp != NULL) + closedir (dirp); + + if (ret < 0) { + if (fd != -1) + close (fd); + GF_FREE (hist_data); + (void) pthread_attr_destroy (&attr); + + return ret; + } + + hist_gfc->hist_done = 1; + *actual_end = ts2; + + return ret; +} |