diff options
author | Gluster Ant <bugzilla-bot@gluster.org> | 2018-09-12 17:52:45 +0530 |
---|---|---|
committer | Nigel Babu <nigelb@redhat.com> | 2018-09-12 17:52:45 +0530 |
commit | e16868dede6455cab644805af6fe1ac312775e13 (patch) | |
tree | 15aebdb4fff2d87cf8a72f836816b3aa634da58d /xlators/features/changelog/lib/src | |
parent | 45a71c0548b6fd2c757aa2e7b7671a1411948894 (diff) |
Land part 2 of clang-format changes
Change-Id: Ia84cc24c8924e6d22d02ac15f611c10e26db99b4
Signed-off-by: Nigel Babu <nigelb@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib/src')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-api.c | 305 | ||||
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-helpers.c | 256 | ||||
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-journal-handler.c | 1635 | ||||
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-reborp.c | 581 | ||||
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-rpc.c | 90 | ||||
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 859 | ||||
-rw-r--r-- | xlators/features/changelog/lib/src/gf-history-changelog.c | 1551 |
7 files changed, 2600 insertions, 2677 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c index 372550c7acf..1b6e932596d 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-api.c +++ b/xlators/features/changelog/lib/src/gf-changelog-api.c @@ -19,57 +19,54 @@ #include "changelog-lib-messages.h" int -gf_changelog_done (char *file) +gf_changelog_done(char *file) { - int ret = -1; - char *buffer = NULL; - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - char to_path[PATH_MAX] = {0,}; - - errno = EINVAL; - - this = THIS; - if (!this) - goto out; - - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; - - if (!file || !strlen (file)) - goto out; - - /* make sure 'file' is inside ->jnl_working_dir */ - buffer = realpath (file, NULL); - if (!buffer) - goto out; - - if (strncmp (jnl->jnl_working_dir, - buffer, strlen (jnl->jnl_working_dir))) - goto out; - - (void) snprintf (to_path, PATH_MAX, "%s%s", - jnl->jnl_processed_dir, basename (buffer)); - gf_msg_debug (this->name, 0, - "moving %s to processed directory", file); - ret = sys_rename (buffer, to_path); - if (ret) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_RENAME_FAILED, - "cannot move changelog file", - "from=%s", file, - "to=%s", to_path, - NULL); - goto out; - } - - ret = 0; + int ret = -1; + char *buffer = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + char to_path[PATH_MAX] = { + 0, + }; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + if (!file || !strlen(file)) + goto out; + + /* make sure 'file' is inside ->jnl_working_dir */ + buffer = realpath(file, NULL); + if (!buffer) + goto out; + + if (strncmp(jnl->jnl_working_dir, buffer, strlen(jnl->jnl_working_dir))) + goto out; + + (void)snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_processed_dir, + basename(buffer)); + gf_msg_debug(this->name, 0, "moving %s to processed directory", file); + ret = sys_rename(buffer, to_path); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, "cannot move changelog file", + "from=%s", file, "to=%s", to_path, NULL); + goto out; + } + + ret = 0; - out: - if (buffer) - free (buffer); /* allocated by realpath() */ - return ret; +out: + if (buffer) + free(buffer); /* allocated by realpath() */ + return ret; } /** @@ -77,28 +74,28 @@ gf_changelog_done (char *file) * for a set of changelogs, start from the beginning */ int -gf_changelog_start_fresh () +gf_changelog_start_fresh() { - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; - this = THIS; - if (!this) - goto out; + this = THIS; + if (!this) + goto out; - errno = EINVAL; + errno = EINVAL; - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; - if (gf_ftruncate (jnl->jnl_fd, 0)) - goto out; + if (gf_ftruncate(jnl->jnl_fd, 0)) + goto out; - return 0; + return 0; - out: - return -1; +out: + return -1; } /** @@ -107,40 +104,42 @@ gf_changelog_start_fresh () * consumed. */ ssize_t -gf_changelog_next_change (char *bufptr, size_t maxlen) +gf_changelog_next_change(char *bufptr, size_t maxlen) { - ssize_t size = -1; - int tracker_fd = 0; - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - char buffer[PATH_MAX] = {0,}; + ssize_t size = -1; + int tracker_fd = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + char buffer[PATH_MAX] = { + 0, + }; - errno = EINVAL; + errno = EINVAL; - this = THIS; - if (!this) - goto out; + this = THIS; + if (!this) + goto out; - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; - tracker_fd = jnl->jnl_fd; + tracker_fd = jnl->jnl_fd; - size = gf_readline (tracker_fd, buffer, maxlen); - if (size < 0) { - size = -1; - goto out; - } + size = gf_readline(tracker_fd, buffer, maxlen); + if (size < 0) { + size = -1; + goto out; + } - if (size == 0) - goto out; + if (size == 0) + goto out; - memcpy (bufptr, buffer, size - 1); - bufptr[size - 1] = '\0'; + memcpy(bufptr, buffer, size - 1); + bufptr[size - 1] = '\0'; out: - return size; + return size; } /** @@ -152,70 +151,74 @@ out: * This call also acts as a cancellation point for the consumer. */ ssize_t -gf_changelog_scan () +gf_changelog_scan() { - int tracker_fd = 0; - size_t off = 0; - xlator_t *this = NULL; - size_t nr_entries = 0; - gf_changelog_journal_t *jnl = NULL; - struct dirent *entry = NULL; - struct dirent scratch[2] = {{0,},}; - char buffer[PATH_MAX] = {0,}; - - this = THIS; - if (!this) - goto out; - - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; - if (JNL_IS_API_DISCONNECTED (jnl)) { - errno = ENOTCONN; - goto out; + int tracker_fd = 0; + size_t off = 0; + xlator_t *this = NULL; + size_t nr_entries = 0; + gf_changelog_journal_t *jnl = NULL; + struct dirent *entry = NULL; + struct dirent scratch[2] = { + { + 0, + }, + }; + char buffer[PATH_MAX] = { + 0, + }; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + if (JNL_IS_API_DISCONNECTED(jnl)) { + errno = ENOTCONN; + goto out; + } + + errno = EINVAL; + + tracker_fd = jnl->jnl_fd; + if (gf_ftruncate(tracker_fd, 0)) + goto out; + + rewinddir(jnl->jnl_dir); + + for (;;) { + errno = 0; + entry = sys_readdir(jnl->jnl_dir, scratch); + if (!entry || errno != 0) + break; + + if (!strcmp(basename(entry->d_name), ".") || + !strcmp(basename(entry->d_name), "..")) + continue; + + nr_entries++; + + GF_CHANGELOG_FILL_BUFFER(jnl->jnl_processing_dir, buffer, off, + strlen(jnl->jnl_processing_dir)); + GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off, + strlen(entry->d_name)); + GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1); + + if (gf_changelog_write(tracker_fd, buffer, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED, + "error writing changelog filename" + " to tracker file"); + break; } + off = 0; + } - errno = EINVAL; - - tracker_fd = jnl->jnl_fd; - if (gf_ftruncate (tracker_fd, 0)) - goto out; - - rewinddir (jnl->jnl_dir); - - for (;;) { - errno = 0; - entry = sys_readdir (jnl->jnl_dir, scratch); - if (!entry || errno != 0) - break; - - if (!strcmp (basename (entry->d_name), ".") - || !strcmp (basename (entry->d_name), "..")) - continue; - - nr_entries++; - - GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir, - buffer, off, - strlen (jnl->jnl_processing_dir)); - GF_CHANGELOG_FILL_BUFFER (entry->d_name, buffer, - off, strlen (entry->d_name)); - GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); - - if (gf_changelog_write (tracker_fd, buffer, off) != off) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_WRITE_FAILED, - "error writing changelog filename" - " to tracker file"); - break; - } - off = 0; - } - - if (!entry) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) - return nr_entries; - } - out: - return -1; + if (!entry) { + if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) + return nr_entries; + } +out: + return -1; } diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c index 9ff1d135933..fd15ec68ab8 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c @@ -13,38 +13,40 @@ #include "changelog-lib-messages.h" #include "syscall.h" -ssize_t gf_changelog_read_path (int fd, char *buffer, size_t bufsize) +ssize_t +gf_changelog_read_path(int fd, char *buffer, size_t bufsize) { - return sys_read (fd, buffer, bufsize); + return sys_read(fd, buffer, bufsize); } size_t -gf_changelog_write (int fd, char *buffer, size_t len) +gf_changelog_write(int fd, char *buffer, size_t len) { - ssize_t size = 0; - size_t written = 0; + ssize_t size = 0; + size_t written = 0; - while (written < len) { - size = sys_write (fd, buffer + written, len - written); - if (size <= 0) - break; + while (written < len) { + size = sys_write(fd, buffer + written, len - written); + if (size <= 0) + break; - written += size; - } + written += size; + } - return written; + return written; } void -gf_rfc3986_encode_space_newline (unsigned char *s, char *enc, char *estr) +gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr) { - for (; *s; s++) { - if (estr[*s]) - sprintf(enc, "%c", estr[*s]); - else - sprintf(enc, "%%%02X", *s); - while (*++enc); - } + for (; *s; s++) { + if (estr[*s]) + sprintf(enc, "%c", estr[*s]); + else + sprintf(enc, "%%%02X", *s); + while (*++enc) + ; + } } /** @@ -66,154 +68,152 @@ static pthread_key_t rl_key; static pthread_once_t rl_once = PTHREAD_ONCE_INIT; static void -readline_destructor (void *ptr) +readline_destructor(void *ptr) { - GF_FREE (ptr); + GF_FREE(ptr); } static void -readline_once (void) +readline_once(void) { - pthread_key_create (&rl_key, readline_destructor); + pthread_key_create(&rl_key, readline_destructor); } static ssize_t -my_read (read_line_t *tsd, int fd, char *ptr) +my_read(read_line_t *tsd, int fd, char *ptr) { - if (tsd->rl_cnt <= 0) { - tsd->rl_cnt = sys_read (fd, tsd->rl_buf, MAXLINE); - - if (tsd->rl_cnt < 0) - return -1; - else if (tsd->rl_cnt == 0) - return 0; - tsd->rl_bufptr = tsd->rl_buf; - } - - tsd->rl_cnt--; - *ptr = *tsd->rl_bufptr++; - return 1; + if (tsd->rl_cnt <= 0) { + tsd->rl_cnt = sys_read(fd, tsd->rl_buf, MAXLINE); + + if (tsd->rl_cnt < 0) + return -1; + else if (tsd->rl_cnt == 0) + return 0; + tsd->rl_bufptr = tsd->rl_buf; + } + + tsd->rl_cnt--; + *ptr = *tsd->rl_bufptr++; + return 1; } static int -gf_readline_init_once (read_line_t **tsd) +gf_readline_init_once(read_line_t **tsd) { - if (pthread_once (&rl_once, readline_once) != 0) - return -1; + if (pthread_once(&rl_once, readline_once) != 0) + return -1; - *tsd = pthread_getspecific (rl_key); - if (*tsd) - goto out; + *tsd = pthread_getspecific(rl_key); + if (*tsd) + goto out; - *tsd = GF_CALLOC (1, sizeof (**tsd), - gf_changelog_mt_libgfchangelog_rl_t); - if (!*tsd) - return -1; + *tsd = GF_CALLOC(1, sizeof(**tsd), gf_changelog_mt_libgfchangelog_rl_t); + if (!*tsd) + return -1; - if (pthread_setspecific (rl_key, *tsd) != 0) - return -1; + if (pthread_setspecific(rl_key, *tsd) != 0) + return -1; - out: - return 0; +out: + return 0; } ssize_t -gf_readline (int fd, void *vptr, size_t maxlen) +gf_readline(int fd, void *vptr, size_t maxlen) { - size_t n = 0; - size_t rc = 0; - char c = ' '; - char *ptr = NULL; - read_line_t *tsd = NULL; - - if (gf_readline_init_once (&tsd)) - return -1; - - ptr = vptr; - for (n = 1; n < maxlen; n++) { - if ( (rc = my_read (tsd, fd, &c)) == 1 ) { - *ptr++ = c; - if (c == '\n') - break; - } else if (rc == 0) { - *ptr = '\0'; - return (n - 1); - } else - return -1; - } - - *ptr = '\0'; - return n; + size_t n = 0; + size_t rc = 0; + char c = ' '; + char *ptr = NULL; + read_line_t *tsd = NULL; + + if (gf_readline_init_once(&tsd)) + return -1; + ptr = vptr; + for (n = 1; n < maxlen; n++) { + if ((rc = my_read(tsd, fd, &c)) == 1) { + *ptr++ = c; + if (c == '\n') + break; + } else if (rc == 0) { + *ptr = '\0'; + return (n - 1); + } else + return -1; + } + + *ptr = '\0'; + return n; } off_t -gf_lseek (int fd, off_t offset, int whence) +gf_lseek(int fd, off_t offset, int whence) { - off_t off = 0; - read_line_t *tsd = NULL; + off_t off = 0; + read_line_t *tsd = NULL; - if (gf_readline_init_once (&tsd)) - return -1; + if (gf_readline_init_once(&tsd)) + return -1; - off = sys_lseek (fd, offset, whence); - if (off == -1) - return -1; + off = sys_lseek(fd, offset, whence); + if (off == -1) + return -1; - tsd->rl_cnt = 0; - tsd->rl_bufptr = tsd->rl_buf; + tsd->rl_cnt = 0; + tsd->rl_bufptr = tsd->rl_buf; - return off; + return off; } int -gf_ftruncate (int fd, off_t length) +gf_ftruncate(int fd, off_t length) { - read_line_t *tsd = NULL; + read_line_t *tsd = NULL; - if (gf_readline_init_once (&tsd)) - return -1; + if (gf_readline_init_once(&tsd)) + return -1; - if (sys_ftruncate (fd, 0)) - return -1; + if (sys_ftruncate(fd, 0)) + return -1; - tsd->rl_cnt = 0; - tsd->rl_bufptr = tsd->rl_buf; + tsd->rl_cnt = 0; + tsd->rl_bufptr = tsd->rl_buf; - return 0; + return 0; } int -gf_thread_cleanup (xlator_t *this, pthread_t thread) +gf_thread_cleanup(xlator_t *this, pthread_t thread) { - int ret = 0; - void *res = NULL; - - ret = pthread_cancel (thread); - if (ret != 0) { - gf_msg (this->name, GF_LOG_WARNING, 0, - CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, - "Failed to send cancellation to thread"); - goto error_return; - } - - ret = pthread_join (thread, &res); - if (ret != 0) { - gf_msg (this->name, GF_LOG_WARNING, 0, - CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, - "failed to join thread"); - goto error_return; - } - - if (res != PTHREAD_CANCELED) { - gf_msg (this->name, GF_LOG_WARNING, 0, - CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, - "Thread could not be cleaned up"); - goto error_return; - } - - return 0; - - error_return: - return -1; + int ret = 0; + void *res = NULL; + + ret = pthread_cancel(thread); + if (ret != 0) { + gf_msg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + "Failed to send cancellation to thread"); + goto error_return; + } + + ret = pthread_join(thread, &res); + if (ret != 0) { + gf_msg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + "failed to join thread"); + goto error_return; + } + + if (res != PTHREAD_CANCELED) { + gf_msg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, + "Thread could not be cleaned up"); + goto error_return; + } + + return 0; + +error_return: + return -1; } diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c index bdb410030f6..ef46bf50c97 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c +++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c @@ -25,112 +25,107 @@ extern int byebye; -enum changelog_versions { - VERSION_1_1 = 0, - VERSION_1_2 = 1 -}; +enum changelog_versions { VERSION_1_1 = 0, VERSION_1_2 = 1 }; /** * number of gfid records after fop number */ -int nr_gfids[2][GF_FOP_MAXVALUE] = { - { - [GF_FOP_MKNOD] = 1, - [GF_FOP_MKDIR] = 1, - [GF_FOP_UNLINK] = 1, - [GF_FOP_RMDIR] = 1, - [GF_FOP_SYMLINK] = 1, - [GF_FOP_RENAME] = 2, - [GF_FOP_LINK] = 1, - [GF_FOP_CREATE] = 1, - }, - { - [GF_FOP_MKNOD] = 1, - [GF_FOP_MKDIR] = 1, - [GF_FOP_UNLINK] = 2, - [GF_FOP_RMDIR] = 2, - [GF_FOP_SYMLINK] = 1, - [GF_FOP_RENAME] = 2, - [GF_FOP_LINK] = 1, - [GF_FOP_CREATE] = 1, - } -}; - -int nr_extra_recs[2][GF_FOP_MAXVALUE] = { - { - [GF_FOP_MKNOD] = 3, - [GF_FOP_MKDIR] = 3, - [GF_FOP_UNLINK] = 0, - [GF_FOP_RMDIR] = 0, - [GF_FOP_SYMLINK] = 0, - [GF_FOP_RENAME] = 0, - [GF_FOP_LINK] = 0, - [GF_FOP_CREATE] = 3, - }, - { - [GF_FOP_MKNOD] = 3, - [GF_FOP_MKDIR] = 3, - [GF_FOP_UNLINK] = 0, - [GF_FOP_RMDIR] = 0, - [GF_FOP_SYMLINK] = 0, - [GF_FOP_RENAME] = 0, - [GF_FOP_LINK] = 0, - [GF_FOP_CREATE] = 3, - } -}; +int nr_gfids[2][GF_FOP_MAXVALUE] = {{ + [GF_FOP_MKNOD] = 1, + [GF_FOP_MKDIR] = 1, + [GF_FOP_UNLINK] = 1, + [GF_FOP_RMDIR] = 1, + [GF_FOP_SYMLINK] = 1, + [GF_FOP_RENAME] = 2, + [GF_FOP_LINK] = 1, + [GF_FOP_CREATE] = 1, + }, + { + [GF_FOP_MKNOD] = 1, + [GF_FOP_MKDIR] = 1, + [GF_FOP_UNLINK] = 2, + [GF_FOP_RMDIR] = 2, + [GF_FOP_SYMLINK] = 1, + [GF_FOP_RENAME] = 2, + [GF_FOP_LINK] = 1, + [GF_FOP_CREATE] = 1, + }}; + +int nr_extra_recs[2][GF_FOP_MAXVALUE] = {{ + [GF_FOP_MKNOD] = 3, + [GF_FOP_MKDIR] = 3, + [GF_FOP_UNLINK] = 0, + [GF_FOP_RMDIR] = 0, + [GF_FOP_SYMLINK] = 0, + [GF_FOP_RENAME] = 0, + [GF_FOP_LINK] = 0, + [GF_FOP_CREATE] = 3, + }, + { + [GF_FOP_MKNOD] = 3, + [GF_FOP_MKDIR] = 3, + [GF_FOP_UNLINK] = 0, + [GF_FOP_RMDIR] = 0, + [GF_FOP_SYMLINK] = 0, + [GF_FOP_RENAME] = 0, + [GF_FOP_LINK] = 0, + [GF_FOP_CREATE] = 3, + }}; static char * -binary_to_ascii (uuid_t uuid) +binary_to_ascii(uuid_t uuid) { - return uuid_utoa (uuid); + return uuid_utoa(uuid); } static char * -conv_noop (char *ptr) { return ptr; } - -#define VERIFY_SEPARATOR(ptr, plen, perr) \ - { \ - if (*(ptr + plen) != '\0') { \ - perr = 1; \ - break; \ - } \ - } +conv_noop(char *ptr) +{ + return ptr; +} -#define MOVER_MOVE(mover, nleft, bytes) \ - { \ - mover += bytes; \ - nleft -= bytes; \ - } \ - -#define PARSE_GFID(mov, ptr, le, fn, perr) \ - { \ - VERIFY_SEPARATOR (mov, le, perr); \ - ptr = fn (mov); \ - if (!ptr) { \ - perr = 1; \ - break; \ - } \ - } +#define VERIFY_SEPARATOR(ptr, plen, perr) \ + { \ + if (*(ptr + plen) != '\0') { \ + perr = 1; \ + break; \ + } \ + } -#define FILL_AND_MOVE(pt, buf, of, mo, nl, le) \ - { \ - GF_CHANGELOG_FILL_BUFFER (pt, buf, of, strlen (pt)); \ - MOVER_MOVE (mo, nl, le); \ - } +#define MOVER_MOVE(mover, nleft, bytes) \ + { \ + mover += bytes; \ + nleft -= bytes; \ + } + +#define PARSE_GFID(mov, ptr, le, fn, perr) \ + { \ + VERIFY_SEPARATOR(mov, le, perr); \ + ptr = fn(mov); \ + if (!ptr) { \ + perr = 1; \ + break; \ + } \ + } +#define FILL_AND_MOVE(pt, buf, of, mo, nl, le) \ + { \ + GF_CHANGELOG_FILL_BUFFER(pt, buf, of, strlen(pt)); \ + MOVER_MOVE(mo, nl, le); \ + } -#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr) \ - { \ - memcpy (uuid, mover, sizeof (uuid_t)); \ - ptr = binary_to_ascii (uuid); \ - if (!ptr) { \ - perr = 1; \ - break; \ - } \ - MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \ - } \ +#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr) \ + { \ + memcpy(uuid, mover, sizeof(uuid_t)); \ + ptr = binary_to_ascii(uuid); \ + if (!ptr) { \ + perr = 1; \ + break; \ + } \ + MOVER_MOVE(mover, nleft, sizeof(uuid_t)); \ + } -#define LINE_BUFSIZE (3*PATH_MAX) /* enough buffer for extra chars too */ +#define LINE_BUFSIZE (3 * PATH_MAX) /* enough buffer for extra chars too */ /** * using mmap() makes parsing easy. fgets() cannot be used here as @@ -145,111 +140,107 @@ conv_noop (char *ptr) { return ptr; } */ static int -gf_changelog_parse_binary (xlator_t *this, - gf_changelog_journal_t *jnl, - int from_fd, int to_fd, - size_t start_offset, struct stat *stbuf, - int version_idx) +gf_changelog_parse_binary(xlator_t *this, gf_changelog_journal_t *jnl, + int from_fd, int to_fd, size_t start_offset, + struct stat *stbuf, int version_idx) { - int ret = -1; - off_t off = 0; - off_t nleft = 0; - uuid_t uuid = {0,}; - char *ptr = NULL; - char *bname_start = NULL; - char *bname_end = NULL; - char *mover = NULL; - void *start = NULL; - char current_mover = ' '; - size_t blen = 0; - int parse_err = 0; - char *ascii = NULL; - - ascii = GF_CALLOC (LINE_BUFSIZE, sizeof(char), gf_common_mt_char); - - nleft = stbuf->st_size; - - start = mmap (NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); - if (start == MAP_FAILED) { - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_MMAP_FAILED, - "mmap() error"); - goto out; - } - - mover = start; + int ret = -1; + off_t off = 0; + off_t nleft = 0; + uuid_t uuid = { + 0, + }; + char *ptr = NULL; + char *bname_start = NULL; + char *bname_end = NULL; + char *mover = NULL; + void *start = NULL; + char current_mover = ' '; + size_t blen = 0; + int parse_err = 0; + char *ascii = NULL; + + ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char); + + nleft = stbuf->st_size; + + start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); + if (start == MAP_FAILED) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED, + "mmap() error"); + goto out; + } - MOVER_MOVE (mover, nleft, start_offset); + mover = start; - while (nleft > 0) { + MOVER_MOVE(mover, nleft, start_offset); - off = blen = 0; - ptr = bname_start = bname_end = NULL; + while (nleft > 0) { + off = blen = 0; + ptr = bname_start = bname_end = NULL; - current_mover = *mover; + current_mover = *mover; - switch (current_mover) { - case 'D': - case 'M': - MOVER_MOVE (mover, nleft, 1); - PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); + switch (current_mover) { + case 'D': + case 'M': + MOVER_MOVE(mover, nleft, 1); + PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err); - break; + break; - case 'E': - MOVER_MOVE (mover, nleft, 1); - PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); + case 'E': + MOVER_MOVE(mover, nleft, 1); + PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err); - bname_start = mover; - bname_end = strchr (mover, '\n'); - if (bname_end == NULL) { - parse_err = 1; - break; - } + bname_start = mover; + bname_end = strchr(mover, '\n'); + if (bname_end == NULL) { + parse_err = 1; + break; + } - blen = bname_end - bname_start; - MOVER_MOVE (mover, nleft, blen); + blen = bname_end - bname_start; + MOVER_MOVE(mover, nleft, blen); - break; + break; - default: - parse_err = 1; - } + default: + parse_err = 1; + } - if (parse_err) - break; + if (parse_err) + break; + + GF_CHANGELOG_FILL_BUFFER(¤t_mover, ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, strlen(ptr)); + if (blen) + GF_CHANGELOG_FILL_BUFFER(bname_start, ascii, off, blen); + GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1); + + if (gf_changelog_write(to_fd, ascii, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_ASCII_ERROR, + "processing binary changelog failed due to " + " error in writing ascii change"); + break; + } - GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); - GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); - GF_CHANGELOG_FILL_BUFFER (ptr, ascii, off, strlen (ptr)); - if (blen) - GF_CHANGELOG_FILL_BUFFER (bname_start, - ascii, off, blen); - GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); - - if (gf_changelog_write (to_fd, ascii, off) != off) { - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_ASCII_ERROR, - "processing binary changelog failed due to " - " error in writing ascii change"); - break; - } + MOVER_MOVE(mover, nleft, 1); + } - MOVER_MOVE (mover, nleft, 1); - } + if ((nleft == 0) && (!parse_err)) + ret = 0; - if ((nleft == 0) && (!parse_err)) - ret = 0; - - if (munmap (start, stbuf->st_size)) - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_MUNMAP_FAILED, - "munmap() error"); - out: - if (ascii) - GF_FREE (ascii); - return ret; + if (munmap(start, stbuf->st_size)) + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED, + "munmap() error"); +out: + if (ascii) + GF_FREE(ascii); + return ret; } /** @@ -258,804 +249,784 @@ gf_changelog_parse_binary (xlator_t *this, * - use fop name rather than fop number */ static int -gf_changelog_parse_ascii (xlator_t *this, - gf_changelog_journal_t *jnl, - int from_fd, int to_fd, - size_t start_offset, struct stat *stbuf, - int version_idx) +gf_changelog_parse_ascii(xlator_t *this, gf_changelog_journal_t *jnl, + int from_fd, int to_fd, size_t start_offset, + struct stat *stbuf, int version_idx) { - int ng = 0; - int ret = -1; - int fop = 0; - int len = 0; - off_t off = 0; - off_t nleft = 0; - char *ptr = NULL; - char *eptr = NULL; - void *start = NULL; - char *mover = NULL; - int parse_err = 0; - char current_mover = ' '; - char *ascii = NULL; - const char *fopname = NULL; - - ascii = GF_CALLOC (LINE_BUFSIZE, sizeof(char), gf_common_mt_char); - - nleft = stbuf->st_size; - - start = mmap (NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); - if (start == MAP_FAILED) { - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_MMAP_FAILED, - "mmap() error"); - goto out; - } + int ng = 0; + int ret = -1; + int fop = 0; + int len = 0; + off_t off = 0; + off_t nleft = 0; + char *ptr = NULL; + char *eptr = NULL; + void *start = NULL; + char *mover = NULL; + int parse_err = 0; + char current_mover = ' '; + char *ascii = NULL; + const char *fopname = NULL; + + ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char); + + nleft = stbuf->st_size; + + start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); + if (start == MAP_FAILED) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED, + "mmap() error"); + goto out; + } - mover = start; + mover = start; - MOVER_MOVE (mover, nleft, start_offset); + MOVER_MOVE(mover, nleft, start_offset); - while (nleft > 0) { - off = 0; - current_mover = *mover; + while (nleft > 0) { + off = 0; + current_mover = *mover; - GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); - GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(¤t_mover, ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); - switch (current_mover) { - case 'D': - MOVER_MOVE (mover, nleft, 1); + switch (current_mover) { + case 'D': + MOVER_MOVE(mover, nleft, 1); - /* target gfid */ - PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, - conv_noop, parse_err); - FILL_AND_MOVE(ptr, ascii, off, - mover, nleft, UUID_CANONICAL_FORM_LEN); - break; - case 'M': - MOVER_MOVE (mover, nleft, 1); + /* target gfid */ + PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, + parse_err); + FILL_AND_MOVE(ptr, ascii, off, mover, nleft, + UUID_CANONICAL_FORM_LEN); + break; + case 'M': + MOVER_MOVE(mover, nleft, 1); + + /* target gfid */ + PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, + parse_err); + FILL_AND_MOVE(ptr, ascii, off, mover, nleft, + UUID_CANONICAL_FORM_LEN); + FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1); + + /* fop */ + len = strlen(mover); + VERIFY_SEPARATOR(mover, len, parse_err); + + fop = atoi(mover); + fopname = gf_fop_list[fop]; + if (fopname == NULL) { + parse_err = 1; + break; + } - /* target gfid */ - PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, - conv_noop, parse_err); - FILL_AND_MOVE (ptr, ascii, off, - mover, nleft, UUID_CANONICAL_FORM_LEN); - FILL_AND_MOVE (" ", ascii, off, mover, nleft, 1); + MOVER_MOVE(mover, nleft, len); - /* fop */ - len = strlen (mover); - VERIFY_SEPARATOR (mover, len, parse_err); + len = strlen(fopname); + GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len); - fop = atoi (mover); - fopname = gf_fop_list[fop]; - if (fopname == NULL) { - parse_err = 1; - break; - } + break; - MOVER_MOVE (mover, nleft, len); + case 'E': + MOVER_MOVE(mover, nleft, 1); + + /* target gfid */ + PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, + parse_err); + FILL_AND_MOVE(ptr, ascii, off, mover, nleft, + UUID_CANONICAL_FORM_LEN); + FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1); + + /* fop */ + len = strlen(mover); + VERIFY_SEPARATOR(mover, len, parse_err); + + fop = atoi(mover); + fopname = gf_fop_list[fop]; + if (fopname == NULL) { + parse_err = 1; + break; + } - len = strlen (fopname); - GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); + MOVER_MOVE(mover, nleft, len); - break; + len = strlen(fopname); + GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len); - case 'E': - MOVER_MOVE (mover, nleft, 1); - - /* target gfid */ - PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, - conv_noop, parse_err); - FILL_AND_MOVE (ptr, ascii, off, - mover, nleft, UUID_CANONICAL_FORM_LEN); - FILL_AND_MOVE (" ", ascii, off, - mover, nleft, 1); - - /* fop */ - len = strlen (mover); - VERIFY_SEPARATOR (mover, len, parse_err); - - fop = atoi (mover); - fopname = gf_fop_list[fop]; - if (fopname == NULL) { - parse_err = 1; - break; - } - - MOVER_MOVE (mover, nleft, len); - - len = strlen (fopname); - GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); - - ng = nr_extra_recs[version_idx][fop]; - for (; ng > 0; ng--) { - MOVER_MOVE (mover, nleft, 1); - len = strlen (mover); - VERIFY_SEPARATOR (mover, len, parse_err); - - GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); - FILL_AND_MOVE (mover, ascii, - off, mover, nleft, len); - } - - /* pargfid + bname */ - ng = nr_gfids[version_idx][fop]; - while (ng-- > 0) { - MOVER_MOVE (mover, nleft, 1); - len = strlen (mover); - if (!len) { - MOVER_MOVE (mover, nleft, 1); - continue; - } - - GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); - - PARSE_GFID (mover, ptr, len, - conv_noop, parse_err); - eptr = calloc (3, strlen (ptr)); - if (!eptr) { - parse_err = 1; - break; - } - - gf_rfc3986_encode_space_newline ( - (unsigned char *) ptr, - eptr, - jnl->rfc3986_space_newline); - FILL_AND_MOVE (eptr, ascii, off, - mover, nleft, len); - free (eptr); - } + ng = nr_extra_recs[version_idx][fop]; + for (; ng > 0; ng--) { + MOVER_MOVE(mover, nleft, 1); + len = strlen(mover); + VERIFY_SEPARATOR(mover, len, parse_err); - break; - default: - parse_err = 1; + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + FILL_AND_MOVE(mover, ascii, off, mover, nleft, len); } - if (parse_err) + /* pargfid + bname */ + ng = nr_gfids[version_idx][fop]; + while (ng-- > 0) { + MOVER_MOVE(mover, nleft, 1); + len = strlen(mover); + if (!len) { + MOVER_MOVE(mover, nleft, 1); + continue; + } + + GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + + PARSE_GFID(mover, ptr, len, conv_noop, parse_err); + eptr = calloc(3, strlen(ptr)); + if (!eptr) { + parse_err = 1; break; + } - GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); - - if (gf_changelog_write (to_fd, ascii, off) != off) { - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_ASCII_ERROR, - "processing ascii changelog failed due to " - " error in writing change"); - break; + gf_rfc3986_encode_space_newline((unsigned char *)ptr, eptr, + jnl->rfc3986_space_newline); + FILL_AND_MOVE(eptr, ascii, off, mover, nleft, len); + free(eptr); } - MOVER_MOVE (mover, nleft, 1); + break; + default: + parse_err = 1; + } + + if (parse_err) + break; + + GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1); + if (gf_changelog_write(to_fd, ascii, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_ASCII_ERROR, + "processing ascii changelog failed due to " + " error in writing change"); + break; } - if ((nleft == 0) && (!parse_err)) - ret = 0; + MOVER_MOVE(mover, nleft, 1); + } + + if ((nleft == 0) && (!parse_err)) + ret = 0; - if (munmap (start, stbuf->st_size)) - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_MUNMAP_FAILED, - "munmap() error"); + if (munmap(start, stbuf->st_size)) + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED, + "munmap() error"); - out: - if (ascii) - GF_FREE (ascii); +out: + if (ascii) + GF_FREE(ascii); - return ret; + return ret; } static int -gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl, - int from_fd, int to_fd, struct stat *stbuf, int *zerob) +gf_changelog_decode(xlator_t *this, gf_changelog_journal_t *jnl, int from_fd, + int to_fd, struct stat *stbuf, int *zerob) { - int ret = -1; - int encoding = -1; - int major_version = -1; - int minor_version = -1; - int version_idx = -1; - size_t elen = 0; - char buffer[1024] = {0,}; - - CHANGELOG_GET_HEADER_INFO (from_fd, buffer, sizeof (buffer), encoding, - major_version, minor_version, elen); - if (encoding == -1) /* unknown encoding */ - goto out; - - if (major_version == -1) /* unknown major version */ - goto out; - - if (minor_version == -1) /* unknown minor version */ - goto out; - - if (!CHANGELOG_VALID_ENCODING (encoding)) - goto out; - - if (elen == stbuf->st_size) { - *zerob = 1; - goto out; - } + int ret = -1; + int encoding = -1; + int major_version = -1; + int minor_version = -1; + int version_idx = -1; + size_t elen = 0; + char buffer[1024] = { + 0, + }; + + CHANGELOG_GET_HEADER_INFO(from_fd, buffer, sizeof(buffer), encoding, + major_version, minor_version, elen); + if (encoding == -1) /* unknown encoding */ + goto out; + + if (major_version == -1) /* unknown major version */ + goto out; + + if (minor_version == -1) /* unknown minor version */ + goto out; + + if (!CHANGELOG_VALID_ENCODING(encoding)) + goto out; + + if (elen == stbuf->st_size) { + *zerob = 1; + goto out; + } - if (major_version == 1 && minor_version == 1) { - version_idx = VERSION_1_1; - } else if (major_version == 1 && minor_version == 2) { - version_idx = VERSION_1_2; - } + if (major_version == 1 && minor_version == 1) { + version_idx = VERSION_1_1; + } else if (major_version == 1 && minor_version == 2) { + version_idx = VERSION_1_2; + } - if (version_idx == -1) /* unknown version number */ - goto out; + if (version_idx == -1) /* unknown version number */ + goto out; - /** - * start processing after the header - */ - if (sys_lseek (from_fd, elen, SEEK_SET) < 0) { - goto out; - } - switch (encoding) { + /** + * start processing after the header + */ + if (sys_lseek(from_fd, elen, SEEK_SET) < 0) { + goto out; + } + switch (encoding) { case CHANGELOG_ENCODE_BINARY: - /** - * this ideally should have been a part of changelog-encoders.c - * (ie. part of the changelog translator). - */ - ret = gf_changelog_parse_binary (this, jnl, from_fd, - to_fd, elen, stbuf, - version_idx); - break; + /** + * this ideally should have been a part of changelog-encoders.c + * (ie. part of the changelog translator). + */ + ret = gf_changelog_parse_binary(this, jnl, from_fd, to_fd, elen, + stbuf, version_idx); + break; case CHANGELOG_ENCODE_ASCII: - ret = gf_changelog_parse_ascii (this, jnl, from_fd, - to_fd, elen, stbuf, - version_idx); - break; - } + ret = gf_changelog_parse_ascii(this, jnl, from_fd, to_fd, elen, + stbuf, version_idx); + break; + } out: - return ret; + return ret; } int -gf_changelog_publish (xlator_t *this, - gf_changelog_journal_t *jnl, char *from_path) +gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl, + char *from_path) { - int ret = 0; - char dest[PATH_MAX] = {0,}; - char to_path[PATH_MAX] = {0,}; - struct stat stbuf = {0,}; - - if (snprintf (to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, - basename (from_path)) >= PATH_MAX) - return -1; - - /* handle zerob file that won't exist in current */ - ret = sys_stat (to_path, &stbuf); - if (ret) { - if (errno == ENOENT) - ret = 0; - goto out; - } + int ret = 0; + char dest[PATH_MAX] = { + 0, + }; + char to_path[PATH_MAX] = { + 0, + }; + struct stat stbuf = { + 0, + }; + + if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, + basename(from_path)) >= PATH_MAX) + return -1; - if (snprintf (dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, - basename (from_path)) >= PATH_MAX) - return -1; - - ret = sys_rename (to_path, dest); - if (ret) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_RENAME_FAILED, - "error moving changelog to processing dir", - "path=%s", to_path, - NULL); - } + /* handle zerob file that won't exist in current */ + ret = sys_stat(to_path, &stbuf); + if (ret) { + if (errno == ENOENT) + ret = 0; + goto out; + } + + if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, + basename(from_path)) >= PATH_MAX) + return -1; + + ret = sys_rename(to_path, dest); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, + "error moving changelog to processing dir", "path=%s", to_path, + NULL); + } out: - return ret; + return ret; } int -gf_changelog_consume (xlator_t *this, - gf_changelog_journal_t *jnl, - char *from_path, gf_boolean_t no_publish) +gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl, + char *from_path, gf_boolean_t no_publish) { - int ret = -1; - int fd1 = 0; - int fd2 = 0; - int zerob = 0; - struct stat stbuf = {0,}; - char dest[PATH_MAX] = {0,}; - char to_path[PATH_MAX] = {0,}; - - if (snprintf (to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, - basename (from_path)) >= PATH_MAX) - goto out; - if (snprintf (dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, - basename (from_path)) >= PATH_MAX) - goto out; - - ret = sys_stat (from_path, &stbuf); - if (ret || !S_ISREG(stbuf.st_mode)) { - ret = -1; - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_STAT_FAILED, - "stat failed on changelog file", - "path=%s", from_path, - NULL); - goto out; - } + int ret = -1; + int fd1 = 0; + int fd2 = 0; + int zerob = 0; + struct stat stbuf = { + 0, + }; + char dest[PATH_MAX] = { + 0, + }; + char to_path[PATH_MAX] = { + 0, + }; + + if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, + basename(from_path)) >= PATH_MAX) + goto out; + if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, + basename(from_path)) >= PATH_MAX) + goto out; + + ret = sys_stat(from_path, &stbuf); + if (ret || !S_ISREG(stbuf.st_mode)) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_STAT_FAILED, + "stat failed on changelog file", "path=%s", from_path, NULL); + goto out; + } - fd1 = open (from_path, O_RDONLY); - if (fd1 < 0) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_OPEN_FAILED, - "cannot open changelog file", - "path=%s", from_path, - NULL); - goto out; - } + fd1 = open(from_path, O_RDONLY); + if (fd1 < 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED, + "cannot open changelog file", "path=%s", from_path, NULL); + goto out; + } - fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (fd2 < 0) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_OPEN_FAILED, - "cannot create ascii changelog file", - "path=%s", to_path, - NULL); + fd2 = open(to_path, O_CREAT | O_TRUNC | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd2 < 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED, + "cannot create ascii changelog file", "path=%s", to_path, NULL); + goto close_fd; + } else { + ret = gf_changelog_decode(this, jnl, fd1, fd2, &stbuf, &zerob); + + sys_close(fd2); + + if (!ret) { + /* move it to processing on a successful + decode */ + if (no_publish == _gf_true) goto close_fd; - } else { - ret = gf_changelog_decode (this, jnl, fd1, - fd2, &stbuf, &zerob); - - sys_close (fd2); - - if (!ret) { - /* move it to processing on a successful - decode */ - if (no_publish == _gf_true) - goto close_fd; - ret = sys_rename (to_path, dest); - if (ret) - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_RENAME_FAILED, - "error moving changelog to processing dir", - "path=%s", to_path, - NULL); - } + ret = sys_rename(to_path, dest); + if (ret) + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, + "error moving changelog to processing dir", "path=%s", + to_path, NULL); + } - /* remove it from .current if it's an empty file */ - if (zerob) { - /* zerob changelogs must be unlinked */ - ret = sys_unlink (to_path); - if (ret) - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_UNLINK_FAILED, - "could not unlink empty changelog", - "path=%s", to_path, - NULL); - } + /* remove it from .current if it's an empty file */ + if (zerob) { + /* zerob changelogs must be unlinked */ + ret = sys_unlink(to_path); + if (ret) + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_UNLINK_FAILED, + "could not unlink empty changelog", "path=%s", to_path, + NULL); } + } - close_fd: - sys_close (fd1); +close_fd: + sys_close(fd1); - out: - return ret; +out: + return ret; } void * -gf_changelog_process (void *data) +gf_changelog_process(void *data) { - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_entry_t *entry = NULL; - gf_changelog_processor_t *jnl_proc = NULL; - - jnl = data; - jnl_proc = jnl->jnl_proc; - THIS = jnl->this; - this = jnl->this; - - while (1) { - pthread_mutex_lock (&jnl_proc->lock); - { - while (list_empty (&jnl_proc->entries)) { - jnl_proc->waiting = _gf_true; - pthread_cond_wait - (&jnl_proc->cond, &jnl_proc->lock); - } - - entry = list_first_entry (&jnl_proc->entries, - gf_changelog_entry_t, list); - if (entry) - list_del (&entry->list); - - jnl_proc->waiting = _gf_false; - } - pthread_mutex_unlock (&jnl_proc->lock); + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_entry_t *entry = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl = data; + jnl_proc = jnl->jnl_proc; + THIS = jnl->this; + this = jnl->this; + + while (1) { + pthread_mutex_lock(&jnl_proc->lock); + { + while (list_empty(&jnl_proc->entries)) { + jnl_proc->waiting = _gf_true; + pthread_cond_wait(&jnl_proc->cond, &jnl_proc->lock); + } - if (entry) { - (void) gf_changelog_consume (this, jnl, - entry->path, _gf_false); - GF_FREE (entry); - } + entry = list_first_entry(&jnl_proc->entries, gf_changelog_entry_t, + list); + if (entry) + list_del(&entry->list); + + jnl_proc->waiting = _gf_false; } + pthread_mutex_unlock(&jnl_proc->lock); + + if (entry) { + (void)gf_changelog_consume(this, jnl, entry->path, _gf_false); + GF_FREE(entry); + } + } - return NULL; + return NULL; } void -gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc, - changelog_event_t *event) +gf_changelog_queue_journal(gf_changelog_processor_t *jnl_proc, + changelog_event_t *event) { - size_t len = 0; - gf_changelog_entry_t *entry = NULL; + size_t len = 0; + gf_changelog_entry_t *entry = NULL; - entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t), - gf_changelog_mt_libgfchangelog_entry_t); - if (!entry) - return; - INIT_LIST_HEAD (&entry->list); + entry = GF_CALLOC(1, sizeof(gf_changelog_entry_t), + gf_changelog_mt_libgfchangelog_entry_t); + if (!entry) + return; + INIT_LIST_HEAD(&entry->list); - len = strlen (event->u.journal.path); - (void)memcpy (entry->path, event->u.journal.path, len+1); - entry->path[len] = '\0'; + len = strlen(event->u.journal.path); + (void)memcpy(entry->path, event->u.journal.path, len + 1); + entry->path[len] = '\0'; - pthread_mutex_lock (&jnl_proc->lock); - { - list_add_tail (&entry->list, &jnl_proc->entries); - if (jnl_proc->waiting) - pthread_cond_signal (&jnl_proc->cond); - } - pthread_mutex_unlock (&jnl_proc->lock); + pthread_mutex_lock(&jnl_proc->lock); + { + list_add_tail(&entry->list, &jnl_proc->entries); + if (jnl_proc->waiting) + pthread_cond_signal(&jnl_proc->cond); + } + pthread_mutex_unlock(&jnl_proc->lock); - return; + return; } void -gf_changelog_handle_journal (void *xl, char *brick, - void *cbkdata, changelog_event_t *event) +gf_changelog_handle_journal(void *xl, char *brick, void *cbkdata, + changelog_event_t *event) { - gf_changelog_journal_t *jnl = NULL; - gf_changelog_processor_t *jnl_proc = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_processor_t *jnl_proc = NULL; - jnl = cbkdata; - jnl_proc = jnl->jnl_proc; + jnl = cbkdata; + jnl_proc = jnl->jnl_proc; - gf_changelog_queue_journal (jnl_proc, event); + gf_changelog_queue_journal(jnl_proc, event); } void -gf_changelog_journal_disconnect (void *xl, char *brick, void *data) +gf_changelog_journal_disconnect(void *xl, char *brick, void *data) { - gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *jnl = NULL; - jnl = data; + jnl = data; - pthread_spin_lock (&jnl->lock); - { - JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED); - }; - pthread_spin_unlock (&jnl->lock); + pthread_spin_lock(&jnl->lock); + { + JNL_SET_API_STATE(jnl, JNL_API_DISCONNECTED); + }; + pthread_spin_unlock(&jnl->lock); } void -gf_changelog_journal_connect (void *xl, char *brick, void *data) +gf_changelog_journal_connect(void *xl, char *brick, void *data) { - gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *jnl = NULL; - jnl = data; + jnl = data; - pthread_spin_lock (&jnl->lock); - { - JNL_SET_API_STATE (jnl, JNL_API_CONNECTED); - }; - pthread_spin_unlock (&jnl->lock); + pthread_spin_lock(&jnl->lock); + { + JNL_SET_API_STATE(jnl, JNL_API_CONNECTED); + }; + pthread_spin_unlock(&jnl->lock); - return; + return; } void -gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl) +gf_changelog_cleanup_processor(gf_changelog_journal_t *jnl) { - int ret = 0; - xlator_t *this = NULL; - gf_changelog_processor_t *jnl_proc = NULL; - - this = THIS; - if (!this || !jnl || !jnl->jnl_proc) - goto error_return; - - jnl_proc = jnl->jnl_proc; - - ret = gf_thread_cleanup (this, jnl_proc->processor); - if (ret != 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_CLEANUP_ERROR, - "failed to cleanup processor thread"); - goto error_return; - } + int ret = 0; + xlator_t *this = NULL; + gf_changelog_processor_t *jnl_proc = NULL; - (void)pthread_mutex_destroy (&jnl_proc->lock); - (void)pthread_cond_destroy (&jnl_proc->cond); + this = THIS; + if (!this || !jnl || !jnl->jnl_proc) + goto error_return; - GF_FREE (jnl_proc); + jnl_proc = jnl->jnl_proc; - error_return: - return; + ret = gf_thread_cleanup(this, jnl_proc->processor); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_CLEANUP_ERROR, + "failed to cleanup processor thread"); + goto error_return; + } + + (void)pthread_mutex_destroy(&jnl_proc->lock); + (void)pthread_cond_destroy(&jnl_proc->cond); + + GF_FREE(jnl_proc); + +error_return: + return; } int -gf_changelog_init_processor (gf_changelog_journal_t *jnl) +gf_changelog_init_processor(gf_changelog_journal_t *jnl) { - int ret = -1; - gf_changelog_processor_t *jnl_proc = NULL; + int ret = -1; + gf_changelog_processor_t *jnl_proc = NULL; - jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t), - gf_changelog_mt_libgfchangelog_t); - if (!jnl_proc) - goto error_return; - - ret = pthread_mutex_init (&jnl_proc->lock, NULL); - if (ret != 0) - goto free_jnl_proc; - ret = pthread_cond_init (&jnl_proc->cond, NULL); - if (ret != 0) - goto cleanup_mutex; - - INIT_LIST_HEAD (&jnl_proc->entries); - jnl_proc->waiting = _gf_false; - jnl->jnl_proc = jnl_proc; - - ret = gf_thread_create (&jnl_proc->processor, - NULL, gf_changelog_process, jnl, "clogproc"); - if (ret != 0) { - jnl->jnl_proc = NULL; - goto cleanup_cond; - } + jnl_proc = GF_CALLOC(1, sizeof(gf_changelog_processor_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl_proc) + goto error_return; + + ret = pthread_mutex_init(&jnl_proc->lock, NULL); + if (ret != 0) + goto free_jnl_proc; + ret = pthread_cond_init(&jnl_proc->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + INIT_LIST_HEAD(&jnl_proc->entries); + jnl_proc->waiting = _gf_false; + jnl->jnl_proc = jnl_proc; + + ret = gf_thread_create(&jnl_proc->processor, NULL, gf_changelog_process, + jnl, "clogproc"); + if (ret != 0) { + jnl->jnl_proc = NULL; + goto cleanup_cond; + } - return 0; + return 0; - cleanup_cond: - (void) pthread_cond_destroy (&jnl_proc->cond); - cleanup_mutex: - (void) pthread_mutex_destroy (&jnl_proc->lock); - free_jnl_proc: - GF_FREE (jnl_proc); - error_return: - return -1; +cleanup_cond: + (void)pthread_cond_destroy(&jnl_proc->cond); +cleanup_mutex: + (void)pthread_mutex_destroy(&jnl_proc->lock); +free_jnl_proc: + GF_FREE(jnl_proc); +error_return: + return -1; } static void -gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl) +gf_changelog_cleanup_fds(gf_changelog_journal_t *jnl) { - /* tracker fd */ - if (jnl->jnl_fd != -1) - sys_close (jnl->jnl_fd); - /* processing dir */ - if (jnl->jnl_dir) - sys_closedir (jnl->jnl_dir); - - if (jnl->jnl_working_dir) - free (jnl->jnl_working_dir); /* allocated by realpath */ + /* tracker fd */ + if (jnl->jnl_fd != -1) + sys_close(jnl->jnl_fd); + /* processing dir */ + if (jnl->jnl_dir) + sys_closedir(jnl->jnl_dir); + + if (jnl->jnl_working_dir) + free(jnl->jnl_working_dir); /* allocated by realpath */ } static int -gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl) +gf_changelog_open_dirs(xlator_t *this, gf_changelog_journal_t *jnl) { - int ret = -1; - DIR *dir = NULL; - int tracker_fd = 0; - char tracker_path[PATH_MAX] = {0,}; - - /* .current */ - (void) snprintf (jnl->jnl_current_dir, PATH_MAX, - "%s/"GF_CHANGELOG_CURRENT_DIR"/", - jnl->jnl_working_dir); - ret = recursive_rmdir (jnl->jnl_current_dir); - if (ret) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, - "Failed to rmdir", - "path=%s", jnl->jnl_current_dir, - NULL); - goto out; - } - ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false); - if (ret) - goto out; + int ret = -1; + DIR *dir = NULL; + int tracker_fd = 0; + char tracker_path[PATH_MAX] = { + 0, + }; + + /* .current */ + (void)snprintf(jnl->jnl_current_dir, PATH_MAX, + "%s/" GF_CHANGELOG_CURRENT_DIR "/", jnl->jnl_working_dir); + ret = recursive_rmdir(jnl->jnl_current_dir); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "Failed to rmdir", "path=%s", + jnl->jnl_current_dir, NULL); + goto out; + } + ret = mkdir_p(jnl->jnl_current_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processed */ + (void)snprintf(jnl->jnl_processed_dir, PATH_MAX, + "%s/" GF_CHANGELOG_PROCESSED_DIR "/", jnl->jnl_working_dir); + ret = mkdir_p(jnl->jnl_processed_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processing */ + (void)snprintf(jnl->jnl_processing_dir, PATH_MAX, + "%s/" GF_CHANGELOG_PROCESSING_DIR "/", jnl->jnl_working_dir); + ret = recursive_rmdir(jnl->jnl_processing_dir); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "Failed to rmdir", "path=%s", + jnl->jnl_processing_dir, NULL); + goto out; + } - /* .processed */ - (void) snprintf (jnl->jnl_processed_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSED_DIR"/", - jnl->jnl_working_dir); - ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false); - if (ret) - goto out; - - /* .processing */ - (void) snprintf (jnl->jnl_processing_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSING_DIR"/", - jnl->jnl_working_dir); - ret = recursive_rmdir (jnl->jnl_processing_dir); - if (ret) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, - "Failed to rmdir", - "path=%s", jnl->jnl_processing_dir, - NULL); - goto out; - } + ret = mkdir_p(jnl->jnl_processing_dir, 0600, _gf_false); + if (ret) + goto out; - ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false); - if (ret) - goto out; - - dir = sys_opendir (jnl->jnl_processing_dir); - if (!dir) { - gf_msg ("", GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_OPENDIR_ERROR, - "opendir() error"); - goto out; - } + dir = sys_opendir(jnl->jnl_processing_dir); + if (!dir) { + gf_msg("", GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + "opendir() error"); + goto out; + } - jnl->jnl_dir = dir; + jnl->jnl_dir = dir; - (void) snprintf (tracker_path, PATH_MAX, - "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir); + (void)snprintf(tracker_path, PATH_MAX, "%s/" GF_CHANGELOG_TRACKER, + jnl->jnl_working_dir); - tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (tracker_fd < 0) { - sys_closedir (jnl->jnl_dir); - ret = -1; - goto out; - } + tracker_fd = open(tracker_path, O_CREAT | O_APPEND | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (tracker_fd < 0) { + sys_closedir(jnl->jnl_dir); + ret = -1; + goto out; + } - jnl->jnl_fd = tracker_fd; - ret = 0; - out: - return ret; + jnl->jnl_fd = tracker_fd; + ret = 0; +out: + return ret; } int -gf_changelog_init_history (xlator_t *this, - gf_changelog_journal_t *jnl, - char *brick_path) +gf_changelog_init_history(xlator_t *this, gf_changelog_journal_t *jnl, + char *brick_path) { - int i = 0; - int ret = 0; - char hist_scratch_dir[PATH_MAX] = {0,}; + int i = 0; + int ret = 0; + char hist_scratch_dir[PATH_MAX] = { + 0, + }; - jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl), - gf_changelog_mt_libgfchangelog_t); - if (!jnl->hist_jnl) - goto error_return; + jnl->hist_jnl = GF_CALLOC(1, sizeof(*jnl), + gf_changelog_mt_libgfchangelog_t); + if (!jnl->hist_jnl) + goto error_return; - jnl->hist_jnl->jnl_dir = NULL; - jnl->hist_jnl->jnl_fd = -1; + jnl->hist_jnl->jnl_dir = NULL; + jnl->hist_jnl->jnl_fd = -1; - (void) snprintf (hist_scratch_dir, PATH_MAX, - "%s/"GF_CHANGELOG_HISTORY_DIR"/", - jnl->jnl_working_dir); + (void)snprintf(hist_scratch_dir, PATH_MAX, + "%s/" GF_CHANGELOG_HISTORY_DIR "/", jnl->jnl_working_dir); - ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); - if (ret) - goto dealloc_hist; - - jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL); - if (!jnl->hist_jnl->jnl_working_dir) - goto dealloc_hist; - - ret = gf_changelog_open_dirs (this, jnl->hist_jnl); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_OPENDIR_ERROR, - "could not create entries in history scratch dir"); - goto dealloc_hist; - } + ret = mkdir_p(hist_scratch_dir, 0600, _gf_false); + if (ret) + goto dealloc_hist; - if (snprintf (jnl->hist_jnl->jnl_brickpath, PATH_MAX, "%s", - brick_path) >= PATH_MAX) - goto dealloc_hist; + jnl->hist_jnl->jnl_working_dir = realpath(hist_scratch_dir, NULL); + if (!jnl->hist_jnl->jnl_working_dir) + goto dealloc_hist; - for (i = 0; i < 256; i++) { - jnl->hist_jnl->rfc3986_space_newline[i] = - (i == ' ' || i == '\n' || i == '%') ? 0 : i; - } + ret = gf_changelog_open_dirs(this, jnl->hist_jnl); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + "could not create entries in history scratch dir"); + goto dealloc_hist; + } - return 0; + if (snprintf(jnl->hist_jnl->jnl_brickpath, PATH_MAX, "%s", brick_path) >= + PATH_MAX) + goto dealloc_hist; - dealloc_hist: - GF_FREE (jnl->hist_jnl); - jnl->hist_jnl = NULL; - error_return: - return -1; + for (i = 0; i < 256; i++) { + jnl->hist_jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' || + i == '%') + ? 0 + : i; + } + + return 0; + +dealloc_hist: + GF_FREE(jnl->hist_jnl); + jnl->hist_jnl = NULL; +error_return: + return -1; } void -gf_changelog_journal_fini (void *xl, char *brick, void *data) +gf_changelog_journal_fini(void *xl, char *brick, void *data) { - gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *jnl = NULL; - jnl = data; + jnl = data; - gf_changelog_cleanup_processor (jnl); + gf_changelog_cleanup_processor(jnl); - gf_changelog_cleanup_fds (jnl); - if (jnl->hist_jnl) - gf_changelog_cleanup_fds (jnl->hist_jnl); + gf_changelog_cleanup_fds(jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds(jnl->hist_jnl); - GF_FREE (jnl); + GF_FREE(jnl); } void * -gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick) +gf_changelog_journal_init(void *xl, struct gf_brick_spec *brick) { - int i = 0; - int ret = 0; - xlator_t *this = NULL; - struct stat buf = {0,}; - char *scratch_dir = NULL; - gf_changelog_journal_t *jnl = NULL; - - this = xl; - scratch_dir = (char *) brick->ptr; - - jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t), - gf_changelog_mt_libgfchangelog_t); - if (!jnl) - goto error_return; - - if (snprintf (jnl->jnl_brickpath, PATH_MAX, "%s", - brick->brick_path) >= PATH_MAX) - goto dealloc_private; - - if (sys_stat (scratch_dir, &buf) && errno == ENOENT) { - ret = mkdir_p (scratch_dir, 0600, _gf_true); - if (ret) - goto dealloc_private; - } + int i = 0; + int ret = 0; + xlator_t *this = NULL; + struct stat buf = { + 0, + }; + char *scratch_dir = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = xl; + scratch_dir = (char *)brick->ptr; + + jnl = GF_CALLOC(1, sizeof(gf_changelog_journal_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl) + goto error_return; + + if (snprintf(jnl->jnl_brickpath, PATH_MAX, "%s", brick->brick_path) >= + PATH_MAX) + goto dealloc_private; + + if (sys_stat(scratch_dir, &buf) && errno == ENOENT) { + ret = mkdir_p(scratch_dir, 0600, _gf_true); + if (ret) + goto dealloc_private; + } - jnl->jnl_working_dir = realpath (scratch_dir, NULL); - if (!jnl->jnl_working_dir) - goto dealloc_private; + jnl->jnl_working_dir = realpath(scratch_dir, NULL); + if (!jnl->jnl_working_dir) + goto dealloc_private; - ret = gf_changelog_open_dirs (this, jnl); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_OPENDIR_ERROR, - "could not create entries in scratch dir"); - goto dealloc_private; - } - - /* RFC 3986 {de,en}coding */ - for (i = 0; i < 256; i++) { - jnl->rfc3986_space_newline[i] = - (i == ' ' || i == '\n' || i == '%') ? 0 : i; - } + ret = gf_changelog_open_dirs(this, jnl); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR, + "could not create entries in scratch dir"); + goto dealloc_private; + } - ret = gf_changelog_init_history (this, jnl, brick->brick_path); - if (ret) - goto cleanup_fds; + /* RFC 3986 {de,en}coding */ + for (i = 0; i < 256; i++) { + jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' || i == '%') ? 0 + : i; + } - /* initialize journal processor */ - jnl->this = this; - ret = gf_changelog_init_processor (jnl); - if (ret) - goto cleanup_fds; - - JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS); - ret = pthread_spin_init (&jnl->lock, 0); - if (ret != 0) - goto cleanup_processor; - return jnl; - - cleanup_processor: - gf_changelog_cleanup_processor (jnl); - cleanup_fds: - gf_changelog_cleanup_fds (jnl); - if (jnl->hist_jnl) - gf_changelog_cleanup_fds (jnl->hist_jnl); - dealloc_private: - GF_FREE (jnl); - error_return: - return NULL; + ret = gf_changelog_init_history(this, jnl, brick->brick_path); + if (ret) + goto cleanup_fds; + + /* initialize journal processor */ + jnl->this = this; + ret = gf_changelog_init_processor(jnl); + if (ret) + goto cleanup_fds; + + JNL_SET_API_STATE(jnl, JNL_API_CONN_INPROGESS); + ret = pthread_spin_init(&jnl->lock, 0); + if (ret != 0) + goto cleanup_processor; + return jnl; + +cleanup_processor: + gf_changelog_cleanup_processor(jnl); +cleanup_fds: + gf_changelog_cleanup_fds(jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds(jnl->hist_jnl); +dealloc_private: + GF_FREE(jnl); +error_return: + return NULL; } diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c index f9fb8fcf01a..8dfda4c79c5 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -25,133 +25,121 @@ struct rpcsvc_program *gf_changelog_reborp_programs[]; void * -gf_changelog_connection_janitor (void *arg) +gf_changelog_connection_janitor(void *arg) { - int32_t ret = 0; - xlator_t *this = NULL; - gf_private_t *priv = NULL; - gf_changelog_t *entry = NULL; - struct gf_event *event = NULL; - struct gf_event_list *ev = NULL; - unsigned long drained = 0; - - this = arg; - THIS = this; - - priv = this->private; - - while (1) { - pthread_mutex_lock (&priv->lock); - { - while (list_empty (&priv->cleanups)) - pthread_cond_wait (&priv->cond, &priv->lock); - - entry = list_first_entry (&priv->cleanups, - gf_changelog_t, list); - list_del_init (&entry->list); - } - pthread_mutex_unlock (&priv->lock); - - drained = 0; - ev = &entry->event; - - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, - "Cleaning brick entry for brick", - "brick=%s", entry->brick, - NULL); - - /* 0x0: disable rpc-clnt */ - rpc_clnt_disable (RPC_PROBER (entry)); - - /* 0x1: cleanup callback invoker thread */ - ret = gf_cleanup_event (this, ev); - if (ret) - continue; - - /* 0x2: drain pending events */ - while (!list_empty (&ev->events)) { - event = list_first_entry (&ev->events, - struct gf_event, list); - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, - "Draining event", - "seq=%lu", event->seq, - "payload=%d", event->count, - NULL); - - GF_FREE (event); - drained++; - } - - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, - "Drained events", - "num=%lu", drained, - NULL); - - /* 0x3: freeup brick entry */ - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, - "freeing entry", - "entry=%p", entry, - NULL); - LOCK_DESTROY (&entry->statelock); - GF_FREE (entry); + int32_t ret = 0; + xlator_t *this = NULL; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + unsigned long drained = 0; + + this = arg; + THIS = this; + + priv = this->private; + + while (1) { + pthread_mutex_lock(&priv->lock); + { + while (list_empty(&priv->cleanups)) + pthread_cond_wait(&priv->cond, &priv->lock); + + entry = list_first_entry(&priv->cleanups, gf_changelog_t, list); + list_del_init(&entry->list); + } + pthread_mutex_unlock(&priv->lock); + + drained = 0; + ev = &entry->event; + + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, + "Cleaning brick entry for brick", "brick=%s", entry->brick, + NULL); + + /* 0x0: disable rpc-clnt */ + rpc_clnt_disable(RPC_PROBER(entry)); + + /* 0x1: cleanup callback invoker thread */ + ret = gf_cleanup_event(this, ev); + if (ret) + continue; + + /* 0x2: drain pending events */ + while (!list_empty(&ev->events)) { + event = list_first_entry(&ev->events, struct gf_event, list); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "Draining event", + "seq=%lu", event->seq, "payload=%d", event->count, NULL); + + GF_FREE(event); + drained++; } - return NULL; + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "Drained events", + "num=%lu", drained, NULL); + + /* 0x3: freeup brick entry */ + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, "freeing entry", + "entry=%p", entry, NULL); + LOCK_DESTROY(&entry->statelock); + GF_FREE(entry); + } + + return NULL; } int -gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, - rpcsvc_event_t event, void *data) +gf_changelog_reborp_rpcsvc_notify(rpcsvc_t *rpc, void *mydata, + rpcsvc_event_t event, void *data) { - int ret = 0; - xlator_t *this = NULL; - gf_changelog_t *entry = NULL; + int ret = 0; + xlator_t *this = NULL; + gf_changelog_t *entry = NULL; - if (!(event == RPCSVC_EVENT_ACCEPT || - event == RPCSVC_EVENT_DISCONNECT)) - return 0; + if (!(event == RPCSVC_EVENT_ACCEPT || event == RPCSVC_EVENT_DISCONNECT)) + return 0; - entry = mydata; - this = entry->this; + entry = mydata; + this = entry->this; - switch (event) { + switch (event) { case RPCSVC_EVENT_ACCEPT: - ret = sys_unlink (RPC_SOCK(entry)); - if (ret != 0) - gf_smsg (this->name, GF_LOG_WARNING, errno, - CHANGELOG_LIB_MSG_UNLINK_FAILED, - "failed to unlink " - "reverse socket", - "path=%s", RPC_SOCK (entry), - NULL); - if (entry->connected) - GF_CHANGELOG_INVOKE_CBK (this, entry->connected, - entry->brick, entry->ptr); - break; + ret = sys_unlink(RPC_SOCK(entry)); + if (ret != 0) + gf_smsg(this->name, GF_LOG_WARNING, errno, + CHANGELOG_LIB_MSG_UNLINK_FAILED, + "failed to unlink " + "reverse socket", + "path=%s", RPC_SOCK(entry), NULL); + if (entry->connected) + GF_CHANGELOG_INVOKE_CBK(this, entry->connected, entry->brick, + entry->ptr); + break; case RPCSVC_EVENT_DISCONNECT: - if (entry->disconnected) - GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, - entry->brick, entry->ptr); - /* passthrough */ + if (entry->disconnected) + GF_CHANGELOG_INVOKE_CBK(this, entry->disconnected, entry->brick, + entry->ptr); + /* passthrough */ default: - break; - } + break; + } - return 0; + return 0; } rpcsvc_t * -gf_changelog_reborp_init_rpc_listner (xlator_t *this, - char *path, char *sock, void *cbkdata) +gf_changelog_reborp_init_rpc_listner(xlator_t *this, char *path, char *sock, + void *cbkdata) { - CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX); - return changelog_rpc_server_init (this, sock, cbkdata, - gf_changelog_reborp_rpcsvc_notify, - gf_changelog_reborp_programs); + CHANGELOG_MAKE_TMP_SOCKET_PATH(path, sock, UNIX_PATH_MAX); + return changelog_rpc_server_init(this, sock, cbkdata, + gf_changelog_reborp_rpcsvc_notify, + gf_changelog_reborp_programs); } /** @@ -164,29 +152,27 @@ gf_changelog_reborp_init_rpc_listner (xlator_t *this, * @FIXME: cleanup this bugger once server filters events. */ void -gf_changelog_invoke_callback (gf_changelog_t *entry, - struct iovec **vec, int payloadcnt) +gf_changelog_invoke_callback(gf_changelog_t *entry, struct iovec **vec, + int payloadcnt) { - int i = 0; - int evsize = 0; - xlator_t *this = NULL; - changelog_event_t *event = NULL; - - this = entry->this; - - for (; i < payloadcnt; i++) { - event = (changelog_event_t *)vec[i]->iov_base; - evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; - - for (; evsize > 0; evsize--, event++) { - if (gf_changelog_filter_check (entry, event)) { - GF_CHANGELOG_INVOKE_CBK (this, - entry->callback, - entry->brick, - entry->ptr, event); - } - } + int i = 0; + int evsize = 0; + xlator_t *this = NULL; + changelog_event_t *event = NULL; + + this = entry->this; + + for (; i < payloadcnt; i++) { + event = (changelog_event_t *)vec[i]->iov_base; + evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; + + for (; evsize > 0; evsize--, event++) { + if (gf_changelog_filter_check(entry, event)) { + GF_CHANGELOG_INVOKE_CBK(this, entry->callback, entry->brick, + entry->ptr, event); + } } + } } /** @@ -197,218 +183,217 @@ gf_changelog_invoke_callback (gf_changelog_t *entry, */ int -__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event) +__is_expected_sequence(struct gf_event_list *ev, struct gf_event *event) { - return (ev->next_seq == event->seq); + return (ev->next_seq == event->seq); } int -__can_process_event (struct gf_event_list *ev, struct gf_event **event) +__can_process_event(struct gf_event_list *ev, struct gf_event **event) { - *event = list_first_entry (&ev->events, struct gf_event, list); + *event = list_first_entry(&ev->events, struct gf_event, list); - if (__is_expected_sequence (ev, *event)) { - list_del (&(*event)->list); - ev->next_seq++; - return 1; - } + if (__is_expected_sequence(ev, *event)) { + list_del(&(*event)->list); + ev->next_seq++; + return 1; + } - return 0; + return 0; } void -pick_event_ordered (struct gf_event_list *ev, struct gf_event **event) +pick_event_ordered(struct gf_event_list *ev, struct gf_event **event) { - pthread_mutex_lock (&ev->lock); - { - while (list_empty (&ev->events) - || !__can_process_event (ev, event)) - pthread_cond_wait (&ev->cond, &ev->lock); - } - pthread_mutex_unlock (&ev->lock); + pthread_mutex_lock(&ev->lock); + { + while (list_empty(&ev->events) || !__can_process_event(ev, event)) + pthread_cond_wait(&ev->cond, &ev->lock); + } + pthread_mutex_unlock(&ev->lock); } void -pick_event_unordered (struct gf_event_list *ev, struct gf_event **event) +pick_event_unordered(struct gf_event_list *ev, struct gf_event **event) { - pthread_mutex_lock (&ev->lock); - { - while (list_empty (&ev->events)) - pthread_cond_wait (&ev->cond, &ev->lock); - *event = list_first_entry (&ev->events, struct gf_event, list); - list_del (&(*event)->list); - } - pthread_mutex_unlock (&ev->lock); + pthread_mutex_lock(&ev->lock); + { + while (list_empty(&ev->events)) + pthread_cond_wait(&ev->cond, &ev->lock); + *event = list_first_entry(&ev->events, struct gf_event, list); + list_del(&(*event)->list); + } + pthread_mutex_unlock(&ev->lock); } void * -gf_changelog_callback_invoker (void *arg) +gf_changelog_callback_invoker(void *arg) { - xlator_t *this = NULL; - gf_changelog_t *entry = NULL; - struct iovec *vec = NULL; - struct gf_event *event = NULL; - struct gf_event_list *ev = NULL; + xlator_t *this = NULL; + gf_changelog_t *entry = NULL; + struct iovec *vec = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; - ev = arg; - entry = ev->entry; - THIS = this = entry->this; + ev = arg; + entry = ev->entry; + THIS = this = entry->this; - while (1) { - entry->pickevent (ev, &event); + while (1) { + entry->pickevent(ev, &event); - vec = (struct iovec *) &event->iov; - gf_changelog_invoke_callback (entry, &vec, event->count); + vec = (struct iovec *)&event->iov; + gf_changelog_invoke_callback(entry, &vec, event->count); - GF_FREE (event); - } + GF_FREE(event); + } - return NULL; + return NULL; } static int -orderfn (struct list_head *pos1, struct list_head *pos2) +orderfn(struct list_head *pos1, struct list_head *pos2) { - struct gf_event *event1 = NULL; - struct gf_event *event2 = NULL; + struct gf_event *event1 = NULL; + struct gf_event *event2 = NULL; - event1 = list_entry (pos1, struct gf_event, list); - event2 = list_entry (pos2, struct gf_event, list); + event1 = list_entry(pos1, struct gf_event, list); + event2 = list_entry(pos2, struct gf_event, list); - if (event1->seq > event2->seq) - return 1; - return -1; + if (event1->seq > event2->seq) + return 1; + return -1; } void -queue_ordered_event (struct gf_event_list *ev, struct gf_event *event) +queue_ordered_event(struct gf_event_list *ev, struct gf_event *event) { - /* add event to the ordered event list and wake up listener(s) */ - pthread_mutex_lock (&ev->lock); - { - list_add_order (&event->list, &ev->events, orderfn); - if (!ev->next_seq) - ev->next_seq = event->seq; - if (ev->next_seq == event->seq) - pthread_cond_signal (&ev->cond); - } - pthread_mutex_unlock (&ev->lock); + /* add event to the ordered event list and wake up listener(s) */ + pthread_mutex_lock(&ev->lock); + { + list_add_order(&event->list, &ev->events, orderfn); + if (!ev->next_seq) + ev->next_seq = event->seq; + if (ev->next_seq == event->seq) + pthread_cond_signal(&ev->cond); + } + pthread_mutex_unlock(&ev->lock); } void -queue_unordered_event (struct gf_event_list *ev, struct gf_event *event) +queue_unordered_event(struct gf_event_list *ev, struct gf_event *event) { - /* add event to the tail of the queue and wake up listener(s) */ - pthread_mutex_lock (&ev->lock); - { - list_add_tail (&event->list, &ev->events); - pthread_cond_signal (&ev->cond); - } - pthread_mutex_unlock (&ev->lock); + /* add event to the tail of the queue and wake up listener(s) */ + pthread_mutex_lock(&ev->lock); + { + list_add_tail(&event->list, &ev->events); + pthread_cond_signal(&ev->cond); + } + pthread_mutex_unlock(&ev->lock); } int -gf_changelog_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) +gf_changelog_event_handler(rpcsvc_request_t *req, xlator_t *this, + gf_changelog_t *entry) { - int i = 0; - size_t payloadlen = 0; - ssize_t len = 0; - int payloadcnt = 0; - changelog_event_req rpc_req = {0,}; - changelog_event_rsp rpc_rsp = {0,}; - struct iovec *vec = NULL; - struct gf_event *event = NULL; - struct gf_event_list *ev = NULL; - - ev = &entry->event; - - len = xdr_to_generic (req->msg[0], - &rpc_req, (xdrproc_t)xdr_changelog_event_req); - if (len < 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, - "xdr decoding failed"); - req->rpc_err = GARBAGE_ARGS; - goto handle_xdr_error; - } - - if (len < req->msg[0].iov_len) { - payloadcnt = 1; - payloadlen = (req->msg[0].iov_len - len); - } - for (i = 1; i < req->count; i++) { - payloadcnt++; - payloadlen += req->msg[i].iov_len; - } - - event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen), - gf_changelog_mt_libgfchangelog_event_t); - if (!event) - goto handle_xdr_error; - INIT_LIST_HEAD (&event->list); - - payloadlen = 0; - event->seq = rpc_req.seq; - event->count = payloadcnt; - - /* deep copy IO vectors */ - vec = &event->iov[0]; - GF_EVENT_ASSIGN_IOVEC (vec, event, - (req->msg[0].iov_len - len), payloadlen); - (void) memcpy (vec->iov_base, - req->msg[0].iov_base + len, vec->iov_len); - - for (i = 1; i < req->count; i++) { - vec = &event->iov[i]; - GF_EVENT_ASSIGN_IOVEC (vec, event, - req->msg[i].iov_len, payloadlen); - (void) memcpy (event->iov[i].iov_base, - req->msg[i].iov_base, req->msg[i].iov_len); - } - - gf_msg_debug (this->name, 0, - "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %zd)", - rpc_req.seq, entry->brick, rpc_req.tv_sec, - rpc_req.tv_usec, payloadcnt, payloadlen); - - /* dispatch event */ - entry->queueevent (ev, event); - - /* ack sequence number */ - rpc_rsp.op_ret = 0; - rpc_rsp.seq = rpc_req.seq; - - goto submit_rpc; - - handle_xdr_error: - rpc_rsp.op_ret = -1; - rpc_rsp.seq = 0; /* invalid */ - submit_rpc: - return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, - (xdrproc_t)xdr_changelog_event_rsp); + int i = 0; + size_t payloadlen = 0; + ssize_t len = 0; + int payloadcnt = 0; + changelog_event_req rpc_req = { + 0, + }; + changelog_event_rsp rpc_rsp = { + 0, + }; + struct iovec *vec = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + + len = xdr_to_generic(req->msg[0], &rpc_req, + (xdrproc_t)xdr_changelog_event_req); + if (len < 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, "xdr decoding failed"); + req->rpc_err = GARBAGE_ARGS; + goto handle_xdr_error; + } + + if (len < req->msg[0].iov_len) { + payloadcnt = 1; + payloadlen = (req->msg[0].iov_len - len); + } + for (i = 1; i < req->count; i++) { + payloadcnt++; + payloadlen += req->msg[i].iov_len; + } + + event = GF_CALLOC(1, GF_EVENT_CALLOC_SIZE(payloadcnt, payloadlen), + gf_changelog_mt_libgfchangelog_event_t); + if (!event) + goto handle_xdr_error; + INIT_LIST_HEAD(&event->list); + + payloadlen = 0; + event->seq = rpc_req.seq; + event->count = payloadcnt; + + /* deep copy IO vectors */ + vec = &event->iov[0]; + GF_EVENT_ASSIGN_IOVEC(vec, event, (req->msg[0].iov_len - len), payloadlen); + (void)memcpy(vec->iov_base, req->msg[0].iov_base + len, vec->iov_len); + + for (i = 1; i < req->count; i++) { + vec = &event->iov[i]; + GF_EVENT_ASSIGN_IOVEC(vec, event, req->msg[i].iov_len, payloadlen); + (void)memcpy(event->iov[i].iov_base, req->msg[i].iov_base, + req->msg[i].iov_len); + } + + gf_msg_debug(this->name, 0, + "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %zd)", + rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec, + payloadcnt, payloadlen); + + /* dispatch event */ + entry->queueevent(ev, event); + + /* ack sequence number */ + rpc_rsp.op_ret = 0; + rpc_rsp.seq = rpc_req.seq; + + goto submit_rpc; + +handle_xdr_error: + rpc_rsp.op_ret = -1; + rpc_rsp.seq = 0; /* invalid */ +submit_rpc: + return changelog_rpc_sumbit_reply(req, &rpc_rsp, NULL, 0, NULL, + (xdrproc_t)xdr_changelog_event_rsp); } int -gf_changelog_reborp_handle_event (rpcsvc_request_t *req) +gf_changelog_reborp_handle_event(rpcsvc_request_t *req) { - xlator_t *this = NULL; - rpcsvc_t *svc = NULL; - gf_changelog_t *entry = NULL; + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + gf_changelog_t *entry = NULL; - svc = rpcsvc_request_service (req); - entry = svc->mydata; + svc = rpcsvc_request_service(req); + entry = svc->mydata; - this = THIS = entry->this; + this = THIS = entry->this; - return gf_changelog_event_handler (req, this, entry); + return gf_changelog_event_handler(req, this, entry); } rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { - [CHANGELOG_REV_PROC_EVENT] = { - "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT, - gf_changelog_reborp_handle_event, NULL, 0, DRC_NA - }, + [CHANGELOG_REV_PROC_EVENT] = {"CHANGELOG EVENT HANDLER", + CHANGELOG_REV_PROC_EVENT, + gf_changelog_reborp_handle_event, NULL, 0, + DRC_NA}, }; /** @@ -418,15 +403,15 @@ rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { * brick path and it's private data. */ struct rpcsvc_program gf_changelog_reborp_prog = { - .progname = "LIBGFCHANGELOG REBORP", - .prognum = CHANGELOG_REV_RPC_PROCNUM, - .progver = CHANGELOG_REV_RPC_PROCVER, - .numactors = CHANGELOG_REV_PROC_MAX, - .actors = gf_changelog_reborp_actors, - .synctask = _gf_false, + .progname = "LIBGFCHANGELOG REBORP", + .prognum = CHANGELOG_REV_RPC_PROCNUM, + .progver = CHANGELOG_REV_RPC_PROCVER, + .numactors = CHANGELOG_REV_PROC_MAX, + .actors = gf_changelog_reborp_actors, + .synctask = _gf_false, }; struct rpcsvc_program *gf_changelog_reborp_programs[] = { - &gf_changelog_reborp_prog, - NULL, + &gf_changelog_reborp_prog, + NULL, }; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c index 7eb5416ae98..8ec6ffbcebc 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-rpc.c +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c @@ -16,31 +16,32 @@ struct rpc_clnt_program gf_changelog_clnt; /* TODO: piggyback reconnect to called (upcall) */ int -gf_changelog_rpc_notify (struct rpc_clnt *rpc, - void *mydata, rpc_clnt_event_t event, void *data) +gf_changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, + rpc_clnt_event_t event, void *data) { - switch (event) { + switch (event) { case RPC_CLNT_CONNECT: - break; + break; case RPC_CLNT_DISCONNECT: case RPC_CLNT_MSG: case RPC_CLNT_DESTROY: case RPC_CLNT_PING: - break; - } + break; + } - return 0; + return 0; } struct rpc_clnt * -gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry) +gf_changelog_rpc_init(xlator_t *this, gf_changelog_t *entry) { - char sockfile[UNIX_PATH_MAX] = {0,}; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; - CHANGELOG_MAKE_SOCKET_PATH (entry->brick, - sockfile, UNIX_PATH_MAX); - return changelog_rpc_client_init (this, entry, - sockfile, gf_changelog_rpc_notify); + CHANGELOG_MAKE_SOCKET_PATH(entry->brick, sockfile, UNIX_PATH_MAX); + return changelog_rpc_client_init(this, entry, sockfile, + gf_changelog_rpc_notify); } /** @@ -48,51 +49,50 @@ gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry) */ int -gf_probe_changelog_cbk (struct rpc_req *req, - struct iovec *iovec, int count, void *myframe) +gf_probe_changelog_cbk(struct rpc_req *req, struct iovec *iovec, int count, + void *myframe) { - return 0; + return 0; } int -gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data) +gf_probe_changelog_filter(call_frame_t *frame, xlator_t *this, void *data) { - char *sock = NULL; - gf_changelog_t *entry = NULL; - changelog_probe_req req = {0,}; - - entry = data; - sock = RPC_SOCK (entry); - - (void) memcpy (&req.sock, sock, strlen (sock)); - req.filter = entry->notify; - - /* invoke RPC */ - return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req, - frame, &gf_changelog_clnt, - CHANGELOG_RPC_PROBE_FILTER, NULL, 0, - NULL, this, gf_probe_changelog_cbk, - (xdrproc_t) xdr_changelog_probe_req); + char *sock = NULL; + gf_changelog_t *entry = NULL; + changelog_probe_req req = { + 0, + }; + + entry = data; + sock = RPC_SOCK(entry); + + (void)memcpy(&req.sock, sock, strlen(sock)); + req.filter = entry->notify; + + /* invoke RPC */ + return changelog_rpc_sumbit_req( + RPC_PROBER(entry), (void *)&req, frame, &gf_changelog_clnt, + CHANGELOG_RPC_PROBE_FILTER, NULL, 0, NULL, this, gf_probe_changelog_cbk, + (xdrproc_t)xdr_changelog_probe_req); } int -gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx) +gf_changelog_invoke_rpc(xlator_t *this, gf_changelog_t *entry, int procidx) { - return changelog_invoke_rpc (this, RPC_PROBER (entry), - &gf_changelog_clnt, procidx, entry); + return changelog_invoke_rpc(this, RPC_PROBER(entry), &gf_changelog_clnt, + procidx, entry); } struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = { - [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, - [CHANGELOG_RPC_PROBE_FILTER] = { - "PROBE FILTER", gf_probe_changelog_filter - }, + [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, + [CHANGELOG_RPC_PROBE_FILTER] = {"PROBE FILTER", gf_probe_changelog_filter}, }; struct rpc_clnt_program gf_changelog_clnt = { - .progname = "LIBGFCHANGELOG", - .prognum = CHANGELOG_RPC_PROGNUM, - .progver = CHANGELOG_RPC_PROGVER, - .numproc = CHANGELOG_RPC_PROC_MAX, - .proctable = gf_changelog_procs, + .progname = "LIBGFCHANGELOG", + .prognum = CHANGELOG_RPC_PROGNUM, + .progver = CHANGELOG_RPC_PROGVER, + .numproc = CHANGELOG_RPC_PROC_MAX, + .proctable = gf_changelog_procs, }; diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index 8198560e736..c7791c62950 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -45,291 +45,291 @@ */ xlator_t *master = NULL; -static inline -gf_private_t *gf_changelog_alloc_priv () +static inline gf_private_t * +gf_changelog_alloc_priv() { - int ret = 0; - gf_private_t *priv = NULL; - - priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); - if (!priv) - goto error_return; - INIT_LIST_HEAD (&priv->connections); - INIT_LIST_HEAD (&priv->cleanups); - - ret = pthread_mutex_init (&priv->lock, NULL); - if (ret != 0) - goto free_priv; - ret = pthread_cond_init (&priv->cond, NULL); - if (ret != 0) - goto cleanup_mutex; - - priv->api = NULL; - return priv; - - cleanup_mutex: - (void) pthread_mutex_destroy (&priv->lock); - free_priv: - GF_FREE (priv); - error_return: - return NULL; + int ret = 0; + gf_private_t *priv = NULL; + + priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); + if (!priv) + goto error_return; + INIT_LIST_HEAD(&priv->connections); + INIT_LIST_HEAD(&priv->cleanups); + + ret = pthread_mutex_init(&priv->lock, NULL); + if (ret != 0) + goto free_priv; + ret = pthread_cond_init(&priv->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + priv->api = NULL; + return priv; + +cleanup_mutex: + (void)pthread_mutex_destroy(&priv->lock); +free_priv: + GF_FREE(priv); +error_return: + return NULL; } -#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 #define GF_CHANGELOG_EVENT_THREAD_COUNT 4 static int -gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx) +gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx) { - cmd_args_t *cmd_args = NULL; - struct rlimit lim = {0, }; - call_pool_t *pool = NULL; - int ret = -1; - - ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); - if (ret != 0) - return -1; + cmd_args_t *cmd_args = NULL; + struct rlimit lim = { + 0, + }; + call_pool_t *pool = NULL; + int ret = -1; + + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + return -1; - ctx->process_uuid = generate_glusterfs_ctx_id (); - if (!ctx->process_uuid) - return -1; + ctx->process_uuid = generate_glusterfs_ctx_id(); + if (!ctx->process_uuid) + return -1; - ctx->page_size = 128 * GF_UNIT_KB; + ctx->page_size = 128 * GF_UNIT_KB; - ctx->iobuf_pool = iobuf_pool_new (); - if (!ctx->iobuf_pool) - return -1; + ctx->iobuf_pool = iobuf_pool_new(); + if (!ctx->iobuf_pool) + return -1; - ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE, - GF_CHANGELOG_EVENT_THREAD_COUNT); - if (!ctx->event_pool) - return -1; + ctx->event_pool = event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, + GF_CHANGELOG_EVENT_THREAD_COUNT); + if (!ctx->event_pool) + return -1; - pool = GF_CALLOC (1, sizeof (call_pool_t), - gf_changelog_mt_libgfchangelog_call_pool_t); - if (!pool) - return -1; + pool = GF_CALLOC(1, sizeof(call_pool_t), + gf_changelog_mt_libgfchangelog_call_pool_t); + if (!pool) + return -1; - /* frame_mem_pool size 112 * 64 */ - pool->frame_mem_pool = mem_pool_new (call_frame_t, 32); - if (!pool->frame_mem_pool) - return -1; + /* frame_mem_pool size 112 * 64 */ + pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); + if (!pool->frame_mem_pool) + return -1; - /* stack_mem_pool size 256 * 128 */ - pool->stack_mem_pool = mem_pool_new (call_stack_t, 16); + /* stack_mem_pool size 256 * 128 */ + pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); - if (!pool->stack_mem_pool) - return -1; + if (!pool->stack_mem_pool) + return -1; - ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16); - if (!ctx->stub_mem_pool) - return -1; + ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); + if (!ctx->stub_mem_pool) + return -1; - ctx->dict_pool = mem_pool_new (dict_t, 32); - if (!ctx->dict_pool) - return -1; + ctx->dict_pool = mem_pool_new(dict_t, 32); + if (!ctx->dict_pool) + return -1; - ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512); - if (!ctx->dict_pair_pool) - return -1; + ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512); + if (!ctx->dict_pair_pool) + return -1; - ctx->dict_data_pool = mem_pool_new (data_t, 512); - if (!ctx->dict_data_pool) - return -1; + ctx->dict_data_pool = mem_pool_new(data_t, 512); + if (!ctx->dict_data_pool) + return -1; - ctx->logbuf_pool = mem_pool_new (log_buf_t, 256); - if (!ctx->logbuf_pool) - return -1; + ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); + if (!ctx->logbuf_pool) + return -1; - INIT_LIST_HEAD (&pool->all_frames); - LOCK_INIT (&pool->lock); - ctx->pool = pool; + INIT_LIST_HEAD(&pool->all_frames); + LOCK_INIT(&pool->lock); + ctx->pool = pool; - LOCK_INIT (&ctx->lock); + LOCK_INIT(&ctx->lock); - cmd_args = &ctx->cmd_args; + cmd_args = &ctx->cmd_args; - INIT_LIST_HEAD (&cmd_args->xlator_options); + INIT_LIST_HEAD(&cmd_args->xlator_options); - lim.rlim_cur = RLIM_INFINITY; - lim.rlim_max = RLIM_INFINITY; - setrlimit (RLIMIT_CORE, &lim); + lim.rlim_cur = RLIM_INFINITY; + lim.rlim_max = RLIM_INFINITY; + setrlimit(RLIMIT_CORE, &lim); - return 0; + return 0; } /* TODO: cleanup ctx defaults */ void -gf_changelog_cleanup_this (xlator_t *this) +gf_changelog_cleanup_this(xlator_t *this) { - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; - if (!this) - return; + if (!this) + return; - ctx = this->ctx; - syncenv_destroy (ctx->env); - free (ctx); + ctx = this->ctx; + syncenv_destroy(ctx->env); + free(ctx); - this->private = NULL; - this->ctx = NULL; + this->private = NULL; + this->ctx = NULL; - mem_pools_fini (); + mem_pools_fini(); } static int -gf_changelog_init_context () +gf_changelog_init_context() { - glusterfs_ctx_t *ctx = NULL; + glusterfs_ctx_t *ctx = NULL; - ctx = glusterfs_ctx_new (); - if (!ctx) - goto error_return; + ctx = glusterfs_ctx_new(); + if (!ctx) + goto error_return; - if (glusterfs_globals_init (ctx)) - goto free_ctx; + if (glusterfs_globals_init(ctx)) + goto free_ctx; - THIS->ctx = ctx; - if (gf_changelog_ctx_defaults_init (ctx)) - goto free_ctx; + THIS->ctx = ctx; + if (gf_changelog_ctx_defaults_init(ctx)) + goto free_ctx; - ctx->env = syncenv_new (0, 0, 0); - if (!ctx->env) - goto free_ctx; - return 0; + ctx->env = syncenv_new(0, 0, 0); + if (!ctx->env) + goto free_ctx; + return 0; - free_ctx: - free (ctx); - THIS->ctx = NULL; - error_return: - return -1; +free_ctx: + free(ctx); + THIS->ctx = NULL; +error_return: + return -1; } static int -gf_changelog_init_master () +gf_changelog_init_master() { - int ret = 0; + int ret = 0; - mem_pools_init_early (); - ret = gf_changelog_init_context (); - mem_pools_init_late (); + mem_pools_init_early(); + ret = gf_changelog_init_context(); + mem_pools_init_late(); - return ret; + return ret; } /* TODO: cleanup clnt/svc on failure */ int -gf_changelog_setup_rpc (xlator_t *this, - gf_changelog_t *entry, int proc) +gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc) { - int ret = 0; - rpcsvc_t *svc = NULL; - struct rpc_clnt *rpc = NULL; - - /** - * Initialize a connect back socket. A probe() RPC call to the server - * triggers a reverse connect. - */ - svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick, - RPC_SOCK (entry), entry); - if (!svc) - goto error_return; - RPC_REBORP (entry) = svc; - - /* Initialize an RPC client */ - rpc = gf_changelog_rpc_init (this, entry); - if (!rpc) - goto error_return; - RPC_PROBER (entry) = rpc; - - /** - * @FIXME - * till we have connection state machine, let's delay the RPC call - * for now.. - */ - sleep (2); - - /** - * Probe changelog translator for reverse connection. After a successful - * call, there's less use of the client and can be disconnected, but - * let's leave the connection active for any future RPC calls. - */ - ret = gf_changelog_invoke_rpc (this, entry, proc); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, - "Could not initiate probe RPC, bailing out!!!"); - goto error_return; - } - - return 0; - - error_return: - return -1; + int ret = 0; + rpcsvc_t *svc = NULL; + struct rpc_clnt *rpc = NULL; + + /** + * Initialize a connect back socket. A probe() RPC call to the server + * triggers a reverse connect. + */ + svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, + RPC_SOCK(entry), entry); + if (!svc) + goto error_return; + RPC_REBORP(entry) = svc; + + /* Initialize an RPC client */ + rpc = gf_changelog_rpc_init(this, entry); + if (!rpc) + goto error_return; + RPC_PROBER(entry) = rpc; + + /** + * @FIXME + * till we have connection state machine, let's delay the RPC call + * for now.. + */ + sleep(2); + + /** + * Probe changelog translator for reverse connection. After a successful + * call, there's less use of the client and can be disconnected, but + * let's leave the connection active for any future RPC calls. + */ + ret = gf_changelog_invoke_rpc(this, entry, proc); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, + "Could not initiate probe RPC, bailing out!!!"); + goto error_return; + } + + return 0; + +error_return: + return -1; } int -gf_cleanup_event (xlator_t *this, struct gf_event_list *ev) +gf_cleanup_event(xlator_t *this, struct gf_event_list *ev) { - int ret = 0; - - ret = gf_thread_cleanup (this, ev->invoker); - if (ret) { - gf_msg (this->name, GF_LOG_WARNING, -ret, - CHANGELOG_LIB_MSG_CLEANUP_ERROR, - "cannot cleanup callback invoker thread." - " Not freeing resources"); - return -1; - } + int ret = 0; + + ret = gf_thread_cleanup(this, ev->invoker); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + CHANGELOG_LIB_MSG_CLEANUP_ERROR, + "cannot cleanup callback invoker thread." + " Not freeing resources"); + return -1; + } - ev->entry = NULL; + ev->entry = NULL; - return 0; + return 0; } static int -gf_init_event (gf_changelog_t *entry) +gf_init_event(gf_changelog_t *entry) { - int ret = 0; - struct gf_event_list *ev = NULL; - - ev = &entry->event; - ev->entry = entry; - - ret = pthread_mutex_init (&ev->lock, NULL); - if (ret != 0) - goto error_return; - ret = pthread_cond_init (&ev->cond, NULL); - if (ret != 0) - goto cleanup_mutex; - INIT_LIST_HEAD (&ev->events); - - ev->next_seq = 0; /* bootstrap sequencing */ - - if (GF_NEED_ORDERED_EVENTS (entry)) { - entry->pickevent = pick_event_ordered; - entry->queueevent = queue_ordered_event; - } else { - entry->pickevent = pick_event_unordered; - entry->queueevent = queue_unordered_event; - } - - ret = gf_thread_create (&ev->invoker, NULL, - gf_changelog_callback_invoker, ev, "clogcbki"); - if (ret != 0) { - entry->pickevent = NULL; - entry->queueevent = NULL; - goto cleanup_cond; - } - - return 0; - - cleanup_cond: - (void) pthread_cond_destroy (&ev->cond); - cleanup_mutex: - (void) pthread_mutex_destroy (&ev->lock); - error_return: - return -1; + int ret = 0; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + ev->entry = entry; + + ret = pthread_mutex_init(&ev->lock, NULL); + if (ret != 0) + goto error_return; + ret = pthread_cond_init(&ev->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + INIT_LIST_HEAD(&ev->events); + + ev->next_seq = 0; /* bootstrap sequencing */ + + if (GF_NEED_ORDERED_EVENTS(entry)) { + entry->pickevent = pick_event_ordered; + entry->queueevent = queue_ordered_event; + } else { + entry->pickevent = pick_event_unordered; + entry->queueevent = queue_unordered_event; + } + + ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, + ev, "clogcbki"); + if (ret != 0) { + entry->pickevent = NULL; + entry->queueevent = NULL; + goto cleanup_cond; + } + + return 0; + +cleanup_cond: + (void)pthread_cond_destroy(&ev->cond); +cleanup_mutex: + (void)pthread_mutex_destroy(&ev->lock); +error_return: + return -1; } /** @@ -339,251 +339,242 @@ gf_init_event (gf_changelog_t *entry) * - destroy rpc{-clnt, svc} */ int -gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry) +gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry) { - return 0; + return 0; } int -gf_cleanup_connections (xlator_t *this) +gf_cleanup_connections(xlator_t *this) { - return 0; + return 0; } static int -gf_setup_brick_connection (xlator_t *this, - struct gf_brick_spec *brick, - gf_boolean_t ordered, void *xl) +gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) { - int ret = 0; - gf_private_t *priv = NULL; - gf_changelog_t *entry = NULL; - - priv = this->private; - - if (!brick->callback || !brick->init || !brick->fini) - goto error_return; - - entry = GF_CALLOC (1, sizeof (*entry), - gf_changelog_mt_libgfchangelog_t); - if (!entry) - goto error_return; - INIT_LIST_HEAD (&entry->list); - - LOCK_INIT (&entry->statelock); - entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; - - entry->notify = brick->filter; - if (snprintf (entry->brick, PATH_MAX, "%s", brick->brick_path) - >= PATH_MAX) - goto free_entry; - - entry->this = this; - entry->invokerxl = xl; - - entry->ordered = ordered; - ret = gf_init_event (entry); - if (ret) - goto free_entry; - - entry->fini = brick->fini; - entry->callback = brick->callback; - entry->connected = brick->connected; - entry->disconnected = brick->disconnected; - - entry->ptr = brick->init (this, brick); - if (!entry->ptr) - goto cleanup_event; - priv->api = entry->ptr; /* pointer to API, if required */ - - pthread_mutex_lock (&priv->lock); - { - list_add_tail (&entry->list, &priv->connections); - } - pthread_mutex_unlock (&priv->lock); - - ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); - if (ret) - goto cleanup_event; - return 0; - - cleanup_event: - (void) gf_cleanup_event (this, &entry->event); - free_entry: - gf_msg_debug (this->name, 0, "freeing entry %p", entry); - list_del (&entry->list); /* FIXME: kludge for now */ - GF_FREE (entry); - error_return: - return -1; + int ret = 0; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + + priv = this->private; + + if (!brick->callback || !brick->init || !brick->fini) + goto error_return; + + entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); + if (!entry) + goto error_return; + INIT_LIST_HEAD(&entry->list); + + LOCK_INIT(&entry->statelock); + entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + + entry->notify = brick->filter; + if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) + goto free_entry; + + entry->this = this; + entry->invokerxl = xl; + + entry->ordered = ordered; + ret = gf_init_event(entry); + if (ret) + goto free_entry; + + entry->fini = brick->fini; + entry->callback = brick->callback; + entry->connected = brick->connected; + entry->disconnected = brick->disconnected; + + entry->ptr = brick->init(this, brick); + if (!entry->ptr) + goto cleanup_event; + priv->api = entry->ptr; /* pointer to API, if required */ + + pthread_mutex_lock(&priv->lock); + { + list_add_tail(&entry->list, &priv->connections); + } + pthread_mutex_unlock(&priv->lock); + + ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); + if (ret) + goto cleanup_event; + return 0; + +cleanup_event: + (void)gf_cleanup_event(this, &entry->event); +free_entry: + gf_msg_debug(this->name, 0, "freeing entry %p", entry); + list_del(&entry->list); /* FIXME: kludge for now */ + GF_FREE(entry); +error_return: + return -1; } int -gf_changelog_register_brick (xlator_t *this, - struct gf_brick_spec *brick, - gf_boolean_t ordered, void *xl) +gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) { - return gf_setup_brick_connection (this, brick, ordered, xl); + return gf_setup_brick_connection(this, brick, ordered, xl); } static int -gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel) +gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel) { - /* passing ident as NULL means to use default ident for syslog */ - if (gf_log_init (this->ctx, logfile, NULL)) - return -1; + /* passing ident as NULL means to use default ident for syslog */ + if (gf_log_init(this->ctx, logfile, NULL)) + return -1; - gf_log_set_loglevel (this->ctx, (loglevel == -1) ? GF_LOG_INFO : - loglevel); - return 0; + gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel); + return 0; } static int -gf_changelog_set_master (xlator_t *master, void *xl) +gf_changelog_set_master(xlator_t *master, void *xl) { - int32_t ret = 0; - xlator_t *this = NULL; - xlator_t *old_this = NULL; - gf_private_t *priv = NULL; - - this = xl; - if (!this || !this->ctx) { - ret = gf_changelog_init_master (); - if (ret) - return -1; - this = THIS; - } + int32_t ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + gf_private_t *priv = NULL; + + this = xl; + if (!this || !this->ctx) { + ret = gf_changelog_init_master(); + if (ret) + return -1; + this = THIS; + } - master->ctx = this->ctx; + master->ctx = this->ctx; - INIT_LIST_HEAD (&master->volume_options); - SAVE_THIS (THIS); + INIT_LIST_HEAD(&master->volume_options); + SAVE_THIS(THIS); - ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); - if (ret != 0) - goto restore_this; + ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); + if (ret != 0) + goto restore_this; - priv = gf_changelog_alloc_priv (); - if (!priv) { - ret = -1; - goto restore_this; - } + priv = gf_changelog_alloc_priv(); + if (!priv) { + ret = -1; + goto restore_this; + } - if (!xl) { - /* poller thread */ - ret = gf_thread_create (&priv->poller, - NULL, changelog_rpc_poller, THIS, - "clogpoll"); - if (ret != 0) { - GF_FREE (priv); - gf_msg (master->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, - "failed to spawn poller thread"); - goto restore_this; - } + if (!xl) { + /* poller thread */ + ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, + "clogpoll"); + if (ret != 0) { + GF_FREE(priv); + gf_msg(master->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "failed to spawn poller thread"); + goto restore_this; } + } - master->private = priv; + master->private = priv; - restore_this: - RESTORE_THIS (); +restore_this: + RESTORE_THIS(); - return ret; + return ret; } int -gf_changelog_init (void *xl) +gf_changelog_init(void *xl) { - int ret = 0; - gf_private_t *priv = NULL; - - if (master) - return 0; - - master = calloc (1, sizeof (*master)); - if (!master) - goto error_return; - - master->name = strdup ("gfchangelog"); - if (!master->name) - goto dealloc_master; - - ret = gf_changelog_set_master (master, xl); - if (ret) - goto dealloc_name; - - priv = master->private; - ret = gf_thread_create (&priv->connectionjanitor, NULL, - gf_changelog_connection_janitor, master, - "clogjan"); - if (ret != 0) { - /* TODO: cleanup priv, mutex (poller thread for !xl) */ - goto dealloc_name; - } + int ret = 0; + gf_private_t *priv = NULL; + if (master) return 0; - dealloc_name: - free (master->name); - dealloc_master: - free (master); - master = NULL; - error_return: - return -1; + master = calloc(1, sizeof(*master)); + if (!master) + goto error_return; + + master->name = strdup("gfchangelog"); + if (!master->name) + goto dealloc_master; + + ret = gf_changelog_set_master(master, xl); + if (ret) + goto dealloc_name; + + priv = master->private; + ret = gf_thread_create(&priv->connectionjanitor, NULL, + gf_changelog_connection_janitor, master, "clogjan"); + if (ret != 0) { + /* TODO: cleanup priv, mutex (poller thread for !xl) */ + goto dealloc_name; + } + + return 0; + +dealloc_name: + free(master->name); +dealloc_master: + free(master); + master = NULL; +error_return: + return -1; } int -gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, - int ordered, char *logfile, int lvl, void *xl) +gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl) { - int ret = 0; - xlator_t *this = NULL; - xlator_t *old_this = NULL; - struct gf_brick_spec *brick = NULL; - gf_boolean_t need_order = _gf_false; + int ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + struct gf_brick_spec *brick = NULL; + gf_boolean_t need_order = _gf_false; - SAVE_THIS (xl); + SAVE_THIS(xl); - this = THIS; - if (!this) - goto error_return; + this = THIS; + if (!this) + goto error_return; - ret = gf_changelog_setup_logging (this, logfile, lvl); - if (ret) - goto error_return; - - need_order = (ordered) ? _gf_true : _gf_false; - - brick = bricks; - while (count--) { - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, - "Registering brick", - "brick=%s", brick->brick_path, - "notify_filter=%d", brick->filter, - NULL); - - ret = gf_changelog_register_brick (this, brick, need_order, xl); - if (ret != 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, - "Error registering with changelog xlator"); - break; - } - - brick++; + ret = gf_changelog_setup_logging(this, logfile, lvl); + if (ret) + goto error_return; + + need_order = (ordered) ? _gf_true : _gf_false; + + brick = bricks; + while (count--) { + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "Registering brick", + "brick=%s", brick->brick_path, "notify_filter=%d", + brick->filter, NULL); + + ret = gf_changelog_register_brick(this, brick, need_order, xl); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, + "Error registering with changelog xlator"); + break; } - if (ret != 0) - goto cleanup_inited_bricks; + brick++; + } - RESTORE_THIS(); - return 0; + if (ret != 0) + goto cleanup_inited_bricks; - cleanup_inited_bricks: - gf_cleanup_connections (this); - error_return: - RESTORE_THIS(); - return -1; + RESTORE_THIS(); + return 0; + +cleanup_inited_bricks: + gf_cleanup_connections(this); +error_return: + RESTORE_THIS(); + return -1; } /** @@ -610,27 +601,29 @@ gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, * For generic API, refer gf_changelog_register_generic(). */ int -gf_changelog_register (char *brick_path, char *scratch_dir, - char *log_file, int log_level, int max_reconnects) +gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, + int log_level, int max_reconnects) { - struct gf_brick_spec brick = {0,}; + struct gf_brick_spec brick = { + 0, + }; - if (master) - THIS = master; - else - return -1; + if (master) + THIS = master; + else + return -1; - brick.brick_path = brick_path; - brick.filter = CHANGELOG_OP_TYPE_JOURNAL; + brick.brick_path = brick_path; + brick.filter = CHANGELOG_OP_TYPE_JOURNAL; - brick.init = gf_changelog_journal_init; - brick.fini = gf_changelog_journal_fini; - brick.callback = gf_changelog_handle_journal; - brick.connected = gf_changelog_journal_connect; - brick.disconnected = gf_changelog_journal_disconnect; + brick.init = gf_changelog_journal_init; + brick.fini = gf_changelog_journal_fini; + brick.callback = gf_changelog_handle_journal; + brick.connected = gf_changelog_journal_connect; + brick.disconnected = gf_changelog_journal_disconnect; - brick.ptr = scratch_dir; + brick.ptr = scratch_dir; - return gf_changelog_register_generic (&brick, 1, 1, - log_file, log_level, NULL); + return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, + NULL); } diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index 5259ae3893b..c8a31ebbd73 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -36,62 +36,60 @@ * -1: On error. */ int -gf_history_changelog_done (char *file) +gf_history_changelog_done(char *file) { - int ret = -1; - char *buffer = NULL; - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_journal_t *hist_jnl = NULL; - char to_path[PATH_MAX] = {0,}; + int ret = -1; + char *buffer = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + char to_path[PATH_MAX] = { + 0, + }; + + errno = EINVAL; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; + + if (!file || !strlen(file)) + goto out; + + /* make sure 'file' is inside ->jnl_working_dir */ + buffer = realpath(file, NULL); + if (!buffer) + goto out; + + if (strncmp(hist_jnl->jnl_working_dir, buffer, + strlen(hist_jnl->jnl_working_dir))) + goto out; + + (void)snprintf(to_path, PATH_MAX, "%s%s", hist_jnl->jnl_processed_dir, + basename(buffer)); + gf_msg_debug(this->name, 0, "moving %s to processed directory", file); + ret = sys_rename(buffer, to_path); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_RENAME_FAILED, "cannot move changelog file", + "from=%s", file, "to=%s", to_path, NULL); + goto out; + } + + ret = 0; - errno = EINVAL; - - this = THIS; - if (!this) - goto out; - - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; - - hist_jnl = jnl->hist_jnl; - if (!hist_jnl) - goto out; - - if (!file || !strlen (file)) - goto out; - - /* make sure 'file' is inside ->jnl_working_dir */ - buffer = realpath (file, NULL); - if (!buffer) - goto out; - - if (strncmp (hist_jnl->jnl_working_dir, - buffer, strlen (hist_jnl->jnl_working_dir))) - goto out; - - (void) snprintf (to_path, PATH_MAX, "%s%s", - hist_jnl->jnl_processed_dir, basename (buffer)); - gf_msg_debug (this->name, 0, - "moving %s to processed directory", file); - ret = sys_rename (buffer, to_path); - if (ret) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_RENAME_FAILED, - "cannot move changelog file", - "from=%s", file, - "to=%s", to_path, - NULL); - goto out; - } - - ret = 0; - - out: - if (buffer) - free (buffer); /* allocated by realpath() */ - return ret; +out: + if (buffer) + free(buffer); /* allocated by realpath() */ + return ret; } /** @@ -105,33 +103,33 @@ gf_history_changelog_done (char *file) * -1: On error. */ int -gf_history_changelog_start_fresh () +gf_history_changelog_start_fresh() { - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_journal_t *hist_jnl = NULL; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; - this = THIS; - if (!this) - goto out; + this = THIS; + if (!this) + goto out; - errno = EINVAL; + errno = EINVAL; - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; - hist_jnl = jnl->hist_jnl; - if (!hist_jnl) - goto out; + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; - if (gf_ftruncate (hist_jnl->jnl_fd, 0)) - goto out; + if (gf_ftruncate(hist_jnl->jnl_fd, 0)) + goto out; - return 0; + return 0; - out: - return -1; +out: + return -1; } /** @@ -150,50 +148,52 @@ gf_history_changelog_start_fresh () * -1 : On error. */ ssize_t -gf_history_changelog_next_change (char *bufptr, size_t maxlen) +gf_history_changelog_next_change(char *bufptr, size_t maxlen) { - ssize_t size = -1; - int tracker_fd = 0; - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_journal_t *hist_jnl = NULL; - char buffer[PATH_MAX] = {0,}; - - if (maxlen > PATH_MAX) { - errno = ENAMETOOLONG; - goto out; - } + ssize_t size = -1; + int tracker_fd = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + char buffer[PATH_MAX] = { + 0, + }; - errno = EINVAL; + if (maxlen > PATH_MAX) { + errno = ENAMETOOLONG; + goto out; + } - this = THIS; - if (!this) - goto out; + errno = EINVAL; - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; + this = THIS; + if (!this) + goto out; - hist_jnl = jnl->hist_jnl; - if (!hist_jnl) - goto out; + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; - tracker_fd = hist_jnl->jnl_fd; + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; - size = gf_readline (tracker_fd, buffer, maxlen); - if (size < 0) { - size = -1; - goto out; - } + tracker_fd = hist_jnl->jnl_fd; - if (size == 0) - goto out; + size = gf_readline(tracker_fd, buffer, maxlen); + if (size < 0) { + size = -1; + goto out; + } - memcpy (bufptr, buffer, size - 1); - bufptr[size - 1] = '\0'; + if (size == 0) + goto out; + + memcpy(bufptr, buffer, size - 1); + bufptr[size - 1] = '\0'; out: - return size; + return size; } /** @@ -214,97 +214,100 @@ out: * */ ssize_t -gf_history_changelog_scan () +gf_history_changelog_scan() { - int tracker_fd = 0; - size_t off = 0; - xlator_t *this = NULL; - size_t nr_entries = 0; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_journal_t *hist_jnl = NULL; - struct dirent *entry = NULL; - struct dirent scratch[2] = {{0,},}; - char buffer[PATH_MAX] = {0,}; - static int is_last_scan; - - this = THIS; - if (!this) - goto out; + int tracker_fd = 0; + size_t off = 0; + xlator_t *this = NULL; + size_t nr_entries = 0; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + struct dirent *entry = NULL; + struct dirent scratch[2] = { + { + 0, + }, + }; + char buffer[PATH_MAX] = { + 0, + }; + static int is_last_scan; + + this = THIS; + if (!this) + goto out; + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) + goto out; + if (JNL_IS_API_DISCONNECTED(jnl)) { + errno = ENOTCONN; + goto out; + } + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) + goto out; + +retry: + if (is_last_scan == 1) + return 0; + if (hist_jnl->hist_done == 0) + is_last_scan = 1; - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) - goto out; - if (JNL_IS_API_DISCONNECTED (jnl)) { - errno = ENOTCONN; - goto out; - } + errno = EINVAL; + if (hist_jnl->hist_done == -1) + goto out; - hist_jnl = jnl->hist_jnl; - if (!hist_jnl) - goto out; + tracker_fd = hist_jnl->jnl_fd; - retry: - if (is_last_scan == 1) - return 0; - if (hist_jnl->hist_done == 0) - is_last_scan = 1; + if (gf_ftruncate(tracker_fd, 0)) + goto out; - errno = EINVAL; - if (hist_jnl->hist_done == -1) - goto out; + rewinddir(hist_jnl->jnl_dir); - tracker_fd = hist_jnl->jnl_fd; + for (;;) { + errno = 0; + entry = sys_readdir(hist_jnl->jnl_dir, scratch); + if (!entry || errno != 0) + break; - if (gf_ftruncate (tracker_fd, 0)) - goto out; + if (strcmp(basename(entry->d_name), ".") == 0 || + strcmp(basename(entry->d_name), "..") == 0) + continue; - rewinddir (hist_jnl->jnl_dir); - - for (;;) { - errno = 0; - entry = sys_readdir (hist_jnl->jnl_dir, scratch); - if (!entry || errno != 0) - break; - - if (strcmp (basename (entry->d_name), ".") == 0 || - strcmp (basename (entry->d_name), "..") == 0) - continue; - - nr_entries++; - - GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir, - buffer, off, - strlen (hist_jnl->jnl_processing_dir)); - GF_CHANGELOG_FILL_BUFFER (entry->d_name, buffer, - off, strlen (entry->d_name)); - GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); - - if (gf_changelog_write (tracker_fd, buffer, off) != off) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_WRITE_FAILED, - "error writing changelog filename" - " to tracker file"); - break; - } - off = 0; - } + nr_entries++; - gf_msg_debug (this->name, 0, - "hist_done %d, is_last_scan: %d", - hist_jnl->hist_done, is_last_scan); - - if (!entry) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { - if (nr_entries > 0) - return nr_entries; - else { - sleep(1); - goto retry; - } - } + GF_CHANGELOG_FILL_BUFFER(hist_jnl->jnl_processing_dir, buffer, off, + strlen(hist_jnl->jnl_processing_dir)); + GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off, + strlen(entry->d_name)); + GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1); + + if (gf_changelog_write(tracker_fd, buffer, off) != off) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED, + "error writing changelog filename" + " to tracker file"); + break; } - out: - return -1; + off = 0; + } + + gf_msg_debug(this->name, 0, "hist_done %d, is_last_scan: %d", + hist_jnl->hist_done, is_last_scan); + + if (!entry) { + if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) { + if (nr_entries > 0) + return nr_entries; + else { + sleep(1); + goto retry; + } + } + } +out: + return -1; } /* @@ -312,36 +315,36 @@ gf_history_changelog_scan () * Returns 0 on success(updates given time-stamp), -1 on failure. */ int -gf_history_get_timestamp (int fd, int index, int len, - unsigned long *ts) +gf_history_get_timestamp(int fd, int index, int len, unsigned long *ts) { - xlator_t *this = NULL; - int n_read = -1; - char path_buf[PATH_MAX]= {0,}; - char *iter = path_buf; - size_t offset = index * (len+1); - unsigned long value = 0; - int ret = 0; - - this = THIS; - if (!this) { - return -1; - } - - n_read = sys_pread (fd, path_buf, len, offset); - if (n_read < 0 ) { - ret = -1; - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_READ_ERROR, - "could not read from htime file"); - goto out; - } - iter+= len - TIMESTAMP_LENGTH; - sscanf (iter, "%lu",&value); + xlator_t *this = NULL; + int n_read = -1; + char path_buf[PATH_MAX] = { + 0, + }; + char *iter = path_buf; + size_t offset = index * (len + 1); + unsigned long value = 0; + int ret = 0; + + this = THIS; + if (!this) { + return -1; + } + + n_read = sys_pread(fd, path_buf, len, offset); + if (n_read < 0) { + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR, + "could not read from htime file"); + goto out; + } + iter += len - TIMESTAMP_LENGTH; + sscanf(iter, "%lu", &value); out: - if(ret == 0) - *ts = value; - return ret; + if (ret == 0) + *ts = value; + return ret; } /* @@ -349,38 +352,37 @@ out: * Checks whether @value is there next to @target_index or not */ int -gf_history_check ( int fd, int target_index, unsigned long value, int len) +gf_history_check(int fd, int target_index, unsigned long value, int len) { - int ret = 0; - unsigned long ts1 = 0; - unsigned long ts2 = 0; - - if (target_index == 0) { - ret = gf_history_get_timestamp (fd, target_index, len, &ts1); - if (ret == -1) - goto out; - if (value <= ts1) - goto out; - else { - ret = -1; - goto out; - } - } + int ret = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; - ret = gf_history_get_timestamp (fd, target_index, len, &ts1); - if (ret ==-1) - goto out; - ret = gf_history_get_timestamp (fd, target_index -1, len, &ts2); - if (ret ==-1) - goto out; - - if ( (value <= ts1) && (value > ts2) ) { - goto out; + if (target_index == 0) { + ret = gf_history_get_timestamp(fd, target_index, len, &ts1); + if (ret == -1) + goto out; + if (value <= ts1) + goto out; + else { + ret = -1; + goto out; } - else - ret = -1; + } + + ret = gf_history_get_timestamp(fd, target_index, len, &ts1); + if (ret == -1) + goto out; + ret = gf_history_get_timestamp(fd, target_index - 1, len, &ts2); + if (ret == -1) + goto out; + + if ((value <= ts1) && (value > ts2)) { + goto out; + } else + ret = -1; out: - return ret; + return ret; } /* @@ -400,78 +402,69 @@ out: */ int -gf_history_b_search (int fd, unsigned long value, - unsigned long from, unsigned long to, int len) +gf_history_b_search(int fd, unsigned long value, unsigned long from, + unsigned long to, int len) { - int m_index = -1; - unsigned long cur_value = 0; - unsigned long ts1 = 0; - int ret = 0; - - m_index = (from + to)/2; - - if ( (to - from) <=1 ) { - /* either one or 2 changelogs left */ - if ( to != from ) { - /* check if value is less or greater than to - * return accordingly - */ - ret = gf_history_get_timestamp (fd, from, len, &ts1); - if (ret ==-1) - goto out; - if ( ts1 >= value) { - /* actually compatision should be - * exactly == but considering - * - * case of only 2 changelogs in htime file - */ - return from; - } - else - return to; - } - else - return to; - } - - ret = gf_history_get_timestamp (fd, m_index, len, &cur_value); + int m_index = -1; + unsigned long cur_value = 0; + unsigned long ts1 = 0; + int ret = 0; + + m_index = (from + to) / 2; + + if ((to - from) <= 1) { + /* either one or 2 changelogs left */ + if (to != from) { + /* check if value is less or greater than to + * return accordingly + */ + ret = gf_history_get_timestamp(fd, from, len, &ts1); + if (ret == -1) + goto out; + if (ts1 >= value) { + /* actually compatision should be + * exactly == but considering + * + * case of only 2 changelogs in htime file + */ + return from; + } else + return to; + } else + return to; + } + + ret = gf_history_get_timestamp(fd, m_index, len, &cur_value); + if (ret == -1) + goto out; + if (cur_value == value) { + return m_index; + } else if (value > cur_value) { + ret = gf_history_get_timestamp(fd, m_index + 1, len, &cur_value); if (ret == -1) + goto out; + if (value < cur_value) + return m_index + 1; + else + return gf_history_b_search(fd, value, m_index + 1, to, len); + } else { + if (m_index == 0) { + /* we are sure that values exists + * in this htime file + */ + return 0; + } else { + ret = gf_history_get_timestamp(fd, m_index - 1, len, &cur_value); + if (ret == -1) goto out; - if (cur_value == value) { + if (value > cur_value) { return m_index; + } else + return gf_history_b_search(fd, value, from, m_index - 1, len); } - else if (value > cur_value) { - ret = gf_history_get_timestamp (fd, m_index+1, len, &cur_value); - if (ret == -1) - goto out; - if (value < cur_value) - return m_index + 1; - else - return gf_history_b_search (fd, value, - m_index+1, to, len); - } - else { - if (m_index ==0) { - /* we are sure that values exists - * in this htime file - */ - return 0; - } - else { - ret = gf_history_get_timestamp (fd, m_index-1, len, - &cur_value); - if (ret == -1) - goto out; - if (value > cur_value) { - return m_index; - } - else - return gf_history_b_search (fd, value, from, - m_index-1, len); - } - } + } out: - return -1; + return -1; } /* @@ -484,65 +477,60 @@ out: * 0 : No, Not usable ( contains, "changelog") */ int -gf_is_changelog_usable (char *cl_path) +gf_is_changelog_usable(char *cl_path) { - int ret = -1; - const char low_c[] = "changelog"; - char *str_ret = NULL; - char *bname = NULL; + int ret = -1; + const char low_c[] = "changelog"; + char *str_ret = NULL; + char *bname = NULL; - bname = basename (cl_path); + bname = basename(cl_path); - str_ret = strstr (bname, low_c); + str_ret = strstr(bname, low_c); - if (str_ret != NULL) - ret = 0; - else - ret = 1; - - return ret; + if (str_ret != NULL) + ret = 0; + else + ret = 1; + return ret; } void * -gf_changelog_consume_wrap (void* data) +gf_changelog_consume_wrap(void *data) { - int ret = -1; - ssize_t nread = 0; - xlator_t *this = NULL; - gf_changelog_consume_data_t *ccd = NULL; - - ccd = (gf_changelog_consume_data_t *) data; - this = ccd->this; - - ccd->retval = -1; - - nread = sys_pread (ccd->fd, ccd->changelog, PATH_MAX-1, ccd->offset); - if (nread < 0) { - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_READ_ERROR, - "cannot read from history metadata file"); - goto out; - } - - /* TODO: handle short reads and EOF. */ - if (gf_is_changelog_usable (ccd->changelog) == 1) { - - ret = gf_changelog_consume (ccd->this, - ccd->jnl, ccd->changelog, _gf_true); - if (ret) { - gf_smsg (this->name, GF_LOG_ERROR, - 0, CHANGELOG_LIB_MSG_PARSE_ERROR, - "could not parse changelog", - "name=%s", ccd->changelog, - NULL); - goto out; - } + int ret = -1; + ssize_t nread = 0; + xlator_t *this = NULL; + gf_changelog_consume_data_t *ccd = NULL; + + ccd = (gf_changelog_consume_data_t *)data; + this = ccd->this; + + ccd->retval = -1; + + nread = sys_pread(ccd->fd, ccd->changelog, PATH_MAX - 1, ccd->offset); + if (nread < 0) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR, + "cannot read from history metadata file"); + goto out; + } + + /* TODO: handle short reads and EOF. */ + if (gf_is_changelog_usable(ccd->changelog) == 1) { + ret = gf_changelog_consume(ccd->this, ccd->jnl, ccd->changelog, + _gf_true); + if (ret) { + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_PARSE_ERROR, + "could not parse changelog", "name=%s", ccd->changelog, + NULL); + goto out; } - ccd->retval = 0; + } + ccd->retval = 0; - out: - return NULL; +out: + return NULL; } /** @@ -551,133 +539,138 @@ gf_changelog_consume_wrap (void* data) * to index "to" in open htime file whose fd is "fd". */ -#define MAX_PARALLELS 10 +#define MAX_PARALLELS 10 void * -gf_history_consume (void * data) +gf_history_consume(void *data) { - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_journal_t *hist_jnl = NULL; - int ret = 0; - int iter = 0; - int fd = -1; - int from = -1; - int to = -1; - int len = -1; - int n_parallel = 0; - int n_envoked = 0; - gf_boolean_t publish = _gf_true; - pthread_t th_id[MAX_PARALLELS] = {0,}; - gf_changelog_history_data_t *hist_data = NULL; - gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {{0},}; - gf_changelog_consume_data_t *curr = NULL; - char thread_name[GF_THREAD_NAMEMAX] = {0,}; - - hist_data = (gf_changelog_history_data_t *) data; - if (hist_data == NULL) { - ret = -1; - goto out; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + int ret = 0; + int iter = 0; + int fd = -1; + int from = -1; + int to = -1; + int len = -1; + int n_parallel = 0; + int n_envoked = 0; + gf_boolean_t publish = _gf_true; + pthread_t th_id[MAX_PARALLELS] = { + 0, + }; + gf_changelog_history_data_t *hist_data = NULL; + gf_changelog_consume_data_t ccd[MAX_PARALLELS] = { + {0}, + }; + gf_changelog_consume_data_t *curr = NULL; + char thread_name[GF_THREAD_NAMEMAX] = { + 0, + }; + + hist_data = (gf_changelog_history_data_t *)data; + if (hist_data == NULL) { + ret = -1; + goto out; + } + + fd = hist_data->htime_fd; + from = hist_data->from; + to = hist_data->to; + len = hist_data->len; + n_parallel = hist_data->n_parallel; + + THIS = hist_data->this; + this = hist_data->this; + if (!this) { + ret = -1; + goto out; + } + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) { + ret = -1; + goto out; + } + + hist_jnl = jnl->hist_jnl; + if (!hist_jnl) { + ret = -1; + goto out; + } + + while (from <= to) { + n_envoked = 0; + + for (iter = 0; (iter < n_parallel) && (from <= to); iter++) { + curr = &ccd[iter]; + + curr->this = this; + curr->jnl = hist_jnl; + curr->fd = fd; + curr->offset = from * (len + 1); + + curr->retval = 0; + memset(curr->changelog, '\0', PATH_MAX); + snprintf(thread_name, sizeof(thread_name), "clogc%03hx", + ((iter + 1) & 0x3ff)); + + ret = gf_thread_create(&th_id[iter], NULL, + gf_changelog_consume_wrap, curr, + thread_name); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "could not create consume-thread"); + goto sync; + } else + n_envoked++; + + from++; } - fd = hist_data->htime_fd; - from = hist_data->from; - to = hist_data->to; - len = hist_data->len; - n_parallel = hist_data->n_parallel; - - THIS = hist_data->this; - this = hist_data->this; - if (!this) { - ret = -1; - goto out; + sync: + for (iter = 0; iter < n_envoked; iter++) { + ret = pthread_join(th_id[iter], NULL); + if (ret) { + publish = _gf_false; + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, + "pthread_join() error"); + /* try to join the rest */ + continue; + } + + if (publish == _gf_false) + continue; + + curr = &ccd[iter]; + if (ccd->retval) { + publish = _gf_false; + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_PARSE_ERROR, + "parsing error, ceased publishing..."); + continue; + } + + ret = gf_changelog_publish(curr->this, curr->jnl, curr->changelog); + if (ret) { + publish = _gf_false; + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_PUBLISH_ERROR, + "publish error, ceased publishing..."); + } } + } - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) { - ret = -1; - goto out; - } - - hist_jnl = jnl->hist_jnl; - if (!hist_jnl) { - ret = -1; - goto out; - } - - while (from <= to) { - n_envoked = 0; - - for (iter = 0 ; (iter < n_parallel) && (from <= to); iter++) { - curr = &ccd[iter]; - - curr->this = this; - curr->jnl = hist_jnl; - curr->fd = fd; - curr->offset = from * (len + 1); - - curr->retval = 0; - memset (curr->changelog, '\0', PATH_MAX); - snprintf (thread_name, sizeof(thread_name), - "clogc%03hx", ((iter + 1) & 0x3ff)); - - ret = gf_thread_create (&th_id[iter], NULL, - gf_changelog_consume_wrap, curr, - thread_name); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, ret, - CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED - , "could not create consume-thread"); - goto sync; - } else - n_envoked++; - - from++; - } - - sync: - for (iter = 0; iter < n_envoked; iter++) { - ret = pthread_join (th_id[iter], NULL); - if (ret) { - publish = _gf_false; - gf_msg (this->name, GF_LOG_ERROR, ret, - CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, - "pthread_join() error"); - /* try to join the rest */ - continue; - } - - if (publish == _gf_false) - continue; - - curr = &ccd[iter]; - if (ccd->retval) { - publish = _gf_false; - gf_msg (this->name, GF_LOG_ERROR, - 0, CHANGELOG_LIB_MSG_PARSE_ERROR, - "parsing error, ceased publishing..."); - continue; - } - - ret = gf_changelog_publish (curr->this, - curr->jnl, curr->changelog); - if (ret) { - publish = _gf_false; - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_PUBLISH_ERROR, - "publish error, ceased publishing..."); - } - } - } - - /* informing "parsing done". */ - hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1; + /* informing "parsing done". */ + hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1; out: - if (fd != -1) - (void) sys_close (fd); - GF_FREE (hist_data); - return NULL; + if (fd != -1) + (void)sys_close(fd); + GF_FREE(hist_data); + return NULL; } /** @@ -707,77 +700,73 @@ out: * -2 : Ignore this metadata file and process next */ int -gf_changelog_extract_min_max (const char *dname, const char *htime_dir, - int *fd, unsigned long *total, - unsigned long *min_ts, unsigned long *max_ts) +gf_changelog_extract_min_max(const char *dname, const char *htime_dir, int *fd, + unsigned long *total, unsigned long *min_ts, + unsigned long *max_ts) { - int ret = -1; - xlator_t *this = NULL; - char htime_file[PATH_MAX] = {0,}; - struct stat stbuf = {0,}; - char *iter = NULL; - char x_value[30] = {0,}; - - this = THIS; - - snprintf (htime_file, PATH_MAX, "%s/%s", htime_dir, dname); - - iter = (htime_file + strlen (htime_file) - TIMESTAMP_LENGTH); - sscanf (iter ,"%lu",min_ts); - - ret = sys_stat (htime_file, &stbuf); - if (ret) { - ret = -1; - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_HTIME_ERROR, - "stat() failed on htime file", - "path=%s", htime_file, - NULL); - goto out; - } - - /* ignore everything except regular files */ - if (!S_ISREG (stbuf.st_mode)) { - ret = -2; - goto out; - } - - *fd = open (htime_file, O_RDONLY); - if (*fd < 0) { - ret = -1; - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_HTIME_ERROR, - "open() failed for htime file", - "path=%s", htime_file, - NULL); - goto out; - } - - /* Looks good, extract max timestamp */ - ret = sys_fgetxattr (*fd, HTIME_KEY, x_value, sizeof (x_value)); - if (ret < 0) { - ret = -1; - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_GET_XATTR_FAILED, - "error extracting max timstamp from htime file" - "path=%s", htime_file, - NULL); - goto out; - } + int ret = -1; + xlator_t *this = NULL; + char htime_file[PATH_MAX] = { + 0, + }; + struct stat stbuf = { + 0, + }; + char *iter = NULL; + char x_value[30] = { + 0, + }; + + this = THIS; + + snprintf(htime_file, PATH_MAX, "%s/%s", htime_dir, dname); + + iter = (htime_file + strlen(htime_file) - TIMESTAMP_LENGTH); + sscanf(iter, "%lu", min_ts); + + ret = sys_stat(htime_file, &stbuf); + if (ret) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, + "stat() failed on htime file", "path=%s", htime_file, NULL); + goto out; + } + + /* ignore everything except regular files */ + if (!S_ISREG(stbuf.st_mode)) { + ret = -2; + goto out; + } + + *fd = open(htime_file, O_RDONLY); + if (*fd < 0) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, + "open() failed for htime file", "path=%s", htime_file, NULL); + goto out; + } + + /* Looks good, extract max timestamp */ + ret = sys_fgetxattr(*fd, HTIME_KEY, x_value, sizeof(x_value)); + if (ret < 0) { + ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_GET_XATTR_FAILED, + "error extracting max timstamp from htime file" + "path=%s", + htime_file, NULL); + goto out; + } + + sscanf(x_value, "%lu:%lu", max_ts, total); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, + "changelogs min max", "min=%lu", *min_ts, "max=%lu", *max_ts, + "total_changelogs=%lu", *total, NULL); + + ret = 0; - sscanf (x_value, "%lu:%lu", max_ts, total); - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, - "changelogs min max", - "min=%lu", *min_ts, - "max=%lu", *max_ts, - "total_changelogs=%lu", *total, - NULL); - - ret = 0; - - out: - return ret; +out: + return ret; } /* gf_history_changelog returns actual_end and spawns threads to @@ -790,275 +779,257 @@ gf_changelog_extract_min_max (const char *dname, const char *htime_dir, * -1 : On any error */ int -gf_history_changelog (char* changelog_dir, unsigned long start, - unsigned long end, int n_parallel, - unsigned long *actual_end) +gf_history_changelog(char *changelog_dir, unsigned long start, + unsigned long end, int n_parallel, + unsigned long *actual_end) { - int ret = 0; - int len = -1; - int fd = -1; - int n_read = -1; - unsigned long min_ts = 0; - unsigned long max_ts = 0; - unsigned long end2 = 0; - unsigned long ts1 = 0; - unsigned long ts2 = 0; - unsigned long to = 0; - unsigned long from = 0; - unsigned long total_changelog = 0; - xlator_t *this = NULL; - gf_changelog_journal_t *jnl = NULL; - gf_changelog_journal_t *hist_jnl = NULL; - gf_changelog_history_data_t *hist_data = NULL; - DIR *dirp = NULL; - struct dirent *entry = NULL; - struct dirent scratch[2] = {{0,},}; - pthread_t consume_th = 0; - char htime_dir[PATH_MAX] = {0,}; - char buffer[PATH_MAX] = {0,}; - gf_boolean_t partial_history = _gf_false; - - pthread_attr_t attr; - - this = THIS; - if (!this) { - ret = -1; - goto out; + int ret = 0; + int len = -1; + int fd = -1; + int n_read = -1; + unsigned long min_ts = 0; + unsigned long max_ts = 0; + unsigned long end2 = 0; + unsigned long ts1 = 0; + unsigned long ts2 = 0; + unsigned long to = 0; + unsigned long from = 0; + unsigned long total_changelog = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_journal_t *hist_jnl = NULL; + gf_changelog_history_data_t *hist_data = NULL; + DIR *dirp = NULL; + struct dirent *entry = NULL; + struct dirent scratch[2] = { + { + 0, + }, + }; + pthread_t consume_th = 0; + char htime_dir[PATH_MAX] = { + 0, + }; + char buffer[PATH_MAX] = { + 0, + }; + gf_boolean_t partial_history = _gf_false; + + pthread_attr_t attr; + + this = THIS; + if (!this) { + ret = -1; + goto out; + } + + ret = pthread_attr_init(&attr); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_PTHREAD_ERROR, + "Pthread init failed"); + return -1; + } + + jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); + if (!jnl) { + ret = -1; + goto out; + } + + hist_jnl = (gf_changelog_journal_t *)jnl->hist_jnl; + if (!hist_jnl) { + ret = -1; + goto out; + } + + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, + "Requesting historical changelogs", "start=%lu", start, "end=%lu", + end, NULL); + + /* basic sanity check */ + if (start > end || n_parallel <= 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HIST_FAILED, + "Sanity check failed", "start=%lu", start, "end=%lu", end, + "thread_count=%d", n_parallel, NULL); + ret = -1; + goto out; + } + + /* cap parallelism count */ + if (n_parallel > MAX_PARALLELS) + n_parallel = MAX_PARALLELS; + + CHANGELOG_FILL_HTIME_DIR(changelog_dir, htime_dir); + + dirp = sys_opendir(htime_dir); + if (dirp == NULL) { + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, + "open dir on htime failed", "path=%s", htime_dir, NULL); + ret = -1; + goto out; + } + + for (;;) { + errno = 0; + + entry = sys_readdir(dirp, scratch); + + if (!entry || errno != 0) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_HIST_FAILED, + "Requested changelog range is not availbale", "start=%lu", + start, "end=%lu", end, NULL); + ret = -2; + break; } - ret = pthread_attr_init (&attr); - if (ret != 0) { - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_PTHREAD_ERROR, - "Pthread init failed"); - return -1; + ret = gf_changelog_extract_min_max(entry->d_name, htime_dir, &fd, + &total_changelog, &min_ts, &max_ts); + if (ret) { + if (-2 == ret) + continue; + goto out; } - jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); - if (!jnl) { + if (start >= min_ts && start < max_ts) { + /** + * TODO: handle short reads later... + */ + n_read = sys_read(fd, buffer, PATH_MAX); + if (n_read < 0) { ret = -1; + gf_msg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_READ_ERROR, + "unable to read htime file"); goto out; - } + } + + len = strlen(buffer); + + /** + * search @start in the htime file returning it's index + * (@from) + */ + from = gf_history_b_search(fd, start, 0, total_changelog - 1, len); - hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl; - if (!hist_jnl) { + /* ensuring correctness of gf_b_search */ + if (gf_history_check(fd, from, start, len) != 0) { ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_GET_TIME_ERROR, + "wrong result for start", "start=%lu", start, "idx=%lu", + from, NULL); goto out; - } - - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, - "Requesting historical changelogs", - "start=%lu", start, "end=%lu", end, NULL); - - /* basic sanity check */ - if (start > end || n_parallel <= 0) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_HIST_FAILED, "Sanity check failed", - "start=%lu", start, - "end=%lu", end, - "thread_count=%d", n_parallel, - NULL); + } + + end2 = (end <= max_ts) ? end : max_ts; + + /* Check if end falls out of same HTIME file. The end + * falling to a different htime file or changelog + * disable-enable is detected only after 20 seconds. + * This is required because, applications generally + * asks historical changelogs till current time and + * it is possible changelog is not rolled over yet. + * So, buffer time of default rollover time plus 5 + * seconds is subtracted. If the application requests + * the end time with in half a minute of changelog + * disable, it's not detected as changelog disable and + * it's application's responsibility to retry after + * 20 seconds before confirming it as partial history. + */ + if ((end - 20) > max_ts) { + partial_history = _gf_true; + } + + /** + * search @end2 in htime file returning it's index (@to) + */ + to = gf_history_b_search(fd, end2, 0, total_changelog - 1, len); + + if (gf_history_check(fd, to, end2, len) != 0) { ret = -1; + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_LIB_MSG_GET_TIME_ERROR, + "wrong result for end", "start=%lu", end2, "idx=%lu", + to, NULL); goto out; - } + } - /* cap parallelism count */ - if (n_parallel > MAX_PARALLELS) - n_parallel = MAX_PARALLELS; + ret = gf_history_get_timestamp(fd, from, len, &ts1); + if (ret == -1) + goto out; - CHANGELOG_FILL_HTIME_DIR (changelog_dir, htime_dir); + ret = gf_history_get_timestamp(fd, to, len, &ts2); + if (ret == -1) + goto out; - dirp = sys_opendir (htime_dir); - if (dirp == NULL) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_HTIME_ERROR, - "open dir on htime failed", - "path=%s", htime_dir, - NULL); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, "FINAL", "from=%lu", ts1, + "to=%lu", ts2, "changes=%lu", (to - from + 1), NULL); + + hist_data = GF_CALLOC(1, sizeof(gf_changelog_history_data_t), + gf_changelog_mt_history_data_t); + + hist_data->htime_fd = fd; + hist_data->from = from; + hist_data->to = to; + hist_data->len = len; + hist_data->n_parallel = n_parallel; + hist_data->this = this; + + ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (ret != 0) { + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_PTHREAD_ERROR, + "unable to sets the detach" + " state attribute"); ret = -1; goto out; - } + } + + /* spawn a thread for background parsing & publishing */ + ret = gf_thread_create(&consume_th, &attr, gf_history_consume, + hist_data, "cloghcon"); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, ret, + CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, + "creation of consume parent-thread" + " failed."); + ret = -1; + goto out; + } - for (;;) { - - errno = 0; - - entry = sys_readdir (dirp, scratch); - - if (!entry || errno != 0) { - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_HIST_FAILED, - "Requested changelog range is not availbale", - "start=%lu", start, "end=%lu", end, NULL); - ret = -2; - break; - } - - ret = gf_changelog_extract_min_max (entry->d_name, htime_dir, - &fd, &total_changelog, - &min_ts, &max_ts); - if (ret) { - if (-2 == ret) - continue; - goto out; - } - - if (start >= min_ts && start < max_ts) { - /** - * TODO: handle short reads later... - */ - n_read = sys_read (fd, buffer, PATH_MAX); - if (n_read < 0) { - ret = -1; - gf_msg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_READ_ERROR, - "unable to read htime file"); - goto out; - } - - len = strlen (buffer); - - /** - * search @start in the htime file returning it's index - * (@from) - */ - from = gf_history_b_search (fd, start, 0, - total_changelog - 1, len); - - /* ensuring correctness of gf_b_search */ - if (gf_history_check (fd, from, start, len) != 0) { - ret = -1; - gf_smsg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_GET_TIME_ERROR, - "wrong result for start", - "start=%lu", start, - "idx=%lu", from, - NULL); - goto out; - } - - end2 = (end <= max_ts) ? end : max_ts; - - /* Check if end falls out of same HTIME file. The end - * falling to a different htime file or changelog - * disable-enable is detected only after 20 seconds. - * This is required because, applications generally - * asks historical changelogs till current time and - * it is possible changelog is not rolled over yet. - * So, buffer time of default rollover time plus 5 - * seconds is subtracted. If the application requests - * the end time with in half a minute of changelog - * disable, it's not detected as changelog disable and - * it's application's responsibility to retry after - * 20 seconds before confirming it as partial history. - */ - if ((end - 20) > max_ts) { - partial_history = _gf_true; - } - - /** - * search @end2 in htime file returning it's index (@to) - */ - to = gf_history_b_search (fd, end2, - 0, total_changelog - 1, len); - - if (gf_history_check (fd, to, end2, len) != 0) { - ret = -1; - gf_smsg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_LIB_MSG_GET_TIME_ERROR, - "wrong result for end", - "start=%lu", end2, - "idx=%lu", to, - NULL); - goto out; - } - - ret = gf_history_get_timestamp (fd, from, len, &ts1); - if (ret == -1) - goto out; - - ret = gf_history_get_timestamp (fd, to, len, &ts2); - if (ret == -1) - goto out; - - gf_smsg (this->name, GF_LOG_INFO, 0, - CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, - "FINAL", - "from=%lu", ts1, - "to=%lu", ts2, - "changes=%lu", (to - from + 1), - NULL); - - hist_data = GF_CALLOC (1, - sizeof (gf_changelog_history_data_t), - gf_changelog_mt_history_data_t); - - hist_data->htime_fd = fd; - hist_data->from = from; - hist_data->to = to; - hist_data->len = len; - hist_data->n_parallel = n_parallel; - hist_data->this = this; - - ret = pthread_attr_setdetachstate - (&attr, PTHREAD_CREATE_DETACHED); - if (ret != 0) { - gf_msg (this->name, GF_LOG_ERROR, ret, - CHANGELOG_LIB_MSG_PTHREAD_ERROR, - "unable to sets the detach" - " state attribute"); - ret = -1; - goto out; - } - - /* spawn a thread for background parsing & publishing */ - ret = gf_thread_create (&consume_th, &attr, - gf_history_consume, hist_data, - "cloghcon"); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, ret, - CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED - , "creation of consume parent-thread" - " failed."); - ret = -1; - goto out; - } - - goto out; - - } else {/* end of range check */ - gf_smsg (this->name, GF_LOG_ERROR, errno, - CHANGELOG_LIB_MSG_HIST_FAILED, - "Requested changelog range is not " - "available. Retrying next HTIME", - "start=%lu", start, - "end=%lu", end, - "chlog_min=%lu", min_ts, - "chlog_max=%lu", max_ts, - NULL); - } - } /* end of readdir() */ + goto out; + + } else { /* end of range check */ + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_LIB_MSG_HIST_FAILED, + "Requested changelog range is not " + "available. Retrying next HTIME", + "start=%lu", start, "end=%lu", end, "chlog_min=%lu", min_ts, + "chlog_max=%lu", max_ts, NULL); + } + } /* end of readdir() */ out: - if (dirp != NULL) - (void) sys_closedir (dirp); + if (dirp != NULL) + (void)sys_closedir(dirp); - if (ret < 0) { - if (fd != -1) - (void) sys_close (fd); - GF_FREE (hist_data); - (void) pthread_attr_destroy (&attr); + if (ret < 0) { + if (fd != -1) + (void)sys_close(fd); + GF_FREE(hist_data); + (void)pthread_attr_destroy(&attr); - return ret; - } + return ret; + } - hist_jnl->hist_done = 1; - *actual_end = ts2; + hist_jnl->hist_done = 1; + *actual_end = ts2; - if (partial_history) { - ret = 1; - } + if (partial_history) { + ret = 1; + } - return ret; + return ret; } |